So far, we have only considered operations on a single DataFrame. Spark also offers SQL-like joins to combine DataFrames. Let's assume that we also have data mapping each patient ID to a (systolic) blood pressure measurement, given as a list of (patient ID, blood pressure) pairs:
scala> val bloodPressures = List((1 -> 110), (3 -> 100), (4 -> 125))
bloodPressures: List[(Int, Int)] = List((1,110), (3,100), (4,125))

scala> val bloodPressureRDD = sc.parallelize(bloodPressures)
bloodPressureRDD: rdd.RDD[(Int, Int)] = ParallelCollectionRDD[74] at parallelize at <console>:24
We can construct a DataFrame from this RDD of tuples. However, unlike when constructing DataFrames from RDDs of case classes, Spark cannot infer meaningful column names (it would fall back to _1 and _2). We must therefore pass the names explicitly to .toDF:
scala> val bloodPressureDF = bloodPressureRDD.toDF(
  "patientId", "bloodPressure")
bloodPressureDF: DataFrame = [patientId: int, bloodPressure: int]

scala> bloodPressureDF.show
+---------+-------------+
|patientId|bloodPressure|
+---------+-------------+
|        1|          110|
|        3|          100|
|        4|          125|
+---------+-------------+
Let's join bloodPressureDF with readingsDF, using the patient ID as the join key:
scala> readingsDF.join(bloodPressureDF,
  readingsDF("patientId") === bloodPressureDF("patientId")
).show
+---------+--------+--------+---+--------+---------+-------------+
|patientId|heightCm|weightKg|age|isSmoker|patientId|bloodPressure|
+---------+--------+--------+---+--------+---------+-------------+
|        1|     175|      72| 43|   false|        1|          110|
|        3|     164|      61| 41|   false|        3|          100|
|        4|     161|      62| 43|    true|        4|          125|
+---------+--------+--------+---+--------+---------+-------------+
This performs an inner join: only patient IDs present in both DataFrames are included in the result. The type of join can be passed as a third argument to join. For instance, we can perform a left outer join:
scala> readingsDF.join(bloodPressureDF,
  readingsDF("patientId") === bloodPressureDF("patientId"),
  "leftouter"
).show
+---------+--------+--------+---+--------+---------+-------------+
|patientId|heightCm|weightKg|age|isSmoker|patientId|bloodPressure|
+---------+--------+--------+---+--------+---------+-------------+
|        1|     175|      72| 43|   false|        1|          110|
|        2|     182|      78| 28|    true|     null|         null|
|        3|     164|      61| 41|   false|        3|          100|
|        4|     161|      62| 43|    true|        4|          125|
+---------+--------+--------+---+--------+---------+-------------+
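To make the difference between the inner and the left outer join concrete, here is a minimal sketch in plain Scala that models what the two joins compute on ordinary in-memory collections. It is only an illustration of the semantics, not how Spark executes joins: the PatientReading case class and the JoinSemantics object are hypothetical names, and None stands in for Spark's null.

```scala
// Hypothetical case class mirroring the columns of readingsDF.
case class PatientReading(
    patientId: Int,
    heightCm: Int,
    weightKg: Int,
    age: Int,
    isSmoker: Boolean)

// Plain-Scala model of join semantics; NOT Spark's implementation.
object JoinSemantics {
  // The rows of readingsDF and bloodPressureDF as plain collections.
  val readings: List[PatientReading] = List(
    PatientReading(1, 175, 72, 43, false),
    PatientReading(2, 182, 78, 28, true),
    PatientReading(3, 164, 61, 41, false),
    PatientReading(4, 161, 62, 43, true))

  val bloodPressures: List[(Int, Int)] = List(1 -> 110, 3 -> 100, 4 -> 125)

  // Inner join: one output row per (left, right) pair whose keys match;
  // left rows without any match are dropped.
  def innerJoin(
      left: List[PatientReading],
      right: List[(Int, Int)]): List[(PatientReading, Int)] =
    for {
      reading <- left
      (id, bp) <- right
      if reading.patientId == id
    } yield (reading, bp)

  // Left outer join: like the inner join, but a left row with no match
  // is kept once, paired with None (the analogue of Spark's null).
  def leftOuterJoin(
      left: List[PatientReading],
      right: List[(Int, Int)]): List[(PatientReading, Option[Int])] =
    left.flatMap { reading =>
      val matches = right.collect {
        case (id, bp) if id == reading.patientId => bp
      }
      if (matches.isEmpty) List((reading, Option.empty[Int]))
      else matches.map(bp => (reading, Some(bp)))
    }
}
```

Running both functions on these rows mirrors the two Spark outputs: the inner join keeps patients 1, 3 and 4, while the left outer join also keeps patient 2, paired with None where Spark shows null.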
Possible join types include inner, outer, leftouter, rightouter and leftsemi. These should all be familiar, apart from leftsemi, which corresponds to a left semi join.
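This behaves like an inner join, except that only the columns of the left-hand DataFrame are retained, and each left-hand row appears at most once, however many matching rows the right-hand side contains. It is thus a way to filter a DataFrame, keeping only the rows whose join key is present in another DataFrame.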
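The filtering behaviour of a left semi join can be modelled in the same way. The following is a minimal plain-Scala sketch, not Spark's implementation; LeftSemiJoinSketch and leftSemiJoin are hypothetical names, and the readings are reduced to (patientId, age) pairs taken from readingsDF for brevity.

```scala
// Plain-Scala model of left semi join semantics; NOT Spark's implementation.
object LeftSemiJoinSketch {
  // Keeps each left row whose key occurs at least once on the right.
  // Only left-hand values are returned, and no left row is duplicated,
  // however many right-hand rows share its key.
  def leftSemiJoin[K, L, R](left: Seq[L], right: Seq[R])(
      leftKey: L => K,
      rightKey: R => K): Seq[L] = {
    val rightKeys: Set[K] = right.map(rightKey).toSet
    left.filter(row => rightKeys.contains(leftKey(row)))
  }

  // (patientId, age) pairs from readingsDF.
  val readings: Seq[(Int, Int)] = Seq((1, 43), (2, 28), (3, 41), (4, 43))

  // (patientId, bloodPressure) pairs from bloodPressureDF.
  val bloodPressures: Seq[(Int, Int)] = Seq((1, 110), (3, 100), (4, 125))
}
```

Calling leftSemiJoin(readings, bloodPressures)(_._1, _._1) keeps the readings for patients 1, 3 and 4. Passing bloodPressures ++ bloodPressures gives every key two matches; an inner join would then return each matching patient twice, whereas the semi join still returns each left row once. In Spark, the corresponding call passes "leftsemi" as the join type: readingsDF.join(bloodPressureDF, readingsDF("patientId") === bloodPressureDF("patientId"), "leftsemi").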