In the previous recipe, we saw how to create a DataFrame. The natural next step after creating a DataFrame is to play with the data inside it. Besides the numerous functions that help us do that, we also find other interesting functions that help us sample the data, print the schema of the data, and so on. We'll take a look at them one by one in this recipe.
The code and the sample file for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter1-spark-csv/src/main/scala/com/packt/scaladata/spark/csv/DataFrameCSV.scala.
Now, let's see how we can manipulate DataFrames using the following subrecipes:
After creating a DataFrame from various sources, we would obviously want to quickly check its schema. The printSchema function lets us do just that. It prints the column names and their data types to the default output stream:
Let's load our pipe-separated file, StudentData.csv:

//students is of type org.apache.spark.sql.DataFrame
val students=sqlContext.csvFile(filePath="StudentData.csv", useHeader=true, delimiter='|')
students.printSchema
Output
root
 |-- id: string (nullable = true)
 |-- studentName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
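To get a feel for the shape that printSchema renders, here is a tiny plain-Scala sketch (no Spark involved) that produces the same tree layout from a hand-written list of (name, type, nullable) triples. The triples mirror the output above; the rendering code itself is only an illustration, not Spark's implementation:

```scala
// Hand-written column metadata mirroring the schema above
val schema = Seq(
  ("id", "string", true),
  ("studentName", "string", true),
  ("phone", "string", true),
  ("email", "string", true)
)

// Render in the same tree shape that printSchema uses
val rendered = "root\n" + schema.map { case (name, tpe, nullable) =>
  s" |-- $name: $tpe (nullable = $nullable)"
}.mkString("\n")

println(rendered)
```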
The next logical thing that we would like to do is to check whether our data got loaded into the DataFrame correctly. There are a few ways of sampling the data in the newly created DataFrame:
The simplest way is the show method. There are two variants of the show method: one that accepts the number of rows to display, and one that displays the default 20 rows. The distinct quality of the show method, as compared to the other functions that sample data, is that it displays the rows along with the headers and prints the output directly to the default output stream (console):
//Sample n records along with headers
students.show (3)

//Sample 20 records along with headers
students.show ()

//Output of show(3)
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
| 1|      Burke|1-300-746-8446|ullamcorper.velit...|
| 2|      Kamal|1-668-571-5046|pede.Suspendisse@...|
| 3|       Olga|1-956-311-1686|Aenean.eget.metus...|
+--+-----------+--------------+--------------------+
Another way to sample data is the head method. This method accepts an integer parameter representing the number of rows to be fetched, and it returns an array of rows. To print these rows, we can pass the println method to the foreach function of the array:

//Sample the first 5 records
students.head(5).foreach(println)
If you are not a great fan of head, you can use the take function, which is common across all Scala sequences. The take method is just an alias of the head method and delegates all its calls to head:
//Alias of head
students.take(5).foreach(println)

//Output
[1,Burke,1-300-746-8446,[email protected]]
[2,Kamal,1-668-571-5046,[email protected]]
[3,Olga,1-956-311-1686,[email protected]]
[4,Belle,1-246-894-6340,[email protected]]
[5,Trevor,1-300-527-4967,[email protected]]
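Since take is described as being common across all Scala sequences, it may help to see the same naming on a plain Scala collection, with no Spark involved. Note one small difference in this analogy: on an ordinary Seq, head takes no argument and returns a single element, while take(n) returns the first n elements. The tuples below are made-up stand-ins for rows:

```scala
// Stand-in "rows" as plain tuples; no Spark required
val rows = Seq(
  (1, "Burke"),
  (2, "Kamal"),
  (3, "Olga"),
  (4, "Belle"),
  (5, "Trevor"),
  (6, "Laurel")
)

// take(n) returns the first n elements as a new collection,
// much like DataFrame.take(n) returns the first n rows
val firstFive = rows.take(5)
firstFive.foreach(println)

// On a plain Seq, head takes no argument and returns just the first element
val first = rows.head
```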
As you have seen, all DataFrame columns have names. The select function helps us pick and choose specific columns from a previously existing DataFrame and form a completely new one out of it:
Let's select just the email column from the DataFrame. Since DataFrames are immutable, the selection returns a new DataFrame:

val emailDataFrame:DataFrame=students.select("email")
Now, we have a new DataFrame called emailDataFrame, which has only the e-mail addresses as its contents. Let's sample and check whether that is true:
emailDataFrame.show(3)

//Output
+--------------------+
|               email|
+--------------------+
|ullamcorper.velit...|
|pede.Suspendisse@...|
|Aenean.eget.metus...|
+--------------------+
The select function actually accepts an arbitrary number of column names, which means that you can easily select more than one column from your source DataFrame:

val studentEmailDF=students.select("studentName", "email")
Let's sample and check whether we have indeed selected the studentName and email columns in the new DataFrame:
studentEmailDF.show(3)
Output
+-----------+--------------------+
|studentName|               email|
+-----------+--------------------+
|      Burke|ullamcorper.velit...|
|      Kamal|pede.Suspendisse@...|
|       Olga|Aenean.eget.metus...|
+-----------+--------------------+
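Conceptually, select is a projection: it builds a new, immutable result that keeps only the requested columns and leaves the source untouched. As a rough plain-Scala analogy (this is not the Spark API; the tuples below are made-up stand-ins for rows), the same idea looks like this:

```scala
// Hypothetical stand-in rows: (id, studentName, phone)
val students = Seq(
  (1, "Burke", "1-300-746-8446"),
  (2, "Kamal", "1-668-571-5046"),
  (3, "Olga",  "1-956-311-1686")
)

// "select(studentName)" as a projection: a brand-new collection
// is produced; the original is not modified
val names = students.map { case (_, name, _) => name }

// "select(studentName, phone)": project two fields at once
val namePhone = students.map { case (_, name, phone) => (name, phone) }
```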
Now that we have seen how to select columns from a DataFrame, let's see how to filter the rows of a DataFrame based on conditions. For row-based filtering, we can treat the DataFrame as a normal Scala collection and filter the data based on a condition. In all of these examples, I have added the show method at the end for clarity:
//Print the first 7 records that have a student id greater than 5
students.filter("id > 5").show(7)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
| 6|     Laurel|1-691-379-9921|adipiscing@consec...|
| 7|       Sara|1-608-140-1995|Donec.nibh@enimEt...|
| 8|     Kaseem|1-881-586-2689|cursus.et.magna@e...|
| 9|        Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
|10|       Maya|1-271-683-2698|accumsan.convalli...|
|11|        Emi|1-467-270-1337| [email protected]|
|12|      Caleb|1-683-212-0896|Suspendisse@Quisq...|
+--+-----------+--------------+--------------------+
Notice that even though the id field is inferred as a String type, it performs the numerical comparison correctly. On the other hand, students.filter("email > 'c'") would give back all the e-mail IDs that start with a character greater than 'c'.
The following filter selects only the rows whose studentName is empty:

students.filter("studentName =''").show(7)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
|21|           |1-598-439-7549|consectetuer.adip...|
|32|           |1-184-895-9602|accumsan.laoreet@...|
|45|           |1-245-752-0481|Suspendisse.eleif...|
|83|           |1-858-810-2204|sociis.natoque@eu...|
|94|           |1-443-410-7878|Praesent.eu.nulla...|
+--+-----------+--------------+--------------------+
Missing names could also be represented by the literal NULL string value; we can catch both cases with an OR condition:

students.filter("studentName ='' OR studentName = 'NULL'").show(7)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
|21|           |1-598-439-7549|consectetuer.adip...|
|32|           |1-184-895-9602|accumsan.laoreet@...|
|33|       NULL|1-105-503-0141| [email protected]|
|45|           |1-245-752-0481|Suspendisse.eleif...|
|83|           |1-858-810-2204|sociis.natoque@eu...|
|94|           |1-443-410-7878|Praesent.eu.nulla...|
+--+-----------+--------------+--------------------+
We are just limiting the output to seven records using the show(7) function.
This filter gets the entries of all the students whose names start with the letter 'M':
students.filter("SUBSTR(studentName,0,1) ='M'").show(7)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
|10|       Maya|1-271-683-2698|accumsan.convalli...|
|19|    Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
|24|    Marsden|1-477-629-7528|Donec.dignissim.m...|
|37|      Maggy|1-910-887-6777|facilisi.Sed.nequ...|
|61|     Maxine|1-422-863-3041|aliquet.molestie....|
|77|      Maggy|1-613-147-4380| [email protected]|
|97|    Maxwell|1-607-205-1273|metus.In@musAenea...|
+--+-----------+--------------+--------------------+
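The SUBSTR expression above is evaluated by Spark's SQL parser, but the underlying idea is the familiar predicate-based filter from Scala collections. A plain-Scala sketch of the same "names starting with M" condition, using a few of the names from the output above:

```scala
// Sample names drawn from the recipe's data
val names = Seq("Maya", "Malachi", "Burke", "Marsden", "Olga", "Maggy")

// Equivalent of SUBSTR(studentName,0,1) = 'M': keep only the
// names whose first character is 'M'
val mNames = names.filter(_.startsWith("M"))
```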
Using the sort function, we can order the DataFrame by a particular column:
students.sort(students("studentName").desc).show(10)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
|50|      Yasir|1-282-511-4445|eget.odio.Aliquam...|
|52|       Xena|1-527-990-8606|in.faucibus.orci@...|
|86|     Xandra|1-677-708-5691|libero@arcuVestib...|
|43|     Wynter|1-440-544-1851|amet.risus.Donec@...|
|31|    Wallace|1-144-220-8159| [email protected]|
|66|      Vance|1-268-680-0857|pellentesque@netu...|
|41|     Tyrone|1-907-383-5293|non.bibendum.sed@...|
| 5|     Trevor|1-300-527-4967|dapibus.id@acturp...|
|65|      Tiger|1-316-930-7880|nec@mollisnoncurs...|
|15|      Tarik|1-398-171-2268| [email protected]|
+--+-----------+--------------+--------------------+
students.sort("studentName", "id").show(10)
Output
+--+-----------+--------------+--------------------+
|id|studentName|         phone|               email|
+--+-----------+--------------+--------------------+
|21|           |1-598-439-7549|consectetuer.adip...|
|32|           |1-184-895-9602|accumsan.laoreet@...|
|45|           |1-245-752-0481|Suspendisse.eleif...|
|83|           |1-858-810-2204|sociis.natoque@eu...|
|94|           |1-443-410-7878|Praesent.eu.nulla...|
|91|       Abel|1-530-527-7467| [email protected]|
|69|       Aiko|1-682-230-7013|turpis.vitae.puru...|
|47|       Alma|1-747-382-6775| [email protected]|
|26|      Amela|1-526-909-2605| [email protected]|
|16|      Amena|1-878-250-3129|lorem.luctus.ut@s...|
+--+-----------+--------------+--------------------+
Alternatively, the orderBy alias of the sort function can be used to achieve this. Also, multiple column orders can be specified using the DataFrame's apply method:
students.sort(students("studentName").desc, students("id").asc).show(10)
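The mixed ascending/descending sort above is Spark-specific, but the simpler "sort by name, then by id" case behaves just like sortBy with a tuple key on a plain Scala collection. A small sketch with made-up (id, name) pairs, including a duplicate name so the tie-break on id is visible:

```scala
// (id, studentName) pairs, with a duplicate name to show the tie-break
val students = Seq((77, "Maggy"), (37, "Maggy"), (91, "Abel"), (69, "Aiko"))

// Sort by name first, then by id, both ascending: analogous to
// students.sort("studentName", "id") on a DataFrame
val sorted = students.sortBy { case (id, name) => (name, id) }
```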
If we don't like the column names of the source DataFrame and wish to change them to something nice and meaningful, we can do that using the as function while selecting the columns.
In this example, we rename the "studentName" column to "name" and retain the "email" column's name as is:
val copyOfStudents=students.select(students("studentName").as("name"), students("email"))
copyOfStudents.show()
Output
+--------+--------------------+
|    name|               email|
+--------+--------------------+
|   Burke|ullamcorper.velit...|
|   Kamal|pede.Suspendisse@...|
|    Olga|Aenean.eget.metus...|
|   Belle|vitae.aliquet.nec...|
|  Trevor|dapibus.id@acturp...|
|  Laurel|adipiscing@consec...|
|    Sara|Donec.nibh@enimEt...|
The real power of DataFrames lies in the fact that we can treat it like a relational table and use SQL to query. This involves two simple steps:
First, register the students DataFrame as a table with the name "students" (or any other name):

students.registerTempTable("students")
Then, query it with regular SQL:

val dfFilteredBySQL=sqlContext.sql("select * from students where studentName!='' order by email desc")
dfFilteredBySQL.show(7)

//Output
id studentName phone          email
87 Selma       1-601-330-4409 vulputate.velit@p
96 Channing    1-984-118-7533 viverra.Donec.tem
4  Belle       1-246-894-6340 vitae.aliquet.nec
78 Finn        1-213-781-6969 vestibulum.massa@
53 Kasper      1-155-575-9346 velit.eget@pedeCu
63 Dylan       1-417-943-8961 vehicula.aliquet@
35 Cadman      1-443-642-5919 ut.lacus@adipisci
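The SQL above combines a filter and a descending sort. As a plain-Scala analogy of that same query shape over an ordinary collection (the sample data is made up, loosely echoing the output above):

```scala
// A made-up, miniature version of the students data
case class Student(id: Int, name: String, email: String)

val students = Seq(
  Student(87, "Selma",    "vulputate.velit@p"),
  Student(21, "",         "consectetuer.adip"),
  Student(96, "Channing", "viverra.Donec.tem")
)

// where studentName != '' order by email desc
val result = students
  .filter(_.name.nonEmpty)
  .sortBy(_.email)(Ordering[String].reverse)
```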
Now that we have seen how to register a DataFrame as a table, let's see how to perform SQL-like join operations on DataFrames.
An inner join is the default join; it returns only the rows that match on the given join condition in both DataFrames:
val students1=sqlContext.csvFile(filePath="StudentPrep1.csv", useHeader=true, delimiter='|')
val students2=sqlContext.csvFile(filePath="StudentPrep2.csv", useHeader=true, delimiter='|')

val studentsJoin=students1.join(students2, students1("id")===students2("id"))
studentsJoin.show(studentsJoin.count.toInt)
A right outer join returns, in addition to the matching rows, the unmatched rows of the right-hand-side DataFrame. We can see from the following output that the entry with ID 999, present only in the right-hand-side DataFrame, is now shown:
val studentsRightOuterJoin=students1.join(students2, students1("id")===students2("id"), "right_outer")
studentsRightOuterJoin.show(studentsRightOuterJoin.count.toInt)
Similar to a right outer join, a left outer join returns not only the matching rows, but also the additional unmatched rows of the left-hand-side DataFrame:
val studentsLeftOuterJoin=students1.join(students2, students1("id")===students2("id"), "left_outer")
studentsLeftOuterJoin.show(studentsLeftOuterJoin.count.toInt)
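To make the inner/left/right outer distinction concrete without a Spark cluster, here is a plain-Scala sketch of the same join semantics over two small keyed maps. The data is made up, mirroring the unmatched ID 999 idea from the recipe; a missing side is represented as None:

```scala
// Two keyed datasets; 999 exists only on the right-hand side
val left  = Map(1 -> "Burke", 2 -> "Kamal")
val right = Map(1 -> "Burke", 999 -> "Mystery")

// Inner join: only the keys present on both sides
val inner = left.keySet.intersect(right.keySet).map(k => (k, left(k), right(k)))

// Left outer join: every left key; the right side may be missing (None)
val leftOuter = left.map { case (k, v) => (k, v, right.get(k)) }

// Right outer join: every right key; the left side may be missing (None)
val rightOuter = right.map { case (k, v) => (k, left.get(k), v) }
```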
As the next step, let's save a DataFrame to a file store. The load function, which we used in an earlier recipe, has a similar-looking counterpart called save.
This involves two steps:
First, create a map of options for the save method to use. In this case, we specify the filename and ask it to write a header:

val options=Map("header"->"true", "path"->"ModifiedStudent.csv")
To keep it interesting, let's choose column names from the source DataFrame. In this example, we pick the studentName and email columns and rename the studentName column to just name:
val copyOfStudents=students.select(students("studentName").as("name"), students("email"))
Second, save the DataFrame to the file ModifiedStudent.csv:

copyOfStudents.save("com.databricks.spark.csv", SaveMode.Overwrite, options)
The second argument is a little interesting. We can choose Overwrite (as we did here), Append, Ignore, or ErrorIfExists. Overwrite, as the name implies, overwrites the file if it already exists; Ignore skips writing if the file exists; ErrorIfExists complains about the pre-existence of the file; and Append continues writing from the last edit location. Throwing an error is the default behavior.
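The four modes boil down to a small decision on whether the target already exists. The following plain-Scala sketch models that decision with java.nio; the mode names mirror Spark's SaveMode, but this is an illustrative model, not Spark's implementation:

```scala
import java.nio.file.{Files, Paths, StandardOpenOption}

// Illustrative model of Spark's save modes; not Spark code
sealed trait Mode
case object Overwrite     extends Mode
case object Append        extends Mode
case object Ignore        extends Mode
case object ErrorIfExists extends Mode

def save(path: String, data: String, mode: Mode): Unit = {
  val p = Paths.get(path)
  val exists = Files.exists(p)
  mode match {
    case Overwrite =>                // replace whatever is there
      Files.write(p, data.getBytes)
    case Append =>                   // keep existing content, add to the end
      Files.write(p, data.getBytes,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    case Ignore if exists =>         // silently do nothing
      ()
    case Ignore =>
      Files.write(p, data.getBytes)
    case ErrorIfExists if exists =>  // the default: complain loudly
      sys.error(s"$path already exists")
    case ErrorIfExists =>
      Files.write(p, data.getBytes)
  }
}
```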
The output of the save method looks like this: