How to do it...

  1. Start the Spark shell or Databricks Cloud Scala notebook:
        $ spark-shell
  2. Load the business data as a DataFrame:
        scala> val businesses = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_business.json")
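Note that, when no schema is provided, the JSON reader scans the data to infer one, which adds a full pass over a large file. A minimal sketch, assuming we are happy to infer the schema from a fraction of the input (samplingRatio is a standard JSON reader option; 0.1 is an arbitrary illustrative value):
        scala> // infer the schema from roughly 10% of the records instead of all of them
        scala> val businessesSampled = spark.read.format("json").option("samplingRatio", "0.1").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_business.json")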
  3. Print the schema:
         scala> businesses.printSchema
root
|-- address: string (nullable = true)
|-- attributes: array (nullable = true)
| |-- element: string (containsNull = true)
|-- business_id: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: string (containsNull = true)
|-- city: string (nullable = true)
|-- hours: array (nullable = true)
| |-- element: string (containsNull = true)
|-- is_open: long (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- name: string (nullable = true)
|-- neighborhood: string (nullable = true)
|-- postal_code: string (nullable = true)
|-- review_count: long (nullable = true)
|-- stars: double (nullable = true)
|-- state: string (nullable = true)
|-- type: string (nullable = true)
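Since categories is an array of strings, it can be flattened with explode for a quick look at the data. A minimal exploratory sketch (column names come from the schema above; the top-5 limit is arbitrary, and the $ syntax is available in the shell via the preloaded spark.implicits._):
        scala> import org.apache.spark.sql.functions.explode
        scala> // one row per (business, category) pair, then tally the most common categories
        scala> businesses.select(explode($"categories").as("category")).groupBy("category").count.orderBy($"count".desc).show(5)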
  4. Do the count:
         scala> businesses.count
Long = 144072
The size of each dataset helps us determine the join strategy; for example, if one dataset is significantly smaller than the other, we can use a broadcast join.
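For instance, businesses (roughly 144,000 rows) is far smaller than the reviews DataFrame loaded in the next step, so a join between the two is a natural broadcast candidate. A minimal sketch using the broadcast hint (this join is illustrative and not one of the recipe's steps):
        scala> import org.apache.spark.sql.functions.broadcast
        scala> // ship the small businesses table to every executor instead of shuffling the large reviews side
        scala> val reviewsWithBusiness = reviews.join(broadcast(businesses), Seq("business_id"))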
  5. Load reviews as a DataFrame:
        scala> val reviews = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_review.json")
  6. Print the schema:
       scala> reviews.printSchema
root
|-- business_id: string (nullable = true)
|-- cool: long (nullable = true)
|-- date: string (nullable = true)
|-- funny: long (nullable = true)
|-- review_id: string (nullable = true)
|-- stars: long (nullable = true)
|-- text: string (nullable = true)
|-- type: string (nullable = true)
|-- useful: long (nullable = true)
|-- user_id: string (nullable = true)
  7. Count the number of reviews:
       scala> reviews.count
Long = 4153150
  8. Load users as a DataFrame:
        scala> val users = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_user.json")
  9. Print the schema:
        scala> users.printSchema
root
|-- average_stars: double (nullable = true)
|-- compliment_cool: long (nullable = true)
|-- compliment_cute: long (nullable = true)
|-- compliment_funny: long (nullable = true)
|-- compliment_hot: long (nullable = true)
|-- compliment_list: long (nullable = true)
|-- compliment_more: long (nullable = true)
|-- compliment_note: long (nullable = true)
|-- compliment_photos: long (nullable = true)
|-- compliment_plain: long (nullable = true)
|-- compliment_profile: long (nullable = true)
|-- compliment_writer: long (nullable = true)
|-- cool: long (nullable = true)
|-- elite: array (nullable = true)
| |-- element: string (containsNull = true)
|-- fans: long (nullable = true)
|-- friends: array (nullable = true)
| |-- element: string (containsNull = true)
|-- funny: long (nullable = true)
|-- name: string (nullable = true)
|-- review_count: long (nullable = true)
|-- type: string (nullable = true)
|-- useful: long (nullable = true)
|-- user_id: string (nullable = true)
|-- yelping_since: string (nullable = true)
  10. Count the number of users:
        scala> users.count
Long = 1029432
  11. Load tips as a DataFrame:
        scala> val tips = spark.read.format("json").load("s3a://sparkcookbook/yelpdata/yelp_academic_dataset_tip.json")
  12. Print the schema:
        scala> tips.printSchema
root
|-- business_id: string (nullable = true)
|-- date: string (nullable = true)
|-- likes: long (nullable = true)
|-- text: string (nullable = true)
|-- type: string (nullable = true)
|-- user_id: string (nullable = true)
  13. Count the tips:
        scala> tips.count
Long = 946600
  14. Register reviews and tips as temporary views:
        scala> reviews.createOrReplaceTempView("reviews")
        scala> tips.createOrReplaceTempView("tips")
  15. Do a shuffle hash join on reviews and tips:
        scala> val reviews_tips = spark.sql("SELECT * FROM reviews JOIN tips ON reviews.user_id = tips.user_id AND reviews.business_id = tips.business_id")
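The same join can also be expressed with the DataFrame API, without registering temporary views. A minimal equivalent sketch (passing the keys as a Seq makes user_id and business_id appear only once in the result, unlike SELECT *):
        scala> // inner join is the default join type
        scala> val reviews_tips_df = reviews.join(tips, Seq("user_id", "business_id"))
Either way, the physical join strategy is ultimately chosen by Spark's optimizer based on the estimated sizes of the two sides.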
  16. Do a count on reviews_tips:
        scala> reviews_tips.count
Long = 372288

This count is much lower than the individual counts of the reviews and tips DataFrames because a user who reviews a business does not necessarily leave a tip for that same business (and vice versa).
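Note also that the joined count can exceed the number of matched reviews when a user leaves several tips for the same business. To count the reviews that have at least one matching tip, one option is a left semi join; a minimal sketch:
        scala> // keeps each matching review exactly once, with only the columns of reviews
        scala> val reviewsWithTips = reviews.join(tips, Seq("user_id", "business_id"), "left_semi")
        scala> reviewsWithTips.count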

  17. What if we would like all the reviews, regardless of whether they have associated tips or not? Do a left outer join:
         scala> val all_reviews_tips = spark.sql("SELECT * FROM reviews LEFT JOIN tips ON reviews.user_id = tips.user_id AND reviews.business_id = tips.business_id")
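Conversely, to isolate the reviews that have no tip at all, a left anti join is one option; a minimal sketch:
        scala> // keeps only the reviews with no matching (user_id, business_id) pair in tips
        scala> val reviewsWithoutTips = reviews.join(tips, Seq("user_id", "business_id"), "left_anti")
        scala> reviewsWithoutTips.count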