Spark SQL and DataFrames

Before Apache Spark, Apache Hive was the go-to technology whenever anyone wanted to run an SQL-like query on large amounts of data. Apache Hive essentially translated SQL queries into MapReduce-like logic automatically, making it very easy to perform many kinds of analytics on big data without having to learn to write complex code in Java or Scala.

With the advent of Apache Spark, there was a paradigm shift in how we could perform analysis at big data scale. Spark SQL provides an SQL-like layer on top of Apache Spark's distributed computation abilities that is rather simple to use. In fact, Spark SQL can be used as an online analytical processing database. Spark SQL works by parsing the SQL-like statement into an abstract syntax tree (AST), subsequently converting the AST into a logical plan, and then optimizing the logical plan into a physical plan that can be executed, as shown in the following diagram:

The final execution uses the underlying DataFrame API, making it very easy for anyone to use the DataFrame APIs by simply using an SQL-like interface rather than learning all the internals. Since this book dives into the technical details of the various APIs, we will primarily cover the DataFrame APIs, showing the Spark SQL API in some places to contrast the different ways of using them. Thus, the DataFrame API is the underlying layer beneath Spark SQL. In this chapter, we will show you how to create DataFrames using various techniques, including SQL queries, and how to perform operations on the DataFrames.
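As a first taste, the following is a minimal sketch (assuming a local SparkSession named spark; the application name, column names, and values are purely illustrative) of creating a DataFrame and querying it through the DataFrame API:

import org.apache.spark.sql.SparkSession

// The SparkSession is the entry point for both the DataFrame API and Spark SQL
val spark = SparkSession.builder()
  .appName("SparkSQLAndDataFrames")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// A small in-memory DataFrame; the column names and values are illustrative
val statesDF = Seq(
  ("CA", 39500000),
  ("NY", 19500000),
  ("TX", 29000000)
).toDF("state", "population")

// A simple query expressed through the DataFrame API
statesDF.filter($"population" > 20000000).select("state").show()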

A DataFrame is an abstraction over the resilient distributed dataset (RDD). It exposes higher-level functions, is optimized by the Catalyst optimizer, and is highly performant thanks to the Tungsten initiative.

Since its inception, Project Tungsten has been the largest change to Spark's execution engine. Its main focus is on improving the efficiency of CPU and memory usage for Spark applications. This project comprises three initiatives:

  • Memory management and binary processing
  • Cache-aware computation
  • Code generation

You can think of a Dataset as an efficient table over an RDD with a heavily optimized binary representation of the data. The binary representation is achieved using encoders that serialize the various objects into a binary structure, giving much better performance than the RDD representation. Since a DataFrame uses RDDs internally anyway, a DataFrame/Dataset is distributed exactly like an RDD and is thus also a distributed dataset. Obviously, this also means that Datasets are immutable.

The following is an illustration of the binary representation of data:

Datasets were added in Spark 1.6 and provide the benefit of strong typing on top of DataFrames. In fact, since Spark 2.0, a DataFrame is simply an alias for a Dataset of Row objects.

http://spark.apache.org/sql/ defines the DataFrame type as a Dataset[Row], which means that most of the APIs will work well with both Datasets and DataFrames:

type DataFrame = Dataset[Row]
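The following sketch (assuming the spark session from the earlier example; Person is a hypothetical case class) shows the same data handled as a strongly typed Dataset and as an untyped DataFrame:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// A case class drives the encoder that builds the binary representation
case class Person(name: String, age: Int)

// A strongly typed Dataset[Person] ...
val peopleDS: Dataset[Person] = Seq(Person("Alice", 29), Person("Bob", 35)).toDS()

// ... and the same data as an untyped DataFrame, that is, Dataset[Row]
val peopleDF: DataFrame = peopleDS.toDF()

// Typed transformation: field access is checked at compile time
peopleDS.filter(_.age > 30).show()

// Untyped transformation: columns are resolved at runtime
peopleDF.filter($"age" > 30).show()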

A DataFrame is conceptually similar to a table in a relational database. Hence, a DataFrame contains rows of data, with each row consisting of several columns. One of the first things we need to keep in mind is that, just like RDDs, DataFrames are immutable. This immutability means that every transformation creates a new DataFrame rather than modifying the existing one.
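The following sketch (reusing the hypothetical statesDF from earlier) shows this behavior: withColumn returns a brand new DataFrame and leaves the original untouched:

import org.apache.spark.sql.functions.col

// withColumn does not modify statesDF; it returns a new DataFrame
val withMillions = statesDF.withColumn("millions", col("population") / 1000000)

statesDF.printSchema()     // still just state and population
withMillions.printSchema() // state, population, and millions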

Let's start by looking more closely at DataFrames and how they differ from RDDs. RDDs, as seen before, represent a low-level API for data manipulation in Apache Spark. DataFrames were created on top of RDDs to abstract away the low-level inner workings of RDDs and to expose high-level APIs that are easier to use and provide a lot of functionality out of the box. DataFrames follow similar concepts to those found in the Python pandas package, the R language, the Julia language, and so on.

As we mentioned before, DataFrames translate SQL code and domain-specific language expressions into optimized execution plans to be run on top of the Spark Core APIs, so that SQL statements can perform a wide variety of operations. DataFrames support many different types of input data sources and many types of operations, including all the usual SQL operations such as joins, group by, aggregations, and window functions, much like most databases.
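For instance, a sketch (reusing statesDF and assuming a hypothetical salesDF of state, product, and amount rows) of a group by, a join, and a window function using the DataFrame API could look as follows:

import org.apache.spark.sql.functions.{col, sum, rank}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// Hypothetical sales data
val salesDF = Seq(
  ("CA", "laptop", 1200.0),
  ("CA", "phone", 800.0),
  ("NY", "laptop", 1100.0)
).toDF("state", "product", "amount")

// Aggregation with group by
val totalsDF = salesDF.groupBy("state").agg(sum("amount").alias("total"))

// Join with the statesDF DataFrame from the earlier sketch
val joinedDF = totalsDF.join(statesDF, "state")

// Window function: rank products by amount within each state
val byState = Window.partitionBy("state").orderBy(col("amount").desc)
salesDF.withColumn("rank", rank().over(byState)).show()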

Spark SQL is also quite similar to the Hive query language, and since Spark provides a natural adapter to Apache Hive, users who have been working with Apache Hive can easily transfer their knowledge and apply it to Spark SQL, thus minimizing the transition time. The DataFrame essentially depends on the concept of a table, as seen previously.

The table can be operated on very similarly to how tables work in Apache Hive. In fact, many of the operations on tables in Apache Spark are similar to how Apache Hive handles and operates on tables. Once you have a table, that is, a DataFrame, it can be registered as a table and you can operate on the data using Spark SQL statements in lieu of the DataFrame APIs.
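The following sketch (continuing with the hypothetical salesDF and statesDF) registers both DataFrames as temporary views and expresses the earlier join-and-aggregate logic in Spark SQL instead of the DataFrame API:

// Register the DataFrames as temporary views so they can be queried with SQL
salesDF.createOrReplaceTempView("sales")
statesDF.createOrReplaceTempView("states")

// The same join-and-aggregate logic expressed in Spark SQL
spark.sql("""
  SELECT s.state, SUM(s.amount) AS total, st.population
  FROM sales s
  JOIN states st ON s.state = st.state
  GROUP BY s.state, st.population
""").show()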

The DataFrame depends on the Catalyst optimizer and the Tungsten performance improvements, so let's briefly examine how the Catalyst optimizer works. The Catalyst optimizer creates a parsed logical plan from the input SQL and then analyzes that plan by looking at all the various attributes and columns used in the SQL statement. Once the analyzed logical plan is created, the Catalyst optimizer further tries to optimize the plan by combining several operations and rearranging the logic to get better performance.

In order to understand the Catalyst optimizer, think of it as a common-sense logic optimizer that can reorder operations such as filters and transformations, sometimes grouping several operations into one, so as to minimize the amount of data shuffled across the worker nodes. For example, the Catalyst optimizer may decide to broadcast the smaller dataset when performing join operations between different datasets. Use explain to look at the execution plan of any DataFrame. The Catalyst optimizer also computes statistics of the DataFrame's columns and partitions, improving the speed of execution.
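The following sketch (reusing the hypothetical joinedDF, salesDF, and statesDF) shows how to inspect the plans produced for a DataFrame and how to hint at a broadcast join explicitly:

import org.apache.spark.sql.functions.broadcast

// Print the physical plan chosen by the Catalyst optimizer
joinedDF.explain()

// explain(true) also prints the parsed, analyzed, and optimized logical plans
joinedDF.explain(true)

// Explicitly hint that statesDF is small enough to be broadcast to every executor
salesDF.join(broadcast(statesDF), "state").explain()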

For example, if there are transformations and filters on the data partitions, then the order in which we filter data and apply transformations matters a lot to the overall performance of the operations. As a result of all the optimizations, the optimized logical plan is generated, which is then converted into a physical plan.

Obviously, several physical plans are possible to execute the same SQL statement and generate the same result. The cost optimization logic determines and picks a good physical plan based on cost estimates. The Tungsten performance improvements are another key ingredient in the secret sauce behind the phenomenal performance improvements offered by Spark 2.x compared to previous releases such as Spark 1.6 and older.

Tungsten implements a complete overhaul of memory management along with other performance improvements. The most important memory management improvements use binary encoding of objects and reference them in both off-heap and on-heap memory. Thus, Tungsten allows the usage of off-heap memory by using the binary encoding mechanism to encode all the objects. Binary-encoded objects take up much less memory.
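As a rough sketch, off-heap memory usage can be opted into through Spark configuration when building the session; the 2g size below is purely illustrative:

import org.apache.spark.sql.SparkSession

// Off-heap memory is disabled by default; these settings opt in to it
// The 2g size here is purely illustrative
val sparkOffHeap = SparkSession.builder()
  .appName("TungstenOffHeap")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()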

Project Tungsten also improves shuffle performance. Data is typically loaded into DataFrames through the DataFrameReader and saved from DataFrames through the DataFrameWriter.
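A brief sketch (the file paths and options are hypothetical) of reading and writing data through these two interfaces:

// Reading: a DataFrameReader is obtained through spark.read
val inputDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("input/states.csv")   // hypothetical path

// Writing: a DataFrameWriter is obtained through df.write
inputDF.write
  .mode("overwrite")
  .parquet("output/states.parquet")   // hypothetical path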
