- Start the Spark shell or Databricks Cloud Scala notebook:
$ spark-shell
- Load the reviews as a DataFrame (the JSON source infers the schema automatically), renaming the type column to rtype since type is a reserved word in Scala:
scala> val reviews = spark.read.format("json")
         .load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_review.json")
         .withColumn("rtype", $"type")
         .drop("type")
- Print the schema:
scala> reviews.printSchema
root
|-- business_id: string (nullable = true)
|-- cool: long (nullable = true)
|-- date: string (nullable = true)
|-- funny: long (nullable = true)
|-- review_id: string (nullable = true)
|-- stars: long (nullable = true)
|-- text: string (nullable = true)
|-- useful: long (nullable = true)
|-- user_id: string (nullable = true)
|-- rtype: string (nullable = true)
- Create the Review case class:
scala> case class Review(business_id: String, cool: Long, date: String,
         funny: Long, review_id: String, stars: Long, text: String,
         rtype: String, useful: Long, user_id: String)
- Convert the reviews DataFrame into a Dataset (columns are matched to the case class fields by name):
scala> val reviewsDS = reviews.as[Review]
- Create the BusinessReviews case class to represent all the reviews for a given business:
scala> case class BusinessReviews(business_id: String, reviews: List[Review])
- Create the nesting function:
scala> val nestingFunction = (key: String, iter: Iterator[Review]) =>
         BusinessReviews(business_id = key, reviews = iter.toList)
- Group the reviews by business_id and convert them into the nested format:
scala> val nestedreviews = reviewsDS.groupByKey(_.business_id)
         .mapGroups((key, iter) => nestingFunction(key, iter))
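The groupByKey plus mapGroups step can be sketched on plain Scala collections: this is only an analogy, not the Spark API, and it uses trimmed-down case classes and made-up sample data for brevity:

```scala
// Trimmed-down versions of the recipe's case classes (two fields only)
case class Review(business_id: String, stars: Long)
case class BusinessReviews(business_id: String, reviews: List[Review])

// Made-up sample reviews
val sample = List(Review("b1", 5L), Review("b1", 3L), Review("b2", 4L))

// groupBy plays the role of groupByKey, and mapping each (key, group)
// pair to a BusinessReviews plays the role of mapGroups.
val nested: List[BusinessReviews] =
  sample.groupBy(_.business_id).toList.map { case (key, rs) =>
    BusinessReviews(business_id = key, reviews = rs)
  }
```

Each distinct business_id yields exactly one BusinessReviews record holding all of that business's reviews, which is what mapGroups produces on the Dataset.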
- Save nestedreviews to your S3 bucket (replace the bucket name with one you have write access to, otherwise the following command will fail):
scala> nestedreviews.write.json("s3a://io.playground/nested")
- Load the nested data back from the bucket:
scala> val r = spark.read.format("json").load("s3a://io.playground/nested")
- Print the schema:
scala> r.printSchema
root
|-- business_id: string (nullable = true)
|-- reviews: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- business_id: string (nullable = true)
| | |-- cool: long (nullable = true)
| | |-- date: string (nullable = true)
| | |-- funny: long (nullable = true)
| | |-- review_id: string (nullable = true)
| | |-- rtype: string (nullable = true)
| | |-- stars: long (nullable = true)
| | |-- text: string (nullable = true)
| | |-- useful: long (nullable = true)
| | |-- user_id: string (nullable = true)
- Import SQL functions, such as explode:
scala> import org.apache.spark.sql.functions._
- Use the explode function so that business_id is repeated for every review:
scala> val exploded = r.select($"business_id", explode($"reviews"))
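Conceptually, explode undoes the nesting step: each element of the reviews array becomes its own row, with the parent business_id repeated. On plain collections the same flattening is a flatMap (again an analogy with trimmed-down case classes and made-up data, not the Spark API):

```scala
// Trimmed-down case classes and made-up data
case class Review(business_id: String, stars: Long)
case class BusinessReviews(business_id: String, reviews: List[Review])

val nested = List(
  BusinessReviews("b1", List(Review("b1", 5L), Review("b1", 3L))),
  BusinessReviews("b2", List(Review("b2", 4L)))
)

// flatMap plays the role of explode: one output row per array element,
// with the parent business_id repeated on each row.
val exploded: List[(String, Long)] =
  nested.flatMap(br => br.reviews.map(r => (br.business_id, r.stars)))
```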
- Display the exploded DataFrame (display is available in Databricks notebooks; in the Spark shell, use exploded.show instead):
scala> display(exploded)