Data Sources API

The Data Sources API provides a single interface for loading and storing data using Spark SQL. In addition to the built-in sources, this API provides an easy way for developers to add support for custom data sources. All available external packages are listed at http://spark-packages.org/. Let's learn how to use built-in sources and external sources in this section.

Read and write functions

The Data Sources API provides generic read and write functions that can be used with any kind of data source. The generic read and write functions provide two capabilities:

  • Parsing text records, JSON records, and other formats, and deserializing data stored in binary
  • Converting Java objects to rows of Avro, JSON, Parquet, and HBase records

The default data source is set to parquet with the spark.sql.sources.default configuration property. This can be changed as needed.
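
For example, you can check the current value and change it from the pyspark shell (the remaining examples in this section assume it is left at parquet):

>>> spark.conf.get("spark.sql.sources.default")
u'parquet'
>>> spark.conf.set("spark.sql.sources.default", "orc")
>>> spark.conf.set("spark.sql.sources.default", "parquet")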

Built-in sources

Built-in sources are pre-packaged with Spark by default. Examples of built-in sources are Text, JSON, Parquet, ORC, JDBC, and CSV.

Working with text files

To load text files, we use the text method, which returns a single column named value of type string. Note that the Dataset API is not supported in Python, so in Python a DataFrame is returned, while in Scala a Dataset is returned, as shown in the following:

>>> df_txt = spark.read.text("people.txt")
>>> df_txt.show()
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+
>>> df_txt
DataFrame[value: string]

Working with JSON

Spark SQL can automatically infer the schema of a JSON dataset when loading to a DataFrame:

>>> df_json = spark.read.json("people.json")
>>> df_json.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

You can also use the load method to load files. However, the following command will throw an exception stating that the file is not a Parquet file, because the default data source is configured to parquet:

>>> df_json = spark.read.load("people.json")

To read a .json file, manually specify the type of the file with format:

>>> df_json = spark.read.load("people.json", format="json")

To write data to another JSON file, use one of the following commands:

>>> df_json.write.json("newjson_dir")
>>> df_json.write.format("json").save("newjson_dir2")

To write data in any other format, simply specify the format you want to save in. The following example saves the df_json DataFrame in Parquet format:

>>> df_json.write.parquet("parquet_dir")
>>> df_json.write.format("parquet").save("parquet_dir2")

Working with Parquet

Apache Parquet is a columnar storage format that provides superior performance and is available to any project in the Hadoop ecosystem. The file format documentation can be found at https://parquet.apache.org/documentation/latest/.

As mentioned earlier, the same generic read and write functions of the Data Sources API are used regardless of the data source type. So, let's use the read and write functions for the Parquet files in the parquet_dir directory as well.

To create a DataFrame from a Parquet file and issue SQL commands, use the following command:

>>> df_parquet = spark.read.load("parquet_dir")

Or use the following command:

>>> df_parquet = spark.read.parquet("parquet_dir")

Note that format is not needed when using the load method because parquet is the default data source. This reads all parquet files from the parquet_dir directory:

>>> df_parquet.createOrReplaceTempView("parquet_table")
>>> teenagers = spark.sql("SELECT name from parquet_table where age >= 13 AND age <= 19")

To write the data from a parquet DataFrame to .json format, use one of the write commands:

>>> df_parquet.write.json("myjson_dir")
>>> df_parquet.write.format("json").save("myjson_dir2")

There are multiple save modes when writing data. The default is error, which throws an error if the output already exists. The other modes, append, overwrite, and ignore, control what happens when the target data source already exists. The following examples append data to the myjson_dir and parquet_dir directories:

>>> df_parquet.write.mode("append").json("myjson_dir")
>>> df_parquet.write.mode("append").save("parquet_dir")

A DataFrame can also be written to a Hive table. The following example writes data to a Hive-managed table in the default Parquet format:

>>> df_parquet.write.saveAsTable("hive_parquet_table")
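
The saved table can then be queried like any other Hive table, assuming your Spark build has Hive support enabled:

>>> spark.sql("select count(*) from hive_parquet_table").show()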

Like ProtocolBuffer, Avro, and Thrift, Parquet supports schema evolution. Multiple schemas can be merged by setting the global option spark.sql.parquet.mergeSchema to true, or by using the mergeSchema option as shown in the following:

>>> df_parquet = spark.read.option("mergeSchema", "true").parquet("parquet_dir")
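
As a quick illustration of schema merging (the merge_dir directory and the column names here are made up for this example), write two Parquet files with different columns into subdirectories of the same directory and read them back with mergeSchema; printSchema then shows the union of the two schemas plus the partitioning column key:

>>> df1 = spark.createDataFrame([(1, 2), (3, 4)], ["single", "double"])
>>> df1.write.parquet("merge_dir/key=1")
>>> df2 = spark.createDataFrame([(5, 25), (6, 36)], ["single", "square"])
>>> df2.write.parquet("merge_dir/key=2")
>>> spark.read.option("mergeSchema", "true").parquet("merge_dir").printSchema()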

Table partitioning is a common optimization approach, used in Parquet tables just as in Hive tables. Partition discovery and partition pruning happen automatically for queries performed on Parquet tables. Also, while writing Parquet tables, data can be partitioned using the partitionBy method.
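
For example, the df_parquet DataFrame can be written out partitioned by the age column (partitioned_parquet is just an illustrative output directory):

>>> df_parquet.write.partitionBy("age").parquet("partitioned_parquet")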

Since Parquet is a columnar format, column pruning is automatic, and predicate pushdown is enabled by default through the spark.sql.parquet.filterPushdown parameter, which is set to true.
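
The current setting can be verified from the shell in the same way as the ORC property shown later (output is indicative):

>>> spark.conf.get("spark.sql.parquet.filterPushdown")
u'true'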

Working with ORC

The Optimized Row Columnar (ORC) file format provides an efficient way to store data. More details on file format and documentation can be found at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC.

  1. Let's create an ORC file by writing data from a json DataFrame. Use one of the two commands listed:
    >>> df_json.write.orc("myorc_dir")
    >>> df_json.write.format("orc").save("myorc_dir")
    
  2. Then, let's read the ORC files with a generic read function:
    >>> df_orc = spark.read.orc("myorc_dir")
    >>> df_orc = spark.read.load("myorc_dir", format="orc")
    
  3. Write the DataFrame created in JSON format to a hive managed table in ORC format:
    >>> df_json.write.saveAsTable("hive_orc_table", format="orc")
    

Column Pruning: Since ORC is a columnar format, only the columns needed by the query are read.

Predicate Push-down: ORC enables predicate pushdown (with the help of indexes within each file) through the following parameter. Check the current configuration and then change the value:

>>> spark.conf.get("spark.sql.orc.filterPushdown")
u'false'
>>> spark.conf.set("spark.sql.orc.filterPushdown","true")
>>> spark.conf.get("spark.sql.orc.filterPushdown")
u'true'

Partition Pruning: Just like Parquet tables, partition discovery and partition pruning are automatic for ORC tables too. To create ORC files partitioned by a column, use the following syntax:

>>> df_json.write.format("orc").partitionBy("age").save("partitioned_orc")

Working with JDBC

Spark SQL provides easy integration with external databases using JDBC. To use this functionality, you will need to include the JDBC driver for your particular database on the Spark classpath. For example, to connect to a PostgreSQL database from the pyspark shell, use the following command, or copy the JDBC JAR to the jars directory of SPARK_HOME:

[cloudera@quickstart spark-2.0.0-bin-hadoop2.7 ]$  SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/pyspark

Tables from external databases can be loaded into Spark SQL as a DataFrame or as a temporary table. The following options are supported while creating a DataFrame using jdbc:

  • url: The JDBC URL connection string
  • dbtable: The name of the table; this can be a SQL query as well
  • driver: The class name of the JDBC driver
  • partitionColumn, lowerBound, upperBound, numPartitions: Partitioning information for reading from the database; all of these options must be used together

Let's create a DataFrame using one of the two commands given in the following:

>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera', dbtable='departments').load()
>>> df = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db', dbtable='departments', user='root', password='cloudera').load()
>>> df.show()

Then, register it as a temporary table and query it:

>>> df.createTempView("dept")
>>> df_new = spark.sql("select * from dept where department_id> 5")
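
The partitioning options listed earlier can be used to read a table in parallel. A minimal sketch, assuming department_id is a numeric column suitable for range partitioning:

>>> df_part = spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306/retail_db', dbtable='departments', user='root', password='cloudera', partitionColumn='department_id', lowerBound='1', upperBound='10', numPartitions='4').load()
>>> df_part.rdd.getNumPartitions()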

Let's write the DataFrame to another table in the MySQL database. This will create the table and write the data out:

>>> df_new.write.jdbc("jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera","new_table")

To write the DataFrame to another table in the MySQL database, specify the overwrite mode to overwrite the data if the table already exists:

>>> df_new.write.jdbc("jdbc:mysql://localhost:3306/retail_db?user=root&password=cloudera","new_table","overwrite")

Working with CSV

Download a .csv dataset as shown in the following and then copy it to HDFS. Start a pyspark shell to create a DataFrame from a .csv file:

[cloudera@quickstart spark-2.0.0-bin-hadoop2.7 ]$  wget https://raw.githubusercontent.com/databricks/spark-csv/master/src/test/resources/cars.csv --no-check-certificate

[cloudera@quickstart spark-2.0.0-bin-hadoop2.7 ]$ hadoop fs -put cars.csv

>>> csv_df = spark.read.options(header='true',inferSchema='true').csv("cars.csv")
>>> csv_df.printSchema()
root
 |-- year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

Let's select a few columns and write the DataFrame to another CSV file, compressed with the gzip codec:

>>> csv_df.select('year', 'model').write.options(codec="org.apache.hadoop.io.compress.GzipCodec").csv('newcars.csv')
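
Spark reads the gzip-compressed part files back transparently. Note that no header was written above, so the columns get default names such as _c0 and _c1 unless you also write with header='true':

>>> csv_new = spark.read.csv('newcars.csv')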

External sources

External data sources are not included in Spark by default. They are available through Spark Packages, an index of packages contributed by the Spark community.

Spark Packages offers packages for reading different file formats and data from NoSQL databases such as HBase, Cassandra, and so on. When you want to include a Spark package in your application, you need to use the --packages command-line option with a comma-separated list of Maven coordinates of the JARs to include on the driver and executor classpaths. When specified, Spark will search the local Maven repository, then Maven Central, and finally any additional remote repositories given by the --repositories option. The format for the coordinates is groupId:artifactId:version.

Working with AVRO

Let's start the pyspark shell by providing the --packages dependency:

[cloudera@quickstart spark-2.0.0-bin-hadoop2.7 ]$ bin/pyspark --packages com.databricks:spark-avro_2.11:3.0.0

Let's read the JSON file, write it in AVRO format, and then read the same file:

>>> df_json = spark.read.json("people.json")
>>> df_json.write.format("com.databricks.spark.avro").save("avro_out")
>>> df_avro = spark.read.format("com.databricks.spark.avro").load("avro_out")

You can specify the record name and namespace to use when writing out by passing recordName and recordNamespace as optional parameters to Spark:

>>> df_avro.write.format("com.databricks.spark.avro").option("recordName","AvroTest").option("recordNamespace","com.cloudera.spark").save("newavro")

Working with XML

Reading XML data is similar to Avro: we include the external XML package when starting the pyspark shell. Perform the following steps to download a sample .xml file and work with it:

  1. Let's download a sample .xml file from the following mentioned location and copy it to HDFS:
    [cloudera@quickstart ~]$ wget https://raw.githubusercontent.com/databricks/spark-xml/master/src/test/resources/books.xml --no-check-certificate
    [cloudera@quickstart ~]$ hadoop fs -put books.xml
    
  2. Let's start the pyspark shell by providing the --packages dependency. For Spark compiled with Scala 2.11, use spark-xml_2.11:0.4.0; for Spark compiled with Scala 2.10, use spark-xml_2.10:0.4.0:
    [cloudera@quickstart spark-2.0.0-bin-hadoop2.7]$ bin/pyspark --master yarn --packages com.databricks:spark-xml_2.11:0.4.0
    
  3. Then, let's read the .xml file and select data elements from DataFrame:
    >>> df_xml = spark.read.format('com.databricks.spark.xml').options(rowTag='book',attributePrefix='@').load('books.xml')
    >>> df_xml.select('@id','author','title').show(5,False)
    +-----+--------------------+---------------------+
    |@id  |author              |title                |
    +-----+--------------------+---------------------+
    |bk101|Gambardella, Matthew|XML Developer's Guide|
    |bk102|Ralls, Kim          |Midnight Rain        |
    |bk103|Corets, Eva         |Maeve Ascendant      |
    |bk104|Corets, Eva         |Oberon's Legacy      |
    |bk105|Corets, Eva         |The Sundered Grail   |
    +-----+--------------------+---------------------+
    >>> df_xml.where(df_xml.author == "Corets, Eva").select("@id", "author", "title", "price").withColumn("new_price",df_xml.price * 10).drop("price").show()
    +-----+-----------+------------------+---------+
    |  @id|     author|             title|new_price|
    +-----+-----------+------------------+---------+
    |bk103|Corets, Eva|   Maeve Ascendant|     59.5|
    |bk104|Corets, Eva|   Oberon's Legacy|     59.5|
    |bk105|Corets, Eva|The Sundered Grail|     59.5|
    +-----+-----------+------------------+---------+
    

Working with Pandas

Pandas is a Python library for data manipulation. Use the following commands to create a DataFrame in Pandas, convert it to a Spark DataFrame, and vice versa. Install pandas using pip if it is not already installed:

>>> import pandas
>>> data = [
('v1', 'v5', 'v9'),
('v2', 'v6', 'v10'),
('v3', 'v7', 'v11'),
('v4', 'v8', 'v12')]

>>> pandas_df = pandas.DataFrame(data, columns=['col1', 'col2', 'col3'])

>>> spark_df = spark.createDataFrame(pandas_df)

>>> spark_df.toPandas()

DataFrame based Spark-on-HBase connector

HBase does not offer SQL as a means to query the data. It exposes GET, SCAN, and FILTER APIs, which are difficult to use for ad hoc queries. So, to analyze data in HBase, it is common to write MapReduce programs in Java. Integration with Hive or Pig provides an easy way to write SQL or scripts to analyze the data, but these approaches are not efficient because they use TableInputFormat instead of the native HBase API. Apache Phoenix provides an easy way to integrate with HBase and write relational queries. The Spark-on-HBase connector is a newer connector built on the DataFrame framework using the standard Data Sources API, so it can be used simply by specifying the packages option. It does not leverage TableInputFormat; instead, it creates a customized RDD that implements partition pruning, column pruning, predicate pushdown, and data locality, and it also leverages the Spark Catalyst optimizer for query optimization. The following exercise was tried on the HDP 2.5 sandbox with Spark version 1.6.2.

Let's get started with Spark-on-HBase connector using the following steps:

  1. Copy hbase-site.xml to the spark configuration directory:
    cp /etc/hbase/conf/hbase-site.xml /etc/spark/conf/
    
  2. Create an HBase table and insert some data into it. Though there is a way to create a table and insert data using this connector, it is more common to analyze the data on an existing table:
    hbase(main):001:0> create 'IOTEvents', {NAME => 'e', VERSIONS => 100}
    hbase(main):002:0> put 'IOTEvents', '100', 'e:status', 'active'
    hbase(main):003:0> put 'IOTEvents', '100', 'e:e_time', '1470009600'
    hbase(main):004:0> put 'IOTEvents', '200', 'e:status', 'active'
    hbase(main):005:0> put 'IOTEvents', '200', 'e:e_time', '1470013200'
    hbase(main):006:0> put 'IOTEvents', '300', 'e:status', 'inactive'
    
  3. Start spark shell with the packages option:
    [cloudera@quickstart spark-2.0.0-bin-hadoop2.7 ]$ spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10
    
  4. Define a catalog with HBase and DataFrame mapping and define a function to read data from HBase. Create a DataFrame using the catalog and read functions:
    import org.apache.spark.sql.execution.datasources.hbase._
    
    def iotCatalog = s"""{
      |"table":{"namespace":"default", "name":"IOTEvents"},
      |"rowkey":"key",
      |"columns":{
        |"Rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"Status":{"cf":"e", "col":"status", "type":"string"},
        |"Event_time":{"cf":"e", "col":"e_time", "type":"string"}
      |}
    |}""".stripMargin
    
    import org.apache.spark.sql._
    
    def readHBase(cat: String): DataFrame = {
      sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog->cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
    }
    
    val iotEventsDF = readHBase(iotCatalog)
    
  5. Once the DataFrame is created, all DataFrame operations can be applied on top of it:
    iotEventsDF.show()
    
    +------+----------+--------+
    |Rowkey|Event_time|  Status|
    +------+----------+--------+
    |   100|1470009600|  active|
    |   200|1470013200|  active|
    |   300|      null|inactive|
    +------+----------+--------+
    
    iotEventsDF.filter($"Status" === "inactive").show()
    
    +------+----------+--------+
    |Rowkey|Event_time|  Status|
    +------+----------+--------+
    |   300|      null|inactive|
    +------+----------+--------+
    
  6. Register the DataFrame as a temporary table and query it:
    iotEventsDF.registerTempTable("iotevents")
    
    
    sqlContext.sql("select count(Rowkey) as count from iotevents").show
    
    +-----+
    |count|
    +-----+
    |    3|
    +-----+
    
    sqlContext.sql("select Rowkey, Event_time, Status from iotevents where Status = 'active'").show
    +------+----------+------+
    |Rowkey|Event_time|Status|
    +------+----------+------+
    |   100|1470009600|active|
    |   200|1470013200|active|
    +------+----------+------+
    