Exploring data using Spark SQL

Spark SQL is a relational query engine built on top of Spark Core. Spark SQL uses a query optimizer called Catalyst.

Relational queries can be expressed using SQL or HiveQL and executed against JSON, CSV, and various databases. Spark SQL gives us the full expressiveness of declarative programing with Spark dataframes on top of functional programming with RDDs.

Understanding Spark dataframes

Here's a tweet from @bigdata announcing Spark 1.3.0, the advent of Spark SQL and dataframes. It also highlights the various data sources in the lower part of the diagram. On the top part, we can notice R as the new language that will be gradually supported on top of Scala, Java, and Python. Ultimately, the Data Frame philosophy is pervasive between R, Python, and Spark.

Spark dataframes originate from SchemaRDDs. It combines RDD with a schema that can be inferred by Spark, if requested, when registering the dataframe. It allows us to query complex nested JSON data with plain SQL. Lazy evaluation, lineage, partitioning, and persistence apply to dataframes.

Let's query the data with Spark SQL, by first importing SparkContext and SQLContext:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
In [95]:
<pyspark.context.SparkContext at 0x7f7829581890>
In [96]:
''In [98]:
# Instantiate Spark  SQL context
sqlc =  SQLContext(sc)

We read in the JSON file we saved with Odo:

twts_sql_df_01 = sqlc.jsonFile ("/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401_distinct.json")
In [101]:
created_at           id                 tweet_text           user_id    user_name          
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521   raulsaeztapia      
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521   raulsaeztapia      
2015-05-14T10:25:15Z 598796205091500032 RT @PrabhaGana: W... 48695135   John Humphreys     
2015-05-14T09:54:52Z 598788561127735296 RT @Ellen_Friedma... 2385931712 Leonardo D'Ambrosi
2015-05-14T09:42:53Z 598785545557438464 RT @Ellen_Friedma... 461020977  Alexey Kosenkov    
2015-05-14T09:32:39Z 598782970082807808 RT @BigDataTechCo... 1377652806 embeddedcomputer.nl
2015-05-14T09:12:38Z 598777933730160640 I'm still on Euro... 294862170  Ellen Friedman     

We print the schema of the Spark dataframe:

 |-- created_at: string (nullable = true)
 |-- id: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)

We select the user_name column from the dataframe:

John Humphreys     
Leonardo D'Ambrosi
Alexey Kosenkov    
Ellen Friedman     

We register the dataframe as a table, so we can execute a SQL query on it:


We execute a SQL statement against the dataframe:

twts_sql_df_01_selection = sqlc.sql("SELECT * FROM tweets_01 WHERE user_name = 'raulsaeztapia'")
In [109]:
created_at           id                 tweet_text           user_id  user_name    
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521 raulsaeztapia
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521 raulsaeztapia

Let's process some more complex JSON; we read the original Twitter JSON file:

tweets_sqlc_inf = sqlc.jsonFile(infile)

Spark SQL is able to infer the schema of a complex nested JSON file:

 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
... (snip) ...
|    |-- statuses_count: long (nullable = true)
 |    |-- time_zone: string (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utc_offset: long (nullable = true)
 |    |-- verified: boolean (nullable = true)

We extract the key information of interest from the wall of data by selecting specific columns in the dataframe (in this case, ['created_at', 'id', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']):

tweets_extract_sqlc = tweets_sqlc_inf[['created_at', 'id', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']].distinct()
In [145]:
created_at           id                 text                 id         name                expanded_url        
Thu May 14 09:32:... 598782970082807808 RT @BigDataTechCo... 1377652806 embeddedcomputer.nl ArrayBuffer(http:...
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521   raulsaeztapia       ArrayBuffer(http:...
Thu May 14 12:18:... 598824733086523393 @rabbitonweb spea... 

Thu May 14 12:28:... 598827171168264192 RT @baandrzejczak... 20909005   Paweł Szulc         ArrayBuffer()       

Understanding the Spark SQL query optimizer

We execute a SQL statement against the dataframe:

tweets_extract_sqlc_sel = sqlc.sql("SELECT * from Tweets_xtr_001 WHERE name='raulsaeztapia'")

We get a detailed view of the query plans executed by Spark SQL:

  • Parsed logical plan
  • Analyzed logical plan
  • Optimized logical plan
  • Physical plan

The query plan uses Spark SQL's Catalyst optimizer. In order to generate the compiled bytecode from the query parts, the Catalyst optimizer runs through logical plan parsing and optimization followed by physical plan evaluation and optimization based on cost.

This is illustrated in the following tweet:

Looking back at our code, we call the .explain function on the Spark SQL query we just executed, and it delivers the full details of the steps taken by the Catalyst optimizer in order to assess and optimize the logical plan and the physical plan and get to the result RDD:

tweets_extract_sqlc_sel.explain(extended = True)
== Parsed Logical Plan ==
'Project [*]
 'Filter ('name = raulsaeztapia)'name'  'UnresolvedRelation' [Tweets_xtr_001], None
== Analyzed Logical Plan ==
Project [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82]
 Filter (name#81 = raulsaeztapia)
   Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
    Relation[contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401.json,1.0,None)
== Optimized Logical Plan ==
Filter (name#81 = raulsaeztapia)
  Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
   Relation[contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401.json,1.0,None)
== Physical Plan ==
Filter (name#81 = raulsaeztapia)
 Distinct false
  Exchange (HashPartitioning [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82], 200)
   Distinct true
    Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
     PhysicalRDD [contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29], MapPartitionsRDD[165] at map at JsonRDD.scala:41
Code Generation: false
== RDD ==

Finally, here's the result of the query:

created_at           id                 text                 id       name          expanded_url        
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521 raulsaeztapia ArrayBuffer(http:...
Thu May 14 11:15:... 598808944719593472 RT @alvaroagea: S... 14755521 raulsaeztapia ArrayBuffer(http:...
In [148]:

Loading and processing CSV files with Spark SQL

We will use the Spark package spark-csv_2.11:1.2.0. The command to be used to launch PySpark with the IPython Notebook and the spark-csv package should explicitly state the –packages argument:

$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0

This will trigger the following output; we can see that the spark-csv package is installed with all its dependencies:

an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0
... (snip) ...
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
  confs: [default]
  found com.databricks#spark-csv_2.11;1.2.0 in central
  found org.apache.commons#commons-csv;1.1 in central
  found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 835ms :: artifacts dl 48ms
  :: modules in use:
  com.databricks#spark-csv_2.11;1.2.0 from central in [default]
  com.univocity#univocity-parsers;1.5.1 from central in [default]
  org.apache.commons#commons-csv;1.1 from central in [default]
  |               |          modules            ||   artifacts   |
  |    conf     | number| search|dwnlded|evicted|| number|dwnlded|
  |    default     |   3   |   0   |   0   |   0   ||   3   |   0   
:: retrieving :: org.apache.spark#spark-submit-parent
  confs: [default]
  0 artifacts copied, 3 already retrieved (0kB/45ms)

We are now ready to load our csv file and process it. Let's first import the SQLContext:

# Read csv in a Spark DF
sqlContext = SQLContext(sc)
spdf_in = sqlContext.read.format('com.databricks.spark.csv')

We access the schema of the dataframe created from the loaded csv:

In [10]:
 |-- : string (nullable = true)
 |-- id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- tweet_text: string (nullable = true)

We check the columns of the dataframe:

In [12]:
['', 'id', 'created_at', 'user_id', 'user_name', 'tweet_text']

We introspect the dataframe content:

In [13]:
|   |                id|          created_at|   user_id|         user_name|          tweet_text|
|  0|638830426971181057|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
|  1|638830426727911424|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
|  2|638830425402556417|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
... (snip) ...
| 41|638830280988426250|Tue Sep 01 21:46:...| 951081582|      Jack Baldwin|RT @cloudaus: We ...|
| 42|638830276626399232|Tue Sep 01 21:46:...|   6525302|Masayoshi Nakamura|PynamoDB使いやすいです  |
only showing top 20 rows

Querying MongoDB from Spark SQL

There are two major ways to interact with MongoDB from Spark: the first is through the Hadoop MongoDB connector, and the second one is directly from Spark to MongoDB.

The first approach to interact with MongoDB from Spark is to set up a Hadoop environment and query through the Hadoop MongoDB connector. The connector details are hosted on GitHub at https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage. An actual use case is described in the series of blog posts from MongoDB:

Setting up a full Hadoop environment is bit elaborate. We will favor the second approach. We will use the spark-mongodb connector developed and maintained by Stratio. We are using the Stratio spark-mongodb package hosted at spark.packages.org. The packages information and version can be found in spark.packages.org:



Version: 0.10.1 ( 8263c8 | zip | jar ) / Date: 2015-11-18 / License: Apache-2.0 / Scala version: 2.10


The command to launch PySpark with the IPython Notebook and the spark-mongodb package should explicitly state the packages argument:

$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1

This will trigger the following output; we can see that the spark-mongodb package is installed with all its dependencies:

an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1
... (snip) ... 
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.stratio.datasource#spark-mongodb_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
  confs: [default]
  found com.stratio.datasource#spark-mongodb_2.10;0.10.1 in central
[W 22:10:50.910 NotebookApp] Timeout waiting for kernel_info reply from 764081d3-baf9-4978-ad89-7735e6323cb6
  found org.mongodb#casbah-commons_2.10;2.8.0 in central
  found com.github.nscala-time#nscala-time_2.10;1.0.0 in central
  found joda-time#joda-time;2.3 in central
  found org.joda#joda-convert;1.2 in central
  found org.slf4j#slf4j-api;1.6.0 in central
  found org.mongodb#mongo-java-driver;2.13.0 in central
  found org.mongodb#casbah-query_2.10;2.8.0 in central
  found org.mongodb#casbah-core_2.10;2.8.0 in central
downloading https://repo1.maven.org/maven2/com/stratio/datasource/spark-mongodb_2.10/0.10.1/spark-mongodb_2.10-0.10.1.jar ...
  [SUCCESSFUL ] com.stratio.datasource#spark-mongodb_2.10;0.10.1!spark-mongodb_2.10.jar (3130ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-commons_2.10/2.8.0/casbah-commons_2.10-2.8.0.jar ...
  [SUCCESSFUL ] org.mongodb#casbah-commons_2.10;2.8.0!casbah-commons_2.10.jar (2812ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-query_2.10/2.8.0/casbah-query_2.10-2.8.0.jar ...
  [SUCCESSFUL ] org.mongodb#casbah-query_2.10;2.8.0!casbah-query_2.10.jar (1432ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-core_2.10/2.8.0/casbah-core_2.10-2.8.0.jar ...
  [SUCCESSFUL ] org.mongodb#casbah-core_2.10;2.8.0!casbah-core_2.10.jar (2785ms)
downloading https://repo1.maven.org/maven2/com/github/nscala-time/nscala-time_2.10/1.0.0/nscala-time_2.10-1.0.0.jar ...
  [SUCCESSFUL ] com.github.nscala-time#nscala-time_2.10;1.0.0!nscala-time_2.10.jar (2725ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar ...
  [SUCCESSFUL ] org.slf4j#slf4j-api;1.6.0!slf4j-api.jar (371ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar ...
  [SUCCESSFUL ] org.mongodb#mongo-java-driver;2.13.0!mongo-java-driver.jar (5259ms)
downloading https://repo1.maven.org/maven2/joda-time/joda-time/2.3/joda-time-2.3.jar ...
  [SUCCESSFUL ] joda-time#joda-time;2.3!joda-time.jar (6949ms)
downloading https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/joda-convert-1.2.jar ...
  [SUCCESSFUL ] org.joda#joda-convert;1.2!joda-convert.jar (548ms)
:: resolution report :: resolve 11850ms :: artifacts dl 26075ms
  :: modules in use:
  com.github.nscala-time#nscala-time_2.10;1.0.0 from central in [default]
  com.stratio.datasource#spark-mongodb_2.10;0.10.1 from central in [default]
  joda-time#joda-time;2.3 from central in [default]
  org.joda#joda-convert;1.2 from central in [default]
  org.mongodb#casbah-commons_2.10;2.8.0 from central in [default]
  org.mongodb#casbah-core_2.10;2.8.0 from central in [default]
  org.mongodb#casbah-query_2.10;2.8.0 from central in [default]
  org.mongodb#mongo-java-driver;2.13.0 from central in [default]
  org.slf4j#slf4j-api;1.6.0 from central in [default]
  |                  |            modules            ||   artifacts   |
  |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
  |      default     |   9   |   9   |   9   |   0   ||   9   |   9   |
:: retrieving :: org.apache.spark#spark-submit-parent
  confs: [default]
  9 artifacts copied, 0 already retrieved (2335kB/51ms)
... (snip) ... 

We are now ready to query MongoDB on localhost:27017 from the collection twtr01_coll in the database twtr01_db.

We first import the SQLContext:

In [5]:
from pyspark.sql import SQLContext
sqlContext.sql("CREATE TEMPORARY TABLE tweet_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'twtr01_db', collection 'twtr01_coll')")
sqlContext.sql("SELECT * FROM tweet_table where id=598830778269769728 ").collect()

Here's the output of our query:

[Row(text=u'@spark_io is now @particle - awesome news - now I can enjoy my Particle Cores/Photons + @sparkfun sensors + @ApacheSpark analytics :-)', _id=u'55aa640fd770871cba74cb88', contributors=None, retweeted=False, user=Row(contributors_enabled=False, created_at=u'Mon Aug 25 14:01:26 +0000 2008', default_profile=True, default_profile_image=False, description=u'Building open source tools for and teaching enterprise software developers', entities=Row(description=Row(urls=[]), url=Row(urls=[Row(url=u'http://t.co/TSHp13EWeu', indices=[0, 22], 

... (snip) ...

 9], name=u'Spark is Particle', screen_name=u'spark_io'), Row(id=487010011, id_str=u'487010011', indices=[17, 26], name=u'Particle', screen_name=u'particle'), Row(id=17877351, id_str=u'17877351', indices=[88, 97], name=u'SparkFun Electronics', screen_name=u'sparkfun'), Row(id=1551361069, id_str=u'1551361069', indices=[108, 120], name=u'Apache Spark', screen_name=u'ApacheSpark')]), is_quote_status=None, lang=u'en', quoted_status_id_str=None, quoted_status_id=None, created_at=u'Thu May 14 12:42:37 +0000 2015', retweeted_status=None, truncated=False, place=None, id=598830778269769728, in_reply_to_user_id=3187046084, retweet_count=0, in_reply_to_status_id=None, in_reply_to_screen_name=u'spark_io', in_reply_to_user_id_str=u'3187046084', source=u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>', id_str=u'598830778269769728', coordinates=None, metadata=Row(iso_language_code=u'en', result_type=u'recent'), quoted_status=None)]
