Datasets are similar to RDDs; however, instead of using Java or Kryo serialization, they use a specialized Encoder to serialize objects for processing or for transmission over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are generated dynamically and use a format that allows Spark to perform many operations such as filtering, sorting, and hashing without deserializing the bytes back into an object. Source: https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets.
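As a minimal sketch of how encoders are surfaced in the API, the snippet below derives a product encoder for a simple case class and inspects its schema. This assumes a running SparkSession named spark (as in the spark-shell examples that follow); the exact printed schema may vary by Spark version:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// A case class maps naturally to a product encoder; Spark derives the
// columnar schema from the class fields without Java/Kryo serialization.
case class Dept(dept_id: Int, dept_name: String)

val deptEncoder: Encoder[Dept] = Encoders.product[Dept]

// The encoder carries the schema Spark uses to operate on the
// serialized bytes directly (filtering, sorting, hashing).
println(deptEncoder.schema)
```

In the spark-shell, an implicit encoder for a case class is usually resolved automatically via import spark.implicits._, so Encoders.product is rarely written out explicitly.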
The following Scala example creates a Dataset and DataFrame from an RDD. Enter the scala shell with the spark-shell command:
scala> case class Dept(dept_id: Int, dept_name: String)
defined class Dept

scala> val deptRDD = sc.makeRDD(Seq(Dept(1,"Sales"),Dept(2,"HR")))
deptRDD: org.apache.spark.rdd.RDD[Dept] = ParallelCollectionRDD[0] at makeRDD at <console>:26

scala> val deptDS = spark.createDataset(deptRDD)
deptDS: org.apache.spark.sql.Dataset[Dept] = [dept_id: int, dept_name: string]

scala> val deptDF = spark.createDataFrame(deptRDD)
deptDF: org.apache.spark.sql.DataFrame = [dept_id: int, dept_name: string]

scala> deptDS.rdd
res12: org.apache.spark.rdd.RDD[Dept] = MapPartitionsRDD[5] at rdd at <console>:31

scala> deptDF.rdd
res13: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[8] at rdd at <console>:31
Notice that when you convert a Dataset to an RDD, you get RDD[Dept], but when you convert a DataFrame to an RDD, you get RDD[Row].
Datasets also provide compile-time type safety, as shown in the following code. Since dept_location is not a member of the Dept case class, the compiler throws an error:
scala> deptDS.filter(x => x.dept_location > 1).show()
<console>:31: error: value dept_location is not a member of Dept
       deptDS.filter(x => x.dept_location > 1).show()
A DataFrame can be converted to a Dataset by providing a class with the as method as shown in the following example:
scala> val newDeptDS = deptDF.as[Dept]
newDeptDS: org.apache.spark.sql.Dataset[Dept] = [dept_id: int, dept_name: string]

scala> newDeptDS.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|    Sales|
|      2|       HR|
+-------+---------+
Use the toDF function to convert a Dataset to a DataFrame. Here is another Scala example for converting the Dataset created above to a DataFrame:
scala> newDeptDS.first()
res27: Dept = Dept(1,Sales)

scala> newDeptDS.toDF.first()
res28: org.apache.spark.sql.Row = [1,Sales]
Note that res27 is a Dept case class object, while res28 is a Row object.
Accessing metadata information about Hive tables and UDFs is made easy with the Catalog API. The following commands explain how to access metadata:
scala> spark.catalog.listDatabases().select("name").show()
+-------+
|   name|
+-------+
|default|
+-------+

scala> spark.catalog.listTables.show()
+---------+--------+-----------+---------+-----------+
|     name|database|description|tableType|isTemporary|
+---------+--------+-----------+---------+-----------+
|customers| default|       null| EXTERNAL|      false|
|sample_07| default|       null|  MANAGED|      false|
|sample_08| default|       null|  MANAGED|      false|
| web_logs| default|       null|  MANAGED|      false|
|jsontable|    null|       null|TEMPORARY|       true|
+---------+--------+-----------+---------+-----------+

scala> spark.catalog.isCached("sample_07")
res29: Boolean = false
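The Catalog API can also manage caching, not just inspect it. A short sketch, assuming the sample_07 table from the listing above exists in the default database:

```scala
// Cache the table in memory, confirm, then release it.
spark.catalog.cacheTable("sample_07")
spark.catalog.isCached("sample_07")     // returns true once cached
spark.catalog.uncacheTable("sample_07")
```

Caching via the Catalog is equivalent to running CACHE TABLE in SQL; the first action that scans the table materializes it in memory.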
The createTempView and dropTempView methods are used to create and drop temporary views from the DataFrame or Dataset API.
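For example, a session-scoped temporary view can be registered, queried with SQL, and then dropped. This sketch assumes the deptDF DataFrame created earlier in this section:

```scala
// Register a temporary view visible to the current SparkSession.
deptDF.createTempView("dept")

// Query it with Spark SQL like any other table.
spark.sql("SELECT dept_name FROM dept WHERE dept_id = 1").show()

// Drop the view when it is no longer needed.
spark.catalog.dropTempView("dept")
```

Temporary views are tied to the SparkSession; they appear in spark.catalog.listTables with isTemporary set to true, as jsontable does in the listing above.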
spark.catalog.listFunctions().show() lists the available functions along with their descriptions.