Spark SQL is a relational query engine built on top of Spark Core. Spark SQL uses a query optimizer called Catalyst.
Relational queries can be expressed in SQL or HiveQL and executed against JSON, CSV, and various databases. Spark SQL gives us the full expressiveness of declarative programming with Spark dataframes, on top of the functional programming with RDDs.
Here's a tweet from @bigdata announcing Spark 1.3.0 and the advent of Spark SQL and dataframes. The lower part of the diagram highlights the various data sources, while the top part shows R as the new language gradually supported alongside Scala, Java, and Python. Ultimately, the dataframe philosophy is pervasive across R, Python, and Spark.
Spark dataframes originate from SchemaRDDs. A dataframe combines an RDD with a schema that Spark can infer, on request, when the dataframe is registered. This allows us to query complex nested JSON data with plain SQL. Lazy evaluation, lineage, partitioning, and persistence all apply to dataframes.
Let's query the data with Spark SQL, by first importing SparkContext and SQLContext:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row

In [95]: sc
Out[95]: <pyspark.context.SparkContext at 0x7f7829581890>

In [96]: sc.master
Out[96]: u'local[*]'

In [98]: # Instantiate Spark SQL context
         sqlc = SQLContext(sc)
We read in the JSON file we saved with Odo:
twts_sql_df_01 = sqlc.jsonFile("/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401_distinct.json")

In [101]: twts_sql_df_01.show()
created_at           id                 tweet_text           user_id    user_name
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521   raulsaeztapia
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521   raulsaeztapia
2015-05-14T10:25:15Z 598796205091500032 RT @PrabhaGana: W... 48695135   John Humphreys
2015-05-14T09:54:52Z 598788561127735296 RT @Ellen_Friedma... 2385931712 Leonardo D'Ambrosi
2015-05-14T09:42:53Z 598785545557438464 RT @Ellen_Friedma... 461020977  Alexey Kosenkov
2015-05-14T09:32:39Z 598782970082807808 RT @BigDataTechCo... 1377652806 embeddedcomputer.nl
2015-05-14T09:12:38Z 598777933730160640 I'm still on Euro... 294862170  Ellen Friedman
We print the schema of the Spark dataframe:
twts_sql_df_01.printSchema()
root
 |-- created_at: string (nullable = true)
 |-- id: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
We select the user_name column from the dataframe:
twts_sql_df_01.select('user_name').show()
user_name
raulsaeztapia
raulsaeztapia
John Humphreys
Leonardo D'Ambrosi
Alexey Kosenkov
embeddedcomputer.nl
Ellen Friedman
We register the dataframe as a table, so we can execute a SQL query on it:
twts_sql_df_01.registerAsTable('tweets_01')
We execute a SQL statement against the dataframe:
twts_sql_df_01_selection = sqlc.sql("SELECT * FROM tweets_01 WHERE user_name = 'raulsaeztapia'")

In [109]: twts_sql_df_01_selection.show()
created_at           id                 tweet_text           user_id  user_name
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521 raulsaeztapia
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521 raulsaeztapia
Let's process some more complex JSON; we read the original Twitter JSON file:
tweets_sqlc_inf = sqlc.jsonFile(infile)
Spark SQL is able to infer the schema of a complex nested JSON file:
tweets_sqlc_inf.printSchema()
root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
... (snip) ...
 |    |-- statuses_count: long (nullable = true)
 |    |-- time_zone: string (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utc_offset: long (nullable = true)
 |    |-- verified: boolean (nullable = true)
We extract the key information of interest from the wall of data by selecting specific columns of the dataframe (in this case, ['created_at', 'id', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']):
tweets_extract_sqlc = tweets_sqlc_inf[['created_at', 'id', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']].distinct()

In [145]: tweets_extract_sqlc.show()
created_at           id                 text                 id         name                expanded_url
Thu May 14 09:32:... 598782970082807808 RT @BigDataTechCo... 1377652806 embeddedcomputer.nl ArrayBuffer(http:...
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521   raulsaeztapia       ArrayBuffer(http:...
Thu May 14 12:18:... 598824733086523393 @rabbitonweb spea...
... (snip) ...
Thu May 14 12:28:... 598827171168264192 RT @baandrzejczak... 20909005   Paweł Szulc         ArrayBuffer()
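Note that the raw Twitter created_at values (Thu May 14 ...) differ from the ISO timestamps in our earlier distinct-tweets file; plain Python's datetime can convert between the two formats. A small sketch (the sample string is the full form of the dates truncated above):

```python
from datetime import datetime

# Twitter's raw created_at format, as seen in the full JSON
raw = 'Thu May 14 12:42:37 +0000 2015'
dt = datetime.strptime(raw, '%a %b %d %H:%M:%S +0000 %Y')

# Re-render as the ISO 8601 form used in our distinct-tweets file
iso = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
print(iso)  # 2015-05-14T12:42:37Z
```

Such a conversion could be applied per row with a map over the dataframe's underlying RDD, or as a user-defined function.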
Having registered the extracted dataframe as the table Tweets_xtr_001, we execute a SQL statement against it:
tweets_extract_sqlc_sel = sqlc.sql("SELECT * from Tweets_xtr_001 WHERE name='raulsaeztapia'")
We get a detailed view of the query plans executed by Spark SQL:
The query plan is produced by Spark SQL's Catalyst optimizer. To generate compiled bytecode from the query, the Catalyst optimizer runs through logical plan parsing and optimization, followed by physical plan evaluation and cost-based optimization.
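As a rough analogy for one classic logical optimization, predicate pushdown, consider moving a filter below a projection so that fewer intermediate records are materialized. This is illustrative pure Python, not Catalyst's actual mechanics:

```python
# Toy 'rows': (user_name, tweet_text) pairs
rows = [('raulsaeztapia', 'RT @pacoid: Great...'),
        ('John Humphreys', 'RT @PrabhaGana: W...'),
        ('raulsaeztapia', 'RT @alvaroagea: S...')]

# Naive plan: project every row into a record, then filter
projected = [{'user_name': u, 'tweet_text': t} for u, t in rows]
naive = [r for r in projected if r['user_name'] == 'raulsaeztapia']

# 'Optimized' plan: filter first, project only the surviving rows
pushed = [{'user_name': u, 'tweet_text': t}
          for u, t in rows if u == 'raulsaeztapia']

# Same result, fewer intermediate records built
assert naive == pushed
```

Catalyst applies rewrites of this flavor (among many others) to the logical plan before choosing a physical plan by cost.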
Catalyst's pipeline is illustrated in the following tweet:
Looking back at our code, we call the .explain function on the Spark SQL query we just executed; it delivers the full details of the steps taken by the Catalyst optimizer to assess and optimize the logical and physical plans and produce the result RDD:
tweets_extract_sqlc_sel.explain(extended = True)
== Parsed Logical Plan ==
'Project [*]
 'Filter ('name = raulsaeztapia)
  'UnresolvedRelation [Tweets_xtr_001], None
== Analyzed Logical Plan ==
Project [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82]
 Filter (name#81 = raulsaeztapia)
  Distinct
   Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
    Relation[contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401.json,1.0,None)
== Optimized Logical Plan ==
Filter (name#81 = raulsaeztapia)
 Distinct
  Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
   Relation[contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data/twtr15051401.json,1.0,None)
== Physical Plan ==
Filter (name#81 = raulsaeztapia)
 Distinct false
  Exchange (HashPartitioning [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82], 200)
   Distinct true
    Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
     PhysicalRDD [contributors#5,coordinates#6,created_at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#23L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,user#29], MapPartitionsRDD[165] at map at JsonRDD.scala:41
Code Generation: false
== RDD ==
Finally, here's the result of the query:
tweets_extract_sqlc_sel.show()
created_at           id                 text                 id       name          expanded_url
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521 raulsaeztapia ArrayBuffer(http:...
Thu May 14 11:15:... 598808944719593472 RT @alvaroagea: S... 14755521 raulsaeztapia ArrayBuffer(http:...
We will use the Spark package spark-csv_2.11:1.2.0. The command used to launch PySpark with the IPython Notebook and the spark-csv package must explicitly state the --packages argument:
$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0
This will trigger the following output; we can see that the spark-csv package is installed with all its dependencies:
an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0
... (snip) ...
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.databricks#spark-csv_2.11;1.2.0 in central
	found org.apache.commons#commons-csv;1.1 in central
	found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 835ms :: artifacts dl 48ms
	:: modules in use:
	com.databricks#spark-csv_2.11;1.2.0 from central in [default]
	com.univocity#univocity-parsers;1.5.1 from central in [default]
	org.apache.commons#commons-csv;1.1 from central in [default]
	----------------------------------------------------------------
	|        conf      | modules: number|search|dwnlded|evicted || artifacts: number|dwnlded |
	----------------------------------------------------------------
	|      default     |    3   |   0   |   0   |   0    ||    3   |   0    |
	----------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/45ms)
We are now ready to load our csv file and process it. Let's first instantiate the SQLContext and read in the file:
#
# Read csv in a Spark DF
#
sqlContext = SQLContext(sc)
spdf_in = sqlContext.read.format('com.databricks.spark.csv') \
                    .options(delimiter=";", header='true') \
                    .load(csv_in)
We access the schema of the dataframe created from the loaded csv file:
In [10]: spdf_in.printSchema()
root
 |-- : string (nullable = true)
 |-- id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- tweet_text: string (nullable = true)
We check the columns of the dataframe:
In [12]: spdf_in.columns
Out[12]: ['', 'id', 'created_at', 'user_id', 'user_name', 'tweet_text']
We introspect the dataframe content:
In [13]: spdf_in.show()
+---+------------------+--------------------+----------+------------------+--------------------+
|   |                id|          created_at|   user_id|         user_name|          tweet_text|
+---+------------------+--------------------+----------+------------------+--------------------+
|  0|638830426971181057|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
|  1|638830426727911424|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
|  2|638830425402556417|Tue Sep 01 21:46:...|3276255125|     True Equality|ernestsgantt: Bey...|
... (snip) ...
| 41|638830280988426250|Tue Sep 01 21:46:...| 951081582|      Jack Baldwin|RT @cloudaus: We ...|
| 42|638830276626399232|Tue Sep 01 21:46:...|   6525302|Masayoshi Nakamura|PynamoDB使いやすいです |
+---+------------------+--------------------+----------+------------------+--------------------+
only showing top 20 rows
There are two major ways to interact with MongoDB from Spark: the first is through the Hadoop MongoDB connector; the second is directly from Spark to MongoDB.
The first approach to interact with MongoDB from Spark is to set up a Hadoop environment and query through the Hadoop MongoDB connector. The connector details are hosted on GitHub at https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage. An actual use case is described in the series of blog posts from MongoDB:
Setting up a full Hadoop environment is a bit elaborate, so we will favor the second approach. We will use the spark-mongodb connector developed and maintained by Stratio, whose package is hosted at spark-packages.org, where the package information and versions can be found:
The command to launch PySpark with the IPython Notebook and the spark-mongodb package must explicitly state the --packages argument:
$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1
This will trigger the following output; we can see that the spark-mongodb package is installed with all its dependencies:
an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1
... (snip) ...
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.stratio.datasource#spark-mongodb_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found com.stratio.datasource#spark-mongodb_2.10;0.10.1 in central
[W 22:10:50.910 NotebookApp] Timeout waiting for kernel_info reply from 764081d3-baf9-4978-ad89-7735e6323cb6
	found org.mongodb#casbah-commons_2.10;2.8.0 in central
	found com.github.nscala-time#nscala-time_2.10;1.0.0 in central
	found joda-time#joda-time;2.3 in central
	found org.joda#joda-convert;1.2 in central
	found org.slf4j#slf4j-api;1.6.0 in central
	found org.mongodb#mongo-java-driver;2.13.0 in central
	found org.mongodb#casbah-query_2.10;2.8.0 in central
	found org.mongodb#casbah-core_2.10;2.8.0 in central
downloading https://repo1.maven.org/maven2/com/stratio/datasource/spark-mongodb_2.10/0.10.1/spark-mongodb_2.10-0.10.1.jar ...
	[SUCCESSFUL ] com.stratio.datasource#spark-mongodb_2.10;0.10.1!spark-mongodb_2.10.jar (3130ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-commons_2.10/2.8.0/casbah-commons_2.10-2.8.0.jar ...
	[SUCCESSFUL ] org.mongodb#casbah-commons_2.10;2.8.0!casbah-commons_2.10.jar (2812ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-query_2.10/2.8.0/casbah-query_2.10-2.8.0.jar ...
	[SUCCESSFUL ] org.mongodb#casbah-query_2.10;2.8.0!casbah-query_2.10.jar (1432ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-core_2.10/2.8.0/casbah-core_2.10-2.8.0.jar ...
	[SUCCESSFUL ] org.mongodb#casbah-core_2.10;2.8.0!casbah-core_2.10.jar (2785ms)
downloading https://repo1.maven.org/maven2/com/github/nscala-time/nscala-time_2.10/1.0.0/nscala-time_2.10-1.0.0.jar ...
	[SUCCESSFUL ] com.github.nscala-time#nscala-time_2.10;1.0.0!nscala-time_2.10.jar (2725ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.6.0!slf4j-api.jar (371ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar ...
	[SUCCESSFUL ] org.mongodb#mongo-java-driver;2.13.0!mongo-java-driver.jar (5259ms)
downloading https://repo1.maven.org/maven2/joda-time/joda-time/2.3/joda-time-2.3.jar ...
	[SUCCESSFUL ] joda-time#joda-time;2.3!joda-time.jar (6949ms)
downloading https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/joda-convert-1.2.jar ...
	[SUCCESSFUL ] org.joda#joda-convert;1.2!joda-convert.jar (548ms)
:: resolution report :: resolve 11850ms :: artifacts dl 26075ms
	:: modules in use:
	com.github.nscala-time#nscala-time_2.10;1.0.0 from central in [default]
	com.stratio.datasource#spark-mongodb_2.10;0.10.1 from central in [default]
	joda-time#joda-time;2.3 from central in [default]
	org.joda#joda-convert;1.2 from central in [default]
	org.mongodb#casbah-commons_2.10;2.8.0 from central in [default]
	org.mongodb#casbah-core_2.10;2.8.0 from central in [default]
	org.mongodb#casbah-query_2.10;2.8.0 from central in [default]
	org.mongodb#mongo-java-driver;2.13.0 from central in [default]
	org.slf4j#slf4j-api;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|        conf      | modules: number|search|dwnlded|evicted || artifacts: number|dwnlded |
	---------------------------------------------------------------------
	|      default     |    9   |   9   |   9   |   0    ||    9   |   9    |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	9 artifacts copied, 0 already retrieved (2335kB/51ms)
... (snip) ...
We are now ready to query MongoDB on localhost:27017, from the collection twtr01_coll in the database twtr01_db. We first import the SQLContext:
In [5]:
from pyspark.sql import SQLContext
sqlContext.sql("CREATE TEMPORARY TABLE tweet_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'twtr01_db', collection 'twtr01_coll')")
sqlContext.sql("SELECT * FROM tweet_table where id=598830778269769728 ").collect()
Here's the output of our query:
Out[5]: [Row(text=u'@spark_io is now @particle - awesome news - now I can enjoy my Particle Cores/Photons + @sparkfun sensors + @ApacheSpark analytics :-)', _id=u'55aa640fd770871cba74cb88', contributors=None, retweeted=False, user=Row(contributors_enabled=False, created_at=u'Mon Aug 25 14:01:26 +0000 2008', default_profile=True, default_profile_image=False, description=u'Building open source tools for and teaching enterprise software developers', entities=Row(description=Row(urls=[]), url=Row(urls=[Row(url=u'http://t.co/TSHp13EWeu', indices=[0, 22], ... (snip) ... 9], name=u'Spark is Particle', screen_name=u'spark_io'), Row(id=487010011, id_str=u'487010011', indices=[17, 26], name=u'Particle', screen_name=u'particle'), Row(id=17877351, id_str=u'17877351', indices=[88, 97], name=u'SparkFun Electronics', screen_name=u'sparkfun'), Row(id=1551361069, id_str=u'1551361069', indices=[108, 120], name=u'Apache Spark', screen_name=u'ApacheSpark')]), is_quote_status=None, lang=u'en', quoted_status_id_str=None, quoted_status_id=None, created_at=u'Thu May 14 12:42:37 +0000 2015', retweeted_status=None, truncated=False, place=None, id=598830778269769728, in_reply_to_user_id=3187046084, retweet_count=0, in_reply_to_status_id=None, in_reply_to_screen_name=u'spark_io', in_reply_to_user_id_str=u'3187046084', source=u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>', id_str=u'598830778269769728', coordinates=None, metadata=Row(iso_language_code=u'en', result_type=u'recent'), quoted_status=None)]