Analytics with the Dataset API

Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmission over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are generated dynamically and use a format that allows Spark to perform many operations such as filtering, sorting, and hashing without deserializing the bytes back into an object (source: https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets).
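
To make the Encoder idea concrete, here is a minimal sketch (not part of the original example) that inspects the Encoder Spark generates for a simple case class; Encoders.product and the schema member are standard Spark SQL APIs:

import org.apache.spark.sql.{Encoder, Encoders}

case class Dept(dept_id: Int, dept_name: String)

// Encoders.product builds an Encoder for any case class (Product type)
val deptEncoder: Encoder[Dept] = Encoders.product[Dept]

// The schema Spark uses internally for the encoded representation
println(deptEncoder.schema)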

Creating Datasets

The following Scala example creates a Dataset and a DataFrame from an RDD. Enter the Scala shell with the spark-shell command:

scala> case class Dept(dept_id: Int, dept_name: String)
defined class Dept

scala> val deptRDD = sc.makeRDD(Seq(Dept(1,"Sales"),Dept(2,"HR")))
deptRDD: org.apache.spark.rdd.RDD[Dept] = ParallelCollectionRDD[0] at makeRDD at <console>:26

scala> val deptDS = spark.createDataset(deptRDD)
deptDS: org.apache.spark.sql.Dataset[Dept] = [dept_id: int, dept_name: string]

scala> val deptDF = spark.createDataFrame(deptRDD)
deptDF: org.apache.spark.sql.DataFrame = [dept_id: int, dept_name: string]

scala> deptDS.rdd
res12: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[5] at rdd at <console>:31

scala> deptDF.rdd
res13: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[8] at rdd at <console>:31

Notice that when you convert a Dataset to an RDD, you get an RDD[Dept], but when you convert a DataFrame to an RDD, you get an RDD[Row].
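
In practice, the difference shows up when you access fields on the resulting RDDs. A brief sketch (using the deptDS and deptDF created above): the typed RDD exposes the case class fields directly, while RDD[Row] requires named or positional access:

// Typed RDD[Dept]: fields are accessed directly on the case class
val typedNames = deptDS.rdd.map(d => d.dept_name).collect()

// Untyped RDD[Row]: fields are accessed by name (or position) on the Row
val untypedNames = deptDF.rdd.map(row => row.getAs[String]("dept_name")).collect()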

Datasets also provide compile-time type safety, as shown in the following code. Since dept_location is not a member of the Dept case class, the compiler reports an error:

scala> deptDS.filter(x => x.dept_location > 1).show()
<console>:31: error: value dept_location is not a member of Dept
       deptDS.filter(x => x.dept_location > 1).show()
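
For contrast, filtering on a field that does exist in Dept compiles and runs as expected, for example:

// dept_id is a member of Dept, so this type-checks; only Dept(2, "HR") passes the filter
deptDS.filter(d => d.dept_id > 1).show()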

Converting a DataFrame to a Dataset

A DataFrame can be converted to a Dataset by providing a case class to the as method, as shown in the following example:

scala> val newDeptDS = deptDF.as[Dept]
newDeptDS: org.apache.spark.sql.Dataset[Dept] = [dept_id: int, dept_name: string]
scala> newDeptDS.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|    Sales|
|      2|       HR|
+-------+---------+
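
Note that as[Dept] requires an implicit Encoder[Dept] in scope. The spark-shell imports spark.implicits._ automatically; in a standalone application you would add the import yourself, roughly as follows:

// Bring the implicit Encoders into scope (spark is the SparkSession)
import spark.implicits._

val newDeptDS = deptDF.as[Dept]   // org.apache.spark.sql.Dataset[Dept]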

Converting a Dataset to a DataFrame

Use the toDF method to convert a Dataset to a DataFrame. The following Scala example converts the Dataset created above to a DataFrame:

scala> newDeptDS.first()
res27: Dept = Dept(1,Sales)

scala> newDeptDS.toDF.first()
res28: org.apache.spark.sql.Row = [1,Sales]

Note that res27 is a Dept case class object, whereas res28 is a Row object.
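
toDF also accepts explicit column names, which is useful for renaming columns during the conversion. A small sketch (the names id and name are arbitrary):

// Rename the columns while converting the Dataset to a DataFrame
val renamedDF = newDeptDS.toDF("id", "name")
renamedDF.printSchema()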

Accessing metadata using Catalog

The Catalog API makes it easy to access metadata about databases, Hive tables, and UDFs. The following commands show how to access this metadata:

scala> spark.catalog.listDatabases().select("name").show()
+-------+
|   name|
+-------+
|default|
+-------+

scala> spark.catalog.listTables.show()
+---------+--------+-----------+---------+-----------+
|     name|database|description|tableType|isTemporary|
+---------+--------+-----------+---------+-----------+
|customers| default|       null| EXTERNAL|      false|
|sample_07| default|       null|  MANAGED|      false|
|sample_08| default|       null|  MANAGED|      false|
| web_logs| default|       null|  MANAGED|      false|
|jsontable|    null|       null|TEMPORARY|       true|
+---------+--------+-----------+---------+-----------+

scala> spark.catalog.isCached("sample_07")
res29: Boolean = false
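
Caching and uncaching tables are also available through the Catalog API. A brief sketch against the sample_07 table listed above:

spark.catalog.cacheTable("sample_07")     // mark the table for in-memory caching
spark.catalog.isCached("sample_07")       // now returns true
spark.catalog.uncacheTable("sample_07")   // release the cached data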

Use the createTempView method on a DataFrame or Dataset to register it as a temporary view, and spark.catalog.dropTempView to drop it.
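
For example, a DataFrame can be registered as a temporary view, queried with SQL, and then dropped through the Catalog (a short sketch using the deptDF created earlier; the view name dept_view is arbitrary):

// Register the DataFrame as a temporary view, query it, then drop it
deptDF.createTempView("dept_view")
spark.sql("SELECT dept_name FROM dept_view WHERE dept_id = 1").show()
spark.catalog.dropTempView("dept_view")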

spark.catalog.listFunctions().show() lists the available functions along with their descriptions.
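
Since listFunctions returns a Dataset, it can be filtered like any other Dataset; functionExists checks for a single function by name. A brief sketch, using the built-in max function as an example:

spark.catalog.functionExists("max")                          // true for the built-in max
spark.catalog.listFunctions().filter("name like '%max%'").show()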
