Joining DataFrames together

So far, we have only considered operations on a single DataFrame. Spark also offers SQL-like joins for combining DataFrames. Let's assume that we have another dataset mapping each patient ID to a (systolic) blood pressure measurement, available as a list of pairs:

scala> val bloodPressures = List((1 -> 110), (3 -> 100), (4 -> 125))
bloodPressures: List[(Int, Int)] = List((1,110), (3,100), (4,125))

scala> val bloodPressureRDD = sc.parallelize(bloodPressures)
bloodPressureRDD: rdd.RDD[(Int, Int)] = ParallelCollectionRDD[74] at parallelize at <console>:24

We can construct a DataFrame from this RDD of tuples. However, unlike when constructing DataFrames from RDDs of case classes, Spark cannot infer column names. We must therefore pass these explicitly to .toDF:

scala> val bloodPressureDF = bloodPressureRDD.toDF(
  "patientId", "bloodPressure")
bloodPressureDF: DataFrame = [patientId: int, bloodPressure: int]

scala> bloodPressureDF.show
+---------+-------------+
|patientId|bloodPressure|
+---------+-------------+
|        1|          110|
|        3|          100|
|        4|          125|
+---------+-------------+

Let's join bloodPressureDF with readingsDF, using the patient ID as the join key:

scala> readingsDF.join(bloodPressureDF, 
  readingsDF("patientId") === bloodPressureDF("patientId")
).show
+---------+--------+--------+---+--------+---------+-------------+
|patientId|heightCm|weightKg|age|isSmoker|patientId|bloodPressure|
+---------+--------+--------+---+--------+---------+-------------+
|        1|     175|      72| 43|   false|        1|          110|
|        3|     164|      61| 41|   false|        3|          100|
|        4|     161|      62| 43|    true|        4|          125|
+---------+--------+--------+---+--------+---------+-------------+
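The inner-join semantics can be sketched without Spark, using plain Scala collections. In this sketch, `readingIds` is a hypothetical list standing in for the patientId column of readingsDF, mirroring the sample data above:

```scala
// The patientId column of readingsDF (sample data from above).
val readingIds = List(1, 2, 3, 4)
val bloodPressures = List((1, 110), (3, 100), (4, 125))

// Inner join: keep only pairs whose patientId appears on both sides.
val inner = for {
  id <- readingIds
  (bpId, bp) <- bloodPressures
  if id == bpId
} yield (id, bp)
// List((1,110), (3,100), (4,125))
```

Patient 2 has no blood pressure reading, so it drops out of the result, just as in the Spark output above.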

This performs an inner join: only patient IDs present in both DataFrames are included in the result. The type of join can be passed as an extra argument to join. For instance, we can perform a left join:

scala> readingsDF.join(bloodPressureDF,
  readingsDF("patientId") === bloodPressureDF("patientId"),
  "leftouter"
).show
+---------+--------+--------+---+--------+---------+-------------+
|patientId|heightCm|weightKg|age|isSmoker|patientId|bloodPressure|
+---------+--------+--------+---+--------+---------+-------------+
|        1|     175|      72| 43|   false|        1|          110|
|        2|     182|      78| 28|    true|     null|         null|
|        3|     164|      61| 41|   false|        3|          100|
|        4|     161|      62| 43|    true|        4|          125|
+---------+--------+--------+---+--------+---------+-------------+
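A left outer join can be sketched the same way with plain Scala collections: every left-side row is kept, and `Option` plays the role of Spark's null for unmatched rows. The ids and pressures here mirror the sample data above:

```scala
// patientId column of readingsDF, and the pressures keyed by patientId.
val patientIds = List(1, 2, 3, 4)
val bpMap = Map(1 -> 110, 3 -> 100, 4 -> 125)

// Left outer join: keep every left row; None where no match exists.
val leftOuter = patientIds.map(id => id -> bpMap.get(id))
// List((1,Some(110)), (2,None), (3,Some(100)), (4,Some(125)))
```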

Possible join types are inner, outer, leftouter, rightouter, or leftsemi. These should all be familiar, apart from leftsemi, which corresponds to a left semi join. This is the same as an inner join, but only the columns on the left-hand side are retained after the join. It is thus a way to filter a DataFrame for rows whose join key is present in another DataFrame.
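The filtering behaviour of a left semi join can be sketched with plain Scala collections (no Spark needed), again using the sample ids from above:

```scala
// patientId column of readingsDF, and the set of keys present in
// bloodPressureDF.
val patientIds = List(1, 2, 3, 4)
val rightIds = Set(1, 3, 4)

// Left semi join: keep left rows whose key appears on the right,
// retaining only left-hand-side columns.
val leftSemi = patientIds.filter(rightIds.contains)
// List(1, 3, 4)
```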
