How to do it...

  1. Start the Spark shell or Databricks Cloud Scala notebook:
        $ spark-shell
  2. Load the reviews as a DataFrame:
        scala> val reviews = spark.read.format("json").option("inferSchema","true").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_review.json").withColumn("rtype",$"type").drop("type")
  3. Print the schema:
        scala> reviews.printSchema
root
|-- business_id: string (nullable = true)
|-- cool: long (nullable = true)
|-- date: string (nullable = true)
|-- funny: long (nullable = true)
|-- review_id: string (nullable = true)
|-- stars: long (nullable = true)
|-- text: string (nullable = true)
|-- useful: long (nullable = true)
|-- user_id: string (nullable = true)
|-- rtype: string (nullable = true)
  4. Create the Review case class:
        scala> case class Review(business_id: String, cool: Long, date: String, funny: Long, review_id: String, stars: Long, text: String, rtype: String, useful: Long, user_id: String)
  5. Convert the reviews DataFrame into a Dataset:
        scala> val reviewsDS = reviews.as[Review]
  6. Create the BusinessReviews case class to represent all the reviews for a given business:
        scala> case class BusinessReviews(business_id: String, reviews: List[Review])
  7. Create the nesting function:
        scala> val nestingFunction = (key: String, iter: Iterator[Review]) => BusinessReviews(business_id = key, reviews = iter.toList)
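The grouping-and-nesting behavior can be previewed with plain Scala collections, with no Spark session required. The two-field MiniReview below is a simplified, hypothetical stand-in for the recipe's full Review case class:

```scala
// Simplified stand-ins for the recipe's case classes (field subset
// chosen for illustration only).
case class MiniReview(business_id: String, stars: Long)
case class MiniBusinessReviews(business_id: String, reviews: List[MiniReview])

val sample = List(
  MiniReview("b1", 5L),
  MiniReview("b2", 3L),
  MiniReview("b1", 4L)
)

// groupBy collects the reviews for each key, then each (key, group)
// pair is mapped to one nested record -- the same shape that
// groupByKey(_.business_id) followed by mapGroups produces on a Dataset.
val nested: List[MiniBusinessReviews] =
  sample.groupBy(_.business_id).toList.map { case (key, rs) =>
    MiniBusinessReviews(business_id = key, reviews = rs)
  }
```

The key point is that every input review survives; it just moves inside the list of the record whose business_id it carries.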
  8. Group the reviews by business_id and convert them into a nested format:
         scala> val nestedreviews = reviewsDS.groupByKey(_.business_id).mapGroups((key, iter) => nestingFunction(key, iter))
  9. Save nestedreviews to the S3 bucket (change the path to a bucket you can write to; otherwise the following command will fail):
        scala> nestedreviews.write.json("s3a://io.playground/nested")
  10. Load the bucket:
        scala> val r = spark.read.format("json").load("s3a://io.playground/nested")
  11. Print the schema:
        scala> r.printSchema
root
|-- business_id: string (nullable = true)
|-- reviews: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- business_id: string (nullable = true)
| | |-- cool: long (nullable = true)
| | |-- date: string (nullable = true)
| | |-- funny: long (nullable = true)
| | |-- review_id: string (nullable = true)
| | |-- rtype: string (nullable = true)
| | |-- stars: long (nullable = true)
| | |-- text: string (nullable = true)
| | |-- useful: long (nullable = true)
| | |-- user_id: string (nullable = true)
  12. Import SQL functions, such as explode:
         scala> import org.apache.spark.sql.functions._
  13. Use the explode function so that business_id is repeated for every review:
          scala> val exploded = r.select($"business_id", explode($"reviews"))
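What explode does to the nested reviews array can be mimicked with a flatMap over plain Scala collections. This is a sketch with hypothetical sample data and a string-typed reviews list, not Spark API code:

```scala
// Hypothetical nested records, shaped like the DataFrame loaded above.
case class NestedBiz(business_id: String, reviews: List[String])

val nestedSample = List(
  NestedBiz("b1", List("great food", "good service")),
  NestedBiz("b2", List("ok"))
)

// explode($"reviews") emits one output row per array element,
// repeating business_id for each -- equivalent to this flatMap:
val flat: List[(String, String)] =
  nestedSample.flatMap(b => b.reviews.map(rev => (b.business_id, rev)))
```

A business with n reviews yields n output rows, so exploding undoes the nesting performed earlier in the recipe.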
  14. Display the exploded DataFrame (display is available in Databricks notebooks; in the Spark shell, use exploded.show):
         scala> display(exploded)