Let's learn how to create and use DataFrames for Big Data Analytics. For ease of understanding and quick experimentation, the pyspark shell is used for the code in this chapter. The data needed for the exercises in this chapter can be found at https://github.com/apache/spark/tree/master/examples/src/main/resources. You can always convert data from one format to another by reading a file in one format and writing it out in another. For example, once you read a .json file, you can write the data out in Parquet, ORC, or other formats.
In Spark versions 1.6 and below, the entry point into all relational functionality in Spark is the SQLContext class. To create a SQLContext in an application, we need to create a SparkContext and wrap SQLContext around it. Also, when working with Hive, we need to create a HiveContext, as shown in the following code:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
From Spark version 2.0 onwards, SparkSession is the entry point for relational functionality. When working with Hive, SparkSession must be created with the enableHiveSupport method to get access to the Hive metastore, Hive SerDes, and Hive user-defined functions.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Log Analytics") \
    .config("configuration_key", "configuration_value") \
    .enableHiveSupport() \
    .getOrCreate()
In the pyspark shell or spark-shell, a pre-configured SparkSession called spark is created automatically.
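You can quickly verify this in the shell; the exact output will vary with your Spark version and environment:

>>> spark                 # the pre-configured SparkSession
>>> spark.version         # the Spark version string, for example u'2.0.0'
>>> sc                    # the matching SparkContext is also pre-configured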
There are multiple ways to create a DataFrame. It can be created from structured data files such as .json, .avro, or .parquet files, or from RDDs, Hive tables, external databases, or pandas DataFrames. Copy the input files located in Spark's /examples/src/main/resources/ directory to HDFS before executing the following code.
To create DataFrames from Parquet, JSON, and ORC files, use the spark.read API as shown in the following. Start a pyspark shell, which provides a pre-built SparkSession called spark:
>>> df_parquet = spark.read.parquet("users.parquet")
>>> df_json = spark.read.json("people.json")
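As mentioned at the beginning of the chapter, a DataFrame read from one format can be written out in another. A minimal sketch, assuming the df_json DataFrame created above and write access to the output paths (the output names are arbitrary):

>>> df_json.write.parquet("people.parquet")           # write the JSON data as Parquet
>>> df_json.write.orc("people.orc")                   # write as ORC (requires Hive support in Spark 1.x)
>>> df_json.write.format("json").save("people_copy")  # or use the generic format/save API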
There are multiple ways to create a DataFrame from RDDs. RDDs have no schema, so a schema must be assigned before creating the DataFrame. Let's use the following list and schema to create a DataFrame in multiple ways:
>>> mylist = [(50, "DataFrame"),(60, "pandas")]
>>> myschema = ['col1', 'col2']
>>> df = spark.createDataFrame(mylist, myschema)
>>> mylist = [(50, "DataFrame"),(60, "pandas")]
>>> myschema = ['col1', 'col2']
>>> df = sc.parallelize(mylist).toDF(myschema)
>>> df.printSchema()
>>> df.show()
>>> from pyspark.sql import Row
>>> peopleRDD = sc.textFile("people.txt")
>>> people_sp = peopleRDD.map(lambda l: l.split(","))
>>> people = people_sp.map(lambda p: Row(name=p[0], age=int(p[1])))
>>> df_people = spark.createDataFrame(people)
>>> df_people.createOrReplaceTempView("people")
>>> spark.sql("SHOW TABLES").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|   people|       true|
+---------+-----------+
>>> spark.sql("SELECT name, age FROM people WHERE age > 19").show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
+-------+---+
>>> from pyspark.sql.types import *
>>> peopleRDD = sc.textFile("people.txt")
>>> people_sp = peopleRDD.map(lambda l: l.split(","))
>>> people = people_sp.map(lambda p: (p[0], p[1].strip()))
>>> schemaStr = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaStr.split()]
>>> schema = StructType(fields)
>>> df_people = spark.createDataFrame(people, schema)
>>> df_people.show()
>>> df_people.createOrReplaceTempView("people")
>>> spark.sql("select * from people").show()
To create DataFrames from Hive tables, create a SparkSession with the enableHiveSupport method, which is equivalent to creating a HiveContext in earlier versions. Use the following code to work with the existing sample_07 Hive table. If the sample_07 table does not exist in Hive, install the Hive examples from step 2 of the HUE quick start wizard:
>>> sample_07 = spark.table("sample_07")
>>> sample_07.show()
>>> spark.sql("select * from sample_07 limit 5").show()
To create a DataFrame from external databases, use the spark.read API with jdbc as the format and provide the connection string, table name, user ID, and password as options. First, copy /usr/lib/hive/lib/mysql-connector-java.jar to Spark's JARs directory, or provide mysql-connector-java.jar with the --jars option when starting the pyspark shell:
>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera', dbtable='departments').load()

# This code can be re-written as follows.
>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db', dbtable='departments', user='root', password='cloudera').load()
>>> df.show()
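Equivalently, the read.jdbc shortcut accepts the connection details as arguments and the credentials in a properties dictionary; this is only a sketch of the same call, using the same hypothetical MySQL connection details:

>>> df = spark.read.jdbc(url='jdbc:mysql://localhost:3306/retail_db',
...                      table='departments',
...                      properties={'user': 'root', 'password': 'cloudera'})
>>> df.show()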
Converting DataFrames to RDDs is very simple: you just need to use the .rdd attribute, as shown in the following example. Let's display the content of the df DataFrame from the previous example, convert it to an RDD, and collect the content of the RDD. Notice that when you convert a DataFrame to an RDD, you get an RDD of Row objects:
>>> df.show()        # Print the content of the DataFrame
>>> df2rdd = df.rdd  # Convert the DataFrame to an RDD of Row objects
>>> df2rdd.take(2)
A Dataset/DataFrame has many functions for reading and writing data, transforming data with the Domain Specific Language (DSL), and actions to trigger execution of the transformations. Let's go through some of the most commonly used DataFrame functions.
Input and output operations, such as read and write, are used for loading and saving data.
Basic Dataset/DataFrame functions are used to create DataFrames and to perform debugging operations on the console:
as[U]: Returns a new Dataset mapping records to a specific type
toDF: Returns a new DataFrame with columns renamed
explain: Prints the logical and physical plans to the console for debugging purposes
printSchema: Prints out the schema in a tree format
createTempView: Registers this DataFrame as a temporary table using the given name
cache() or persist(): Persists the Dataset with the specified persistence level
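A minimal sketch of these basic functions, reusing the small DataFrame built earlier from mylist and myschema; the renamed columns id and tool and the view name tools are arbitrary choices for illustration:

>>> df = spark.createDataFrame(mylist, myschema)   # recreate the small example DataFrame
>>> df.printSchema()                 # schema in tree format
>>> df.explain()                     # physical plan, for debugging
>>> df2 = df.toDF('id', 'tool')      # new DataFrame with renamed columns
>>> df2.createTempView("tools")      # register a temporary view
>>> df2.cache()                      # persist with the default storage level
>>> spark.sql("select * from tools").show()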
Domain Specific Language (DSL) functions are used for analytics. These functions are lazy and do not trigger execution.

agg: Aggregates on the entire Dataset/DataFrame without groups
distinct: Returns a new Dataset/DataFrame with unique rows
drop: Returns a new Dataset/DataFrame with a column dropped
filter: Filters rows using the given condition
join: Joins with another Dataset/DataFrame using the given join expression
limit: Returns a new Dataset/DataFrame by taking the first n rows
sort: Returns a new Dataset/DataFrame sorted in ascending order by the specified column
groupBy: Groups the Dataset/DataFrame using the specified columns
unionAll: Returns a new Dataset/DataFrame containing a union of rows from two DataFrames
na: For working with missing or null values

Built-in functions or user-defined functions operate on single rows to calculate a single value. Examples of built-in functions are substr and round. Aggregate functions operate on a set of rows and calculate a single value. Examples of aggregate functions are min, max, sum, mean, first, last, avg, count, countDistinct, and approxCountDistinct. Window functions operate on a set of rows related to the current row. Examples of DataFrame window functions are rank, denseRank, percentRank, ntile, and rowNumber.
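A short sketch that ties a few of these together, assuming the df_people DataFrame from the earlier Row-based example (where age is an integer); the window simply ranks people by age:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.window import Window
>>> df_people.filter(df_people.age > 19).show()            # DSL filter
>>> df_people.agg(F.min("age"), F.avg("age")).show()       # aggregates over the whole DataFrame
>>> df_people.groupBy("name").agg(F.max("age")).show()     # grouped aggregation
>>> w = Window.orderBy(F.desc("age"))                      # window over all rows, ordered by age
>>> df_people.select("name", "age", F.rank().over(w).alias("rank")).show()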
Actions trigger execution. Some examples of actions are show, count, collect, take, first, and head.
Regular RDD operations such as map, flatMap, coalesce, repartition, foreach, toJSON, and toJavaRDD can be applied to DataFrames. These are really helpful because not every kind of function is available within the DSL. For example, you can apply the foreach function to print the department_name column of the df DataFrame created in the JDBC example, as shown in the following:
>>> def f(mydf):
...     print(mydf.department_name)
...
>>> df.foreach(f)
Fitness
Footwear
Apparel
Golf
Outdoors
Fan Shop
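Similarly, map can be applied through the underlying RDD; this sketch assumes the same departments DataFrame and simply upper-cases each department name:

>>> names = df.rdd.map(lambda row: row.department_name.upper())
>>> names.take(3)     # for example: [u'FITNESS', u'FOOTWEAR', u'APPAREL']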
Spark SQL tables can be cached in an in-memory columnar format by using the df.persist() method. Spark SQL will scan only the required columns from the columnar in-memory table. It also automatically tunes compression to minimize memory usage and garbage collection pressure. The DataFrame can be unpersisted using df.unpersist().
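A quick sketch, assuming the df_people DataFrame and its people temporary view from earlier:

>>> df_people.persist()                            # cache in the in-memory columnar format
>>> spark.sql("select name from people").show()    # only the name column is scanned
>>> df_people.unpersist()                          # release the cached data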
Let's look at some of the important performance tuning parameters of Spark SQL:
spark.sql.inMemoryColumnarStorage.compressed is set to true by default, which compresses the in-memory columnar data.
spark.sql.inMemoryColumnarStorage.batchSize is set to 10000 by default, which controls the batch size for columnar caching.
spark.sql.autoBroadcastJoinThreshold is set to 10 MB by default. This configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.
spark.sql.files.maxPartitionBytes is set to 128 MB by default, which is the maximum number of bytes to pack into a single partition when reading files.
spark.sql.shuffle.partitions is set to 200 by default. This configures the number of partitions to use when shuffling data.
spark.sql.planner.externalSort is set to true by default. This performs sorts that spill to disk as needed; otherwise, each partition is sorted in memory.
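These parameters can be inspected and changed at runtime through the SparkSession configuration; a minimal sketch, where the new values are only examples rather than recommendations:

>>> spark.conf.get("spark.sql.shuffle.partitions")                # inspect the current value
>>> spark.conf.set("spark.sql.shuffle.partitions", "50")          # fewer partitions for small shuffles
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # raise to 50 MB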