Analytics with DataFrames

Let's learn how to create and use DataFrames for Big Data Analytics. For easy understanding and quick examples, the pyspark shell is used for the code in this chapter. The data needed for the exercises in this chapter can be found at https://github.com/apache/spark/tree/master/examples/src/main/resources. You can always convert data from one format to another by reading one type of data file and writing it out in another; for example, once you read a .json file, you can write the data in Parquet, ORC, or other formats.

Note

All programs in this chapter are executed on the CDH 5.8 VM, except the programs in the DataFrame based Spark-on-HBase connector section, which are executed on HDP 2.5. In other environments, file paths might change, but the concepts are the same.

Creating SparkSession

In Spark versions 1.6 and below, the entry point into all relational functionality in Spark is the SQLContext class. To create a SQLContext in an application, we need to create a SparkContext first and then wrap it with a SQLContext. When working with Hive, we need to create a HiveContext instead, as shown in the following code:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)      # sc is an existing SparkContext

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)     # HiveContext extends SQLContext with Hive support

From Spark version 2.0 onwards, SparkSession is the entry point for relational functionality. When working with Hive, SparkSession must be created with the enableHiveSupport method to get access to the Hive Metastore, SerDes, and user-defined functions.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Log Analytics") \
    .config("configuration_key", "configuration_value") \
    .enableHiveSupport() \
    .getOrCreate()

In pyspark shell or spark-shell, a pre-configured SparkSession called spark will be created automatically.

Creating DataFrames

There are multiple ways to create a DataFrame. It can be created from structured data files such as .json, .avro, or .parquet files, or from RDDs, Hive tables, external databases, or pandas DataFrames. Copy the input files located in Spark's /examples/src/main/resources/ directory to HDFS before executing the following code.
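
For instance, a pandas DataFrame that already exists on the driver can be converted directly with spark.createDataFrame. The following is a minimal sketch; it assumes pandas is installed on the driver, and the sample data is made up:

>>> import pandas as pd
>>> pandas_df = pd.DataFrame({'col1': [50, 60], 'col2': ['DataFrame', 'pandas']})
>>> df_from_pandas = spark.createDataFrame(pandas_df)   # schema is inferred from pandas dtypes
>>> df_from_pandas.show()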

Creating DataFrames from structured data files

To create DataFrames from Parquet, JSON, and ORC files, use the spark.read API as shown in the following code. Start a pyspark shell, which provides a pre-built SparkSession called spark.

>>> df_parquet = spark.read.parquet("users.parquet")
>>> df_json = spark.read.json("people.json")
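
As mentioned earlier, once a DataFrame is loaded you can write it back out in a different format. The following is a minimal sketch; the output paths are illustrative, and writing ORC may require a Hive-enabled SparkSession on older Spark releases:

>>> df_json.write.parquet("people_parquet")   # write the JSON data out as Parquet
>>> df_json.write.orc("people_orc")           # write the same data out as ORC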

Creating DataFrames from RDDs

There are multiple ways to create a DataFrame from RDDs. RDDs have no schema, so a schema must be assigned before creating the DataFrame. Let's use the following list and schema to create a DataFrame in multiple ways:

>>> mylist = [(50, "DataFrame"),(60, "pandas")]
>>> myschema = ['col1', 'col2']
  1. Create a DataFrame with a list, schema, and default data types:
    >>> df = spark.createDataFrame(mylist, myschema)
    
  2. Create a DataFrame by parallelizing a list and converting the RDD to a DataFrame. Print the schema and show the contents of the DataFrame as shown in Figure 4.7:
    >>> mylist = [(50, "DataFrame"),(60, "pandas")]
    >>> myschema = ['col1', 'col2']
    >>> df = sc.parallelize(mylist).toDF(myschema)
    >>> df.printSchema()
    >>> df.show()
    

    Figure 4.7: Creating a DataFrame using a list and displaying it

  3. Read the data from a file, infer the schema, and convert it to a DataFrame:
    >>> from pyspark.sql import Row
    >>> peopleRDD = sc.textFile("people.txt")
    >>> people_sp = peopleRDD.map(lambda l: l.split(","))
    >>> people = people_sp.map(lambda p: Row(name=p[0], age=int(p[1])))
    >>> df_people = spark.createDataFrame(people)
    >>> df_people.createOrReplaceTempView("people")
    >>> spark.sql("SHOW TABLES").show()
    +---------+-----------+
    |tableName|isTemporary|
    +---------+-----------+
    |   people|       true|
    +---------+-----------+
    
    >>> spark.sql("SELECT name,age FROM people where age > 19").show()
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    +-------+---+
    
  4. Read the data from the file and assign a schema programmatically:
    >>> from pyspark.sql.types import *
    >>> peopleRDD = sc.textFile("people.txt")
    >>> people_sp = peopleRDD.map(lambda l: l.split(","))
    >>> people = people_sp.map(lambda p: (p[0], p[1].strip()))
    >>> schemaStr = "name age"
    >>> fields = [StructField(field_name, StringType(), True) for field_name in schemaStr.split()]
    >>> schema = StructType(fields)
    >>> df_people = spark.createDataFrame(people, schema)
    >>> df_people.show()
    >>> df_people.createOrReplaceTempView("people")
    >>> spark.sql("select * from people").show()
    

Creating DataFrames from tables in Hive

To create DataFrames from Hive tables, create a SparkSession with the enableHiveSupport method; this is equivalent to creating a HiveContext. Use the following code to work with the existing sample_07 Hive table. If the sample_07 table does not exist in Hive, install the Hive examples from step 2 of the HUE quick start wizard:

>>> sample_07 = spark.table("sample_07")
>>> sample_07.show()

>>> spark.sql("select * from sample_07 limit 5").show()
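
A DataFrame can also be saved back to Hive with the write.saveAsTable method. The following is a minimal sketch; the table name sample_07_copy is only illustrative:

>>> sample_07.write.saveAsTable("sample_07_copy")   # creates a managed Hive table
>>> spark.table("sample_07_copy").count()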

Creating DataFrames from external databases

To create a DataFrame from an external database, use the spark.read API with jdbc as the format and provide the connection string, table name, user ID, and password as options.

First, copy /usr/lib/hive/lib/mysql-connector-java.jar to Spark's jars directory, or provide the --jars dependency for mysql-connector-java.jar when starting the pyspark shell:

>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera', dbtable='departments').load()

# This code can be re-written as follows.
>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db', dbtable='departments', user='root', password='cloudera').load()
>>> df.show()
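
Writing a DataFrame back to a database works through the write API in a similar way. The following is a minimal sketch; the target table departments_copy is illustrative and will be created (or replaced) in the same retail_db database:

>>> df.write.jdbc(url='jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera',
...               table='departments_copy', mode='overwrite')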

Converting DataFrames to RDDs

Converting a DataFrame to an RDD is very simple: you just need to use the rdd method, as shown in Figure 4.8. Let's display the content of the df DataFrame from the previous example, convert it to an RDD, and take a couple of rows from it. Notice that when you convert a DataFrame to an RDD, you get an RDD of Row objects:

>>> df.show()          # print the content of the DataFrame
>>> df2rdd = df.rdd
>>> df2rdd.take(2)

Figure 4.8: DataFrame and RDD from MySQL database

Common Dataset/DataFrame operations

A Dataset/DataFrame has many functions for reading and writing data, transforming data with a Domain Specific Language (DSL), and actions that trigger the execution of transformations. Let's go through some of the most commonly used DataFrame functions.

Input and Output Operations

Input and Output operations are used for loading and saving data; a short example follows the list:

  • read: Provides generic read functions for any data source
  • write: Provides generic write functions for any data source
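
Both follow the same pattern of specifying a format and then loading from or saving to a path. A minimal sketch using the people.json file from earlier (the output path is illustrative):

>>> df = spark.read.format("json").load("people.json")
>>> df.write.format("parquet").mode("overwrite").save("people_copy")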

Basic Dataset/DataFrame functions

Basic Dataset/DataFrame functions are used to create DataFrames and to perform debugging operations on the console; a quick sketch follows the list:

  • as[U]: Returns a new Dataset with records mapped to the specified type
  • toDF: Returns a new DataFrame with columns renamed
  • explain: Prints the (logical and physical) plans to the console for debugging purposes
  • printSchema: Prints out the schema in the tree format
  • createTempView: Registers this DataFrame as a temporary view using the given name
  • cache() or persist(): Persists the Dataset with the specified persistence level
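
A quick sketch of a few of these on a small generated DataFrame (the column name num and view name nums are made up):

>>> df_basic = spark.range(5).toDF("num")    # toDF renames the column
>>> df_basic.printSchema()
>>> df_basic.explain()                       # prints the physical plan
>>> df_basic.createTempView("nums")
>>> df_basic.cache()                         # MEMORY_AND_DISK by default for DataFrames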

DSL functions

Domain Specific Language (DSL) functions are used for analytics. These functions are lazy and do not trigger execution; a short sketch of chaining them follows the list.

  • agg: Aggregates on the entire Dataset/DataFrame without groups
  • distinct: Returns a new Dataset/DataFrame with unique rows
  • drop: Returns a new Dataset/DataFrame with a column dropped
  • filter: Filters rows using the given condition
  • join: Joins with another Dataset/DataFrame using the given join expression
  • limit: Returns a new Dataset/DataFrame by taking the first n rows
  • sort: Returns a new Dataset/DataFrame sorted in ascending order by the specified columns
  • groupBy: Groups the Dataset/DataFrame using the specified columns
  • unionAll: Returns a new Dataset/DataFrame containing the union of rows from two DataFrames
  • na: Provides functions for working with missing or null values
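
A minimal sketch chaining a few of these; the sample rows mirror the people.txt data used earlier:

>>> people_df = spark.createDataFrame(
...     [("Michael", 29), ("Andy", 30), ("Justin", 19)], ["name", "age"])
>>> people_df.filter(people_df.age > 19).sort("age").limit(2).show()
>>> people_df.groupBy("name").agg({"age": "max"}).show()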

Built-in functions, aggregate functions, and window functions

Built-in functions or user-defined functions operate on single rows to calculate a single value. Examples of built-in functions are substr or round. Aggregate functions operate on a set of rows and calculate a single value. Examples of aggregate functions are min, max, sum, mean, first, last, avg, count, countDistinct, and approxCountDistinct. Window functions operate on a set of rows related to the current row. Examples of DataFrame window functions are rank, denseRank, percentRank, ntile, and rowNumber.
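
As an illustration, the following sketch applies two aggregate functions and then a window function that ranks rows within each department; the sample data is made up:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.window import Window
>>> sales = spark.createDataFrame(
...     [("Golf", 100), ("Golf", 300), ("Apparel", 200)], ["dept", "amount"])
>>> sales.agg(F.sum("amount"), F.countDistinct("dept")).show()
>>> w = Window.partitionBy("dept").orderBy(F.desc("amount"))
>>> sales.withColumn("rank", F.rank().over(w)).show()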

Actions

Actions begin execution. Some examples of actions are:

  • collect: Returns an array that contains all rows in this DataFrame
  • count: Counts the number of rows in the DataFrame
  • describe: Computes statistics for numeric columns
  • show: Prints the first n rows to the console
  • take: Returns the first num rows as a list of rows

RDD operations

Regular RDD operations such as map, flatMap, coalesce, repartition, foreach, toJSON, and toJavaRDD can be applied to DataFrames. These are helpful because not every operation is available in the DSL. For example, you can apply the foreach function to print the department_name column of the df DataFrame created from the departments table, as shown in the following:

>>> def f(row):
...    print(row.department_name)
...
>>> df.foreach(f)
Fitness
Footwear
Apparel
Golf
Outdoors
Fan Shop

Caching data

DataFrames and Spark SQL tables can be cached in an in-memory columnar format by using the df.persist() or df.cache() methods, or spark.catalog.cacheTable(). Spark SQL will scan only the required columns from the columnar in-memory data, and it automatically tunes compression to minimize memory usage and garbage collection pressure. The DataFrame can be unpersisted using df.unpersist().
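
A minimal sketch of caching a DataFrame and a registered table (the people temporary view was created earlier in this chapter):

>>> df.persist()                          # or df.cache()
>>> df.count()                            # an action materializes the cache
>>> df.unpersist()
>>> spark.catalog.cacheTable("people")    # cache a table/view by name
>>> spark.catalog.uncacheTable("people")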

Performance optimizations

Let's look at some of the important performance tuning parameters of Spark SQL; a short sketch of adjusting them at runtime follows the list:

  • spark.sql.inMemoryColumnarStorage.compressed is set to true by default, which compresses the cached data.
  • spark.sql.inMemoryColumnarStorage.batchSize is set to 10000 by default, which controls the batch size for columnar caching.
  • spark.sql.autoBroadcastJoinThreshold is set to 10 MB by default. This configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.
  • spark.sql.files.maxPartitionBytes is set to 128 MB by default, which is the maximum number of bytes to pack into a single partition when reading files.
  • spark.sql.shuffle.partitions is set to 200 by default. This configures the number of partitions to use when shuffling data.
  • spark.sql.planner.externalSort is set to true by default. This performs sorts that spill to disk as needed; otherwise, each partition is sorted in memory.
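
These parameters can be inspected and adjusted at runtime through the session configuration. A minimal sketch; the values set here are only illustrative:

>>> spark.conf.get("spark.sql.shuffle.partitions")        # '200' by default
>>> spark.conf.set("spark.sql.shuffle.partitions", "50")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(20 * 1024 * 1024))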