Chapter 11. Structured Streaming Sinks

In the previous chapter, you learned about sources, the abstraction that allows Structured Streaming to acquire data for processing. After that data has been processed, we would want to do something with it. We might want to write it to a database for later querying, to a file for further (batch) processing, or to another streaming backend to keep the data in motion.

In Structured Streaming, sinks are the abstraction that represents how to produce data to an external system. Structured Streaming comes with several built-in sinks and defines an API that lets us create custom sinks for other systems that are not natively supported.

In this chapter, we look at how a sink works, review the details of the sinks provided by Structured Streaming, and explore how to create custom sinks to write data to systems not supported by the default implementations.

Understanding Sinks

Sinks serve as output adaptors between the internal data representation in Structured Streaming and external systems. They provide a write path for the data resulting from the stream processing. Additionally, they must also close the loop of reliable data delivery.

To participate in end-to-end reliable data delivery, sinks must provide an idempotent write operation. Idempotent means that the result of executing the operation two or more times is equal to executing the operation once. When recovering from a failure, Spark might reprocess some data that was partially processed at the time the failure occurred. On the source side, this is handled by the replay functionality: recall from “Understanding Sources” that reliable sources must provide a means of replaying uncommitted data, starting from a given offset. Likewise, sinks must provide a means of removing duplicate records before they are written to the external system.

The combination of a replayable source and an idempotent sink is what grants Structured Streaming its effectively exactly-once data delivery semantics. Sinks that cannot implement the idempotency requirement weaken the end-to-end delivery guarantee to at-least-once semantics. Sinks that are not able to recover from the failure of the streaming process are deemed “unreliable” because they might lose data.

In the next section, we go over the available sinks in Structured Streaming in detail.

Note

Third-party vendors might provide custom Structured Streaming sinks for their products. When integrating one of these external sinks in your project, consult their documentation to determine the data delivery warranties they support.

Available Sinks

Structured Streaming comes with several sinks that match the supported sources, as well as sinks that let us output data to temporary storage or to the console. In rough terms, we can divide the provided sinks into reliable sinks and sinks intended for learning and experimentation. In addition, Structured Streaming offers a programmable interface that allows us to write to arbitrary external systems.

Reliable Sinks

The sinks considered reliable or production ready provide well-defined data delivery semantics and are resilient to total failure of the streaming process.

The following are the provided reliable sinks:

The File sink

This writes data to files in a directory in the filesystem. It supports the same file formats as the File source: comma-separated values (CSV), JSON, ORC, Parquet, and Text.

The Kafka sink

This writes data to Kafka, effectively keeping the data “on the move.” This is an interesting option to integrate the results of our process with other streaming frameworks that rely on Kafka as the data backbone.

Sinks for Experimentation

The following sinks are provided to support interaction and experimentation with Structured Streaming. They do not provide failure recovery and therefore their use in production is discouraged because it can result in data loss.

The following are nonreliable sinks:

The Memory sink

This creates a temporary table with the results of the streaming query. The resulting table can be queried within the same Java virtual machine (JVM) process, which allows in-cluster queries to access the results of the streaming process.

The Console sink

This prints the results of the query to the console. This is useful at development time to visually inspect the results of the stream process.

The Sink API

Next to the built-in sinks, we also have the option to create a sink programmatically. This is achieved with the foreach operation that, as its name implies, offers access to each individual resulting record of the output stream. Finally, it is possible to develop our own custom sinks using the sink API directly.

Exploring Sinks in Detail

In the rest of this chapter, we explore the configuration and options available for each sink. We present in-depth coverage of the reliable sinks that should provide a thorough view of their applicability and can serve as a reference when you begin developing your own applications.

The experimental sinks are limited in scope, and that is also reflected in the level of coverage that follows.

Toward the end of this chapter, we look at the custom sink API options and review the considerations we need to take when developing our own sinks.

Tip

If you are in your initial exploration phase of Structured Streaming, you might want to skip this section and come back to it later, when you are busy developing your own Structured Streaming jobs.

The File Sink

Files are a common intersystem boundary. When used as a sink for a streaming process, they let the data come to rest after the stream-oriented processing. Those files can become part of a data lake or can be consumed by other (batch) processes as part of a larger processing pipeline that combines streaming and batch modes.

Scalable, reliable, and distributed filesystems—such as HDFS or object stores like Amazon Simple Storage Service (Amazon S3)—make it possible to store large datasets as files in arbitrary formats. When running in local mode, at exploration or development time, it’s possible to use the local filesystem for this sink.

The File sink supports the same formats as the File source:

  • CSV

  • JSON

  • Parquet

  • ORC

  • Text

Note

Structured Streaming shares the same File Data Source implementation used in batch mode. The write options offered by the DataFrameWriter for each File format are also available in streaming mode. In this section, we highlight the most commonly used options. For the most up-to-date list, always consult the online documentation for your specific Spark version.

Before going into the details of each format, let’s explore a general File sink example that’s presented in Example 11-1.

Example 11-1. File sink example
// assume an existing streaming DataFrame 'stream'
import org.apache.spark.sql.streaming.Trigger

val query = stream.writeStream
  .format("csv")
  .option("sep", "\t")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("path", "<dest/path>")
  .option("checkpointLocation", "<checkpoint/path>")
  .start()

In this example, we are using the csv format to write the stream results to the <dest/path> destination directory using TAB as the custom separator. We also specify a checkpointLocation, where the checkpoint metadata is stored at regular intervals.

The File sink supports only append as outputMode, and it can be safely omitted in the writeStream declaration. Attempting to use another mode will result in the following exception when the query starts: org.apache.spark.sql.AnalysisException: Data source ${format} does not support ${output_mode} output mode;.

Using Triggers with the File Sink

One additional parameter that we see in Example 11-1 is the use of a trigger. When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished. In the case of the File sink, and depending on the throughput of the input stream, this might result in the generation of many small files. This might be detrimental for the filesystem storage capacity and performance.

Consider Example 11-2.

Example 11-2. Rate source with File sink
val stream = spark.readStream.format("rate").load()
val query = stream.writeStream
  .format("json")
  .option("path","/tmp/data/rate")
  .option("checkpointLocation", "/tmp/data/rate/checkpoint")
  .start()

If we let this query run for a little while and then we check the target directory, we should observe a large number of small files:

$ ls -1
part-00000-03a1ed33-3203-4c54-b3b5-dc52646311b2-c000.json
part-00000-03be34a1-f69a-4789-ad65-da351c9b2d49-c000.json
part-00000-03d296dd-c5f2-4945-98a8-993e3c67c1ad-c000.json
part-00000-0645a678-b2e5-4514-a782-b8364fb150a6-c000.json
...

# Count the files in the directory
$ ls -1 | wc -l
562

# A moment later
$ ls -1 | wc -l
608

# What's the content of a file?
$ cat part-00007-e74a5f4c-5e04-47e2-86f7-c9194c5e85fa-c000.json
{"timestamp":"2018-05-13T19:34:45.170+02:00","value":104}

As we learned in Chapter 10, the Rate source generates one record per second by default. Looking at the data contained in one file, we can indeed see that single record: the query is, in fact, generating one file each time new data is available. Although the contents of each file are tiny, filesystems incur some overhead in keeping track of a large number of files. Moreover, in the Hadoop Distributed File System (HDFS), each file occupies a block and is replicated n times regardless of its contents. Given that the typical HDFS block is 128 MB, we can see how our naive query that uses a File sink can quickly deplete our storage.

The trigger configuration is there to help us avoid this situation. By providing a time trigger for the production of files, we ensure that each file contains a sufficient amount of data.

We can observe the effect of a time trigger by modifying our previous example as follows:

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream.format("rate").load()
val query = stream.writeStream
  .format("json")
  .trigger(Trigger.ProcessingTime("1 minute")) // <-- Add Trigger configuration
  .option("path","/tmp/data/rate")
  .option("checkpointLocation", "/tmp/data/rate/checkpoint")
  .start()

Let’s issue the query and wait for a couple of minutes. When we inspect the target directory, we should see considerably fewer files than before, and each file should contain more records. The number of records per file depends on the DataFrame partitioning:

$ ls -1
part-00000-2ffc26f9-bd43-42f3-93a7-29db2ffb93f3-c000.json
part-00000-3cc02262-801b-42ed-b47e-1bb48c78185e-c000.json
part-00000-a7e8b037-6f21-4645-9338-fc8cf1580eff-c000.json
part-00000-ca984a73-5387-49fe-a864-bd85e502fd0d-c000.json
...

# Count the files in the directory
$ ls -1 | wc -l
34

# Some seconds later
$ ls -1 | wc -l
42

# What's the content of a file?

$ cat part-00000-ca984a73-5387-49fe-a864-bd85e502fd0d-c000.json
{"timestamp":"2018-05-13T22:02:59.275+02:00","value":94}
{"timestamp":"2018-05-13T22:03:00.275+02:00","value":95}
{"timestamp":"2018-05-13T22:03:01.275+02:00","value":96}
{"timestamp":"2018-05-13T22:03:02.275+02:00","value":97}
{"timestamp":"2018-05-13T22:03:03.275+02:00","value":98}
{"timestamp":"2018-05-13T22:03:04.275+02:00","value":99}
{"timestamp":"2018-05-13T22:03:05.275+02:00","value":100}

If you are trying this example on a personal computer, the number of partitions defaults to the number of cores present. In our case, we have eight cores, and we observe seven or eight records per partition. That’s still very few records, but it shows the principle that can be extrapolated to real scenarios.

Even though a trigger based on the number of records or the size of the data would arguably be more interesting in this scenario, currently, only time-based triggers are supported. This might change in the future, as Structured Streaming evolves.
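Besides Trigger.ProcessingTime, Spark also offers Trigger.Once (available since Spark 2.2), which processes all the data available at that moment in a single batch and then stops the query. Combined with an external scheduler, it can be used to run periodic batch-style jobs over the File sink. A minimal sketch, reusing the Rate example above:

import org.apache.spark.sql.streaming.Trigger

// Process everything available right now as one batch, write it out, and stop
val onceQuery = stream.writeStream
  .format("json")
  .trigger(Trigger.Once())
  .option("path", "/tmp/data/rate")
  .option("checkpointLocation", "/tmp/data/rate/checkpoint")
  .start()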

Common Configuration Options Across All Supported File Formats

We have already seen in previous examples the use of the option method, which takes a key and a value, to set configuration options on a sink.

All supported file formats share the following configuration options:

path

A directory in a target filesystem where the streaming query writes the data files.

checkpointLocation

A directory in a resilient filesystem where the checkpointing metadata is stored. New checkpoint information is written with each query execution interval.

compression (default: None)

All supported file formats share the capability to compress the data, although the available compression codecs might differ from format to format. The specific compression algorithms are shown for each format in their corresponding section.
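As an illustration of these common options, the following sketch writes Parquet files with an explicitly chosen compression codec (the paths are placeholders):

// Parquet File sink with explicit snappy compression
val parquetQuery = stream.writeStream
  .format("parquet")
  .option("path", "/tmp/data/parquet-out")
  .option("checkpointLocation", "/tmp/data/parquet-out/checkpoint")
  .option("compression", "snappy")
  .start()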

Note

When configuring options of the File sink, it’s often useful to remember that any file written by the File sink can be read back using the corresponding File source. For example, when we discussed “JSON File Source Format”, we saw that it normally expects each line of the file to be a valid JSON document. Likewise, the JSON sink format will produce files containing one record per line.

Common Time and Date Formatting (CSV, JSON)

Text-based files formats such as CSV and JSON accept custom formatting for date and timestamp data types:

dateFormat (default: yyyy-MM-dd)

Configures the pattern used to format date fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.

timestampFormat (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX)

Configures the pattern used to format timestamp fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.

timeZone (default: local timezone)

Configures the time zone to use to format timestamps.
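For example, a JSON sink writing its timestamp fields in UTC with a custom pattern could be configured as follows (a sketch; the pattern and paths are only illustrative):

// JSON File sink writing timestamps in UTC with a custom pattern
val formattedJsonQuery = stream.writeStream
  .format("json")
  .option("path", "/tmp/data/json-formatted")
  .option("checkpointLocation", "/tmp/data/json-formatted/checkpoint")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("timeZone", "UTC")
  .start()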

The CSV Format of the File Sink

Using the CSV File format, we can write our data in a ubiquitous tabular format that can be read by many programs, from spreadsheet applications to a wide range of enterprise software.

Options

The CSV format in Spark supports many options to control the field separator, quoting behavior, and the inclusion of a header. In addition, the common File sink options and the date formatting options apply to the CSV sink.

Note

In this section, we list the most commonly used options. For a comprehensive list, check the online documentation.

Following are the commonly used options for the CSV sink:

header (default: false)

A flag to indicate whether we should include a header to the resulting file. The header consists of the name of the fields in this streaming DataFrame.

quote (default: " [double quote])

Sets the character used to quote records. Quoting is necessary when records might contain the separator character, which, without quoting, would result in corrupt records.

quoteAll (default: false)

A flag used to indicate whether all values should be quoted or only those that contain a separator character. Some external systems require all values to be quoted. When using the CSV format to import the resulting files in an external system, check that system’s import requirements to correctly configure this option.

sep (default: , [comma])

Configures the separator character used between fields. The separator must be a single character. Otherwise, the query will throw an IllegalArgumentException at runtime when it starts.
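Putting these options together, a CSV sink that writes pipe-delimited, fully quoted files with a header might look like this (a sketch; the values are only illustrative):

val csvSinkQuery = stream.writeStream
  .format("csv")
  .option("path", "/tmp/data/csv-out")
  .option("checkpointLocation", "/tmp/data/csv-out/checkpoint")
  .option("header", "true")   // include the field names as a header row
  .option("sep", "|")         // single-character separator instead of the default comma
  .option("quoteAll", "true") // quote every value, not only those containing the separator
  .start()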

The JSON File Sink Format

The JSON File sink lets us write output data to files using the JSON Lines format. This format transforms each record in the output dataset into a valid JSON document written in a line of text. The JSON File sink is symmetrical to the JSON source. As we would expect, files written with this format can be read again using the JSON source.

Note

When using third-party JSON libraries to read the resulting files, we should take care of first reading the file(s) as lines of text and then parse each line as a JSON document representing one record.

Options

Next to the common file and text format options, the JSON sink supports these specific configurations:

encoding (default: UTF-8)

Configures the charset encoding used to write the JSON files.

lineSep (default: \n)

Sets the line separator to be used between JSON records.

Supported compression options (default: none): none, bzip2, deflate, gzip, lz4, and snappy.
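A sketch of a JSON sink combining these options (the codec and paths are only illustrative):

val jsonSinkQuery = stream.writeStream
  .format("json")
  .option("path", "/tmp/data/json-out")
  .option("checkpointLocation", "/tmp/data/json-out/checkpoint")
  .option("encoding", "UTF-8")   // charset used to write the files
  .option("compression", "gzip") // one of the supported codecs listed above
  .start()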

The Parquet File Sink Format

The Parquet File sink supports the common File sink configuration and does not have format-specific options.

Supported compression options (default: snappy): none, gzip, lzo, and snappy.

The Text File Sink Format

The Text file sink writes plain-text files. Whereas the other file formats perform a conversion from the streaming DataFrame or Dataset schema to the particular file format structure, the Text sink expects either a flattened streaming Dataset[String] or a streaming DataFrame whose schema contains a single value field of StringType.

The typical use of the text file format is to write custom text-based formats not natively supported in Structured Streaming. To achieve this goal, we first programmatically transform the data into the desired text representation and then use the text format to write it to files. Attempting to write any complex schema to a text sink will result in an error.
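For instance, assuming a streaming DataFrame sensorData with id, sensorType, and value columns (a hypothetical schema), we could build a pipe-delimited text representation and write it with the text format:

import org.apache.spark.sql.functions.concat_ws

// Flatten each record into a single 'value' column of StringType
val textFormatted = sensorData.select(
  concat_ws("|", $"id", $"sensorType", $"value") as "value"
)

val textQuery = textFormatted.writeStream
  .format("text")
  .option("path", "/tmp/data/text-out")
  .option("checkpointLocation", "/tmp/data/text-out/checkpoint")
  .start()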

Options

Beside the common options for sinks and text-based formats, the text sink supports the following configuration option:

lineSep (default: \n)

Configures the line separator used for terminating each text line.

The Kafka Sink

As we discussed in “The Kafka Source”, Kafka is a Publish/Subscribe (pub/sub) system. Although the Kafka source functions as a subscriber, the Kafka sink is the publisher counterpart. The Kafka sink allows us to write data (back) to Kafka, which then can be consumed by other subscribers to continue a chain of streaming processors.

Downstream consumers might be other streaming processors, implemented using Structured Streaming or any of the other streaming frameworks available, or (micro) services that consume the streaming data to fuel applications in the enterprise ecosystem.

Understanding the Kafka Publish Model

In Kafka, data is represented as key–value records exchanged over a topic. Topics are composed of distributed partitions. Each partition maintains the messages in the order in which they were received. This ordering is indexed by an offset, which, in turn, is used by the consumer to indicate the record(s) to read. When a record is published to a topic, it is placed in one of the topic's partitions. The choice of partition depends on the key; the ruling principle is that records with the same key land in the same partition. As a result, the ordering in Kafka is partial: the sequence of records within a single partition is ordered by arrival time, but there are no ordering guarantees among partitions.

This model is highly scalable, and Kafka's implementation ensures low-latency reads and writes, making it an excellent carrier for streaming data.

Using the Kafka Sink

Now that you’ve learned about the Kafka publishing model, we can look at the practical side of producing data to Kafka. We just saw that the Kafka records are structured as key–value pairs. We need to structure our data in the same shape.

In a minimal implementation, we must ensure that our streaming DataFrame or Dataset has a value field of BinaryType or StringType. The implication of this requirement is that we usually need to encode our data into a transport representation before sending it to Kafka.

When the key is not specified, Structured Streaming will replace the key with null. This makes the Kafka sink use a round-robin assignment of the partition for the corresponding topic.

If we want to preserve control over the key assignment, we must have a key field, also of BinaryType or StringType. This key is used for the partition assignment, resulting in a guaranteed ordering between records with equal keys.

Optionally, we can control the destination topic at the record level by adding a topic field. If present, its value must correspond to a Kafka topic, and the related record will be published to that topic. Note that setting the topic option on writeStream overrides the value in the topic field.

Per-record topics are useful when implementing a fan-out pattern in which incoming records are sorted into different dedicated topics for further processing. Think, for example, about classifying incoming support tickets into dedicated sales, technical, and troubleshooting topics that are consumed downstream by their corresponding (micro)services.

After we have the data in the right shape, we also need the address of the target bootstrap servers in order to connect to the brokers.

In practical terms, this generally involves two steps:

  1. Transform each record into a single field called value, and optionally assign a key and a topic to each record.

  2. Declare our stream sink using the writeStream builder.

Example 11-3 shows these steps in use.

Example 11-3. Kafka sink example
// Assume an existing streaming dataframe 'sensorData'
// with schema: id: String, timestamp: Long, sensorType: String, value: Double

// Create a key and a value from each record:

import org.apache.spark.sql.functions.{struct, to_json}

val kafkaFormattedStream = sensorData.select(
  $"id" as "key",
  to_json(
    struct($"id", $"timestamp", $"sensorType", $"value")
  ) as "value"
)

// In step two, we declare our streaming query:

val kafkaWriterQuery = kafkaFormattedStream.writeStream
  .queryName("kafkaWriter")
  .outputMode("append")
  .format("kafka") // determines that the kafka sink is used
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("topic", targetTopic)
  .option("checkpointLocation", "/path/checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()

When we add topic information at the record level, we must omit the topic configuration option.

In Example 11-4, we modify the previous code to write each record to a dedicated topic matching the sensorType. That is, all humidity records go to the humidity topic, all radiation records go to the radiation topic, and so on.

Example 11-4. Kafka sink to different topics
// assume an existing streaming dataframe 'sensorData'
// with schema: id: String, timestamp: Long, sensorType: String, value: Double

// Create a key, value and topic from each record:

val kafkaFormattedStream = sensorData.select(
  $"id" as "key",
  $"sensorType" as "topic",
  to_json(struct($"id", $"timestamp", $"value")) as "value"
)

// In step two, we declare our streaming query:

val kafkaWriterQuery = kafkaFormattedStream.writeStream
  .queryName("kafkaWriter")
  .outputMode("append")
  .format("kafka") // determines that the kafka sink is used
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("checkpointLocation", "/path/checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()

Note that we have removed the option("topic", targetTopic) setting and added a topic field to each record. This results in each record being routed to the topic corresponding to its sensorType. Had we left the option("topic", targetTopic) setting in place, the topic field would have no effect, because the option takes precedence.

Choosing an encoding

When we look closely at the code in Example 11-3, we see that we create a single value field by converting the existing data into its JSON representation. In Kafka, each record consists of a key and a value. The value field contains the payload of the record. To send or receive a record of arbitrary complexity to and from Kafka, we need to convert said record into a single-field representation that we can fit into this value field. In Structured Streaming, the conversion to and from this transport value representation to the actual record must be done through user code. Ideally, the encoding we choose can be easily transformed into a structured record to take advantage of the Spark capabilities to manipulate data.

A common encoding format is JSON. JSON has native support in the structured APIs of Spark, and that extends to Structured Streaming. As we saw in Example 11-4, we write JSON by using the SQL function to_json: to_json(struct($"id", $"timestamp", $"value")) as "value".

Binary representations such as Avro and Protocol Buffers are also possible. In such cases, we treat the value field as BinaryType and use third-party libraries to do the encoding and decoding.

As of this writing, there is no built-in support for binary encodings, but Avro support has been announced for an upcoming version.

Warning

An important factor to consider when choosing an encoding format is schema support. In a multiservice model that uses Kafka as the communications backbone, it’s typical to find services producing data that use a different programming model, language, and/or framework than a streaming processor or other service that consumes it.

To ensure interoperability, schema-oriented encodings are the preferred choice. Having a schema definition allows for the creation of artifacts in different languages and ensures that produced data can be consumed later on.

The Memory Sink

The Memory sink is a nonreliable sink that saves the results of the stream processing in an in-memory temporary table. It is considered nonreliable because all data will be lost if the streaming process ends, but it is certainly useful in scenarios in which low-latency access to the streaming results is required.

The temporary table created by this sink is named after the query name. This table is backed by the streaming query and is updated at each trigger following the semantics of the chosen outputMode.

The resulting table contains an up-to-date view of the query results and can be queried using classical Spark SQL operations. The query must be executed in the same process (JVM) where the Structured Streaming query is started.

The table maintained by the Memory sink can be accessed interactively, which makes it an ideal interface for interactive data exploration tools like the Spark REPL or a notebook.

Another common use is to provide a query service on top of the streaming data. This is done by combining a server module, like an HTTP server, with the Spark driver. Calls to specific HTTP endpoints can then be served with data from this in-memory table.

Example 11-5 assumes a sensorData streaming dataset. The result of the stream processing is materialized in this in-memory table, which is available in the SQL context as sample_memory_query.

Example 11-5. Memory sink example
val sampleMemoryQuery = sensorData.writeStream
  .queryName("sample_memory_query")    // this query name will be the SQL table name
  .outputMode("append")
  .format("memory")
  .start()

// After the query starts we can access the data in the temp table
val memData = spark.sql("select * from sample_memory_query")
memData.count() // show how many elements we have in our table

Output Modes

The Memory sink supports all output modes: Append, Update, and Complete. Hence, we can use it with all queries, including aggregations. The combination of the Memory sink with the Complete mode is particularly interesting because it provides a fast, in-memory, queryable store for the up-to-date computed state. Note that for a query to support Complete mode, it must aggregate over a bounded-cardinality key, which ensures that the memory requirements to handle the state are likewise bounded within the system resources.
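As a sketch of this combination, assuming the sensorData stream used earlier carries a sensorType field, we could keep an up-to-date count per sensor type in an in-memory table:

// Aggregation over a bounded-cardinality key (sensorType), materialized in memory
val sensorCountsQuery = sensorData
  .groupBy($"sensorType")
  .count()
  .writeStream
  .queryName("sensor_counts")  // name of the resulting in-memory table
  .outputMode("complete")
  .format("memory")
  .start()

// The up-to-date results can be queried with regular Spark SQL
spark.sql("select * from sensor_counts").show()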

The Console Sink

For all of us who love to print “Hello, world!” to the screen output, we have the Console sink. Indeed, the Console sink lets us print a small sample of the results of the query to the standard output.

Its use is limited to debugging and data exploration in an interactive shell-based environment, such as the spark-shell. As we would expect, this sink is not reliable given that it does not commit any data to another system.

You should avoid using the Console sink in production environments, much as printlns are frowned upon in an operational code base.

Options

Here are the configurable options for the Console sink:

numRows (default: 20)

The maximum number of rows to show at each query trigger.

truncate (default: true)

A flag that indicates whether the output of each cell in a row should be truncated.
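For instance, to print up to 50 rows per trigger without truncating wide cells (a sketch, assuming a streaming DataFrame stream):

val consoleQuery = stream.writeStream
  .format("console")
  .option("numRows", "50")     // show up to 50 rows per trigger
  .option("truncate", "false") // do not truncate wide cell contents
  .outputMode("append")
  .start()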

Output Modes

As of Spark 2.3, the Console sink supports all output modes: Append, Update, and Complete.

The Foreach Sink

There are times when we need to integrate our stream-processing applications with legacy systems in the enterprise. Also, Structured Streaming being a relatively young project, its range of built-in sinks is rather limited.

The Foreach sink consists of an API and a sink definition that provide access to the results of the query execution. It extends the writing capabilities of Structured Streaming to any external system that provides a Java virtual machine (JVM) client library.

The ForeachWriter Interface

To use the Foreach sink, we must provide an implementation of the ForeachWriter interface. The ForeachWriter controls the life cycle of the write operation. Its execution is distributed across the executors, and its methods are called for each partition of the streaming DataFrame or Dataset, as demonstrated in Example 11-6.

Example 11-6. The API definition of the ForeachWriter
abstract class ForeachWriter[T] extends Serializable {

  def open(partitionId: Long, version: Long): Boolean

  def process(value: T): Unit

  def close(errorOrNull: Throwable): Unit

}

As we can see in Example 11-6, the ForeachWriter is bound to a type [T] that corresponds to the type of the streaming Dataset or to spark.sql.Row in case of a streaming DataFrame. Its API consists of three methods: open, process, and close:

open

This is called at every trigger interval with the partitionId and a unique version number. Using these two parameters, the ForeachWriter must decide whether to process the partition being offered. Returning true will lead to the processing of each element using the logic in the process method. If the method returns false, the partition will be skipped for processing.

process

This provides access to the data, one element at a time. The function applied to the data must produce a side effect, such as inserting the record in a database, calling a REST API, or using a networking library to communicate the data to another system.

close

This is called to notify the end of writing a partition. The error object will be null when the output operation terminated successfully for this partition or will contain a Throwable otherwise. close is called at the end of every partition writing operation, even when open returned false (to indicate that the partition should not be processed).

This contract is part of the data delivery semantics because it allows us to remove duplicated partitions that might already have been sent to the sink but are reprocessed by Structured Streaming as part of a recovery scenario. For that mechanism to properly work, the sink must implement some persistent way to remember the partition/version combinations that it has already seen.
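A minimal sketch of what such bookkeeping could look like, assuming a hypothetical CommitLog abstraction that persists the (partitionId, version) pairs already written:

import org.apache.spark.sql.ForeachWriter

// Hypothetical persistent registry of (partitionId, version) pairs already written
trait CommitLog extends Serializable {
  def contains(partitionId: Long, version: Long): Boolean
  def commit(partitionId: Long, version: Long): Unit
}

// Sketch of a writer that skips partitions delivered in a previous attempt
class IdempotentWriter[T](commitLog: CommitLog) extends ForeachWriter[T] {
  private var partitionId: Long = -1
  private var version: Long = -1

  override def open(partitionId: Long, version: Long): Boolean = {
    this.partitionId = partitionId
    this.version = version
    // Returning false tells Structured Streaming to skip this partition
    !commitLog.contains(partitionId, version)
  }

  override def process(value: T): Unit = {
    // write 'value' to the external system here
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) commitLog.commit(partitionId, version)
  }
}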

After we have our ForeachWriter implementation, we use the customary writeStream method of declaring a sink and we call the dedicated foreach method with the ForeachWriter instance.

The ForeachWriter implementation must be Serializable. This is mandatory because the ForeachWriter is executed in a distributed manner, on each node of the cluster that holds a partition of the streaming Dataset or DataFrame being processed. At runtime, a new deserialized copy of the provided ForeachWriter instance is created for each partition of the Dataset or DataFrame. As a consequence, we cannot pass any nonserializable state through the constructor of the ForeachWriter.

Let’s put this all together in a small example that shows how the Foreach sink works and illustrates the subtle intricacies of dealing with the state handling and serialization requirements.

TCP Writer Sink: A Practical ForeachWriter Example

For this example, we are going to develop a text-based TCP sink that transmits the results of the query to an external TCP socket receiving server. In this example, we will be using the spark-shell utility that comes with the Spark installation.

In Example 11-7, we create a simple TCP client that can connect and write text to a server socket, provided its host and port. Note that this class is not Serializable. Sockets are inherently nonserializable because they are dependent on the underlying system I/O ports.

Example 11-7. TCP socket client
class TCPWriter(host:String, port: Int) {
  import java.io.PrintWriter
  import java.net.Socket
  val socket = new Socket(host, port)
  val printer = new PrintWriter(socket.getOutputStream, true)
  def println(str: String) = printer.println(str)
  def close() = {
    printer.flush()
    printer.close()
    socket.close()
  }
}

Next, in Example 11-8, we use this TCPWriter in a ForeachWriter implementation.

Example 11-8. TCPForeachWriter implementation
import org.apache.spark.sql.ForeachWriter
class TCPForeachWriter(host: String, port: Int)
    extends ForeachWriter[RateTick] {

  @transient var writer: TCPWriter = _
  var localPartition: Long = 0
  var localVersion: Long = 0

  override def open(
      partitionId: Long,
      version: Long
    ): Boolean = {
    writer = new TCPWriter(host, port)
    localPartition = partitionId
    localVersion = version
    println(
      s"Writing partition [$partitionId] and version[$version]"
    )
    true // we always accept to write
  }

  override def process(value: RateTick): Unit = {
    val tickString = s"${v.timestamp}, ${v.value}"
    writer.println(
      s"$localPartition, $localVersion, $tickString"
    )
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      println(
        s"Closing partition [$localPartition] and version[$localVersion]"
      )
      writer.close()
    } else {
      print("Query failed with: " + errorOrNull)
    }
  }
}

Pay close attention to how we have declared the TCPWriter variable: @transient var writer:TCPWriter = _. @transient means that this reference should not be serialized. The initial value is null (using the empty variable initialization syntax _). It’s only in the call to open that we create an instance of TCPWriter and assign it to our variable for later use.

Also, note how the process method takes an object of type RateTick. Implementing a ForeachWriter is easier when we have a typed Dataset to start with as we deal with a specific object structure instead of spark.sql.Rows, which are the generic data container for streaming DataFrames. In this case, we transformed the initial streaming DataFrame to a typed Dataset[RateTick] before proceeding to the sink phase.

Now, to complete our example, we create a simple Rate source and write the produced stream directly to our newly developed TCPForeachWriter:

case class RateTick(timestamp: Long, value: Long)

val stream = spark.readStream.format("rate")
                  .option("rowsPerSecond", 100)
                  .load()
                  .as[RateTick]

val writerInstance = new TCPForeachWriter("localhost", 9876)

val query = stream
      .writeStream
      .foreach(writerInstance)
      .outputMode("append")

Before starting our query, we run a simple TCP server to observe the results. For this purpose, we use nc (netcat), a useful *nix command-line tool to create TCP/UDP clients and servers. In this case, we run a TCP server listening on port 9876:

# Tip: the syntax of netcat is system-specific.
# The following command should work on *nix and on OSX nc managed by homebrew.
# Check your system documentation for the proper syntax.
nc -lk 9876

Finally, we start our query:

val queryExecution = query.start()

In the shell running the nc command, we should see output like the following:

5, 1, 1528043018, 72
5, 1, 1528043018, 73
5, 1, 1528043018, 74
0, 1, 1528043018, 0
0, 1, 1528043018, 1
0, 1, 1528043018, 2
0, 1, 1528043018, 3
0, 1, 1528043018, 4
0, 1, 1528043018, 5
0, 1, 1528043018, 6
0, 1, 1528043018, 7
0, 1, 1528043018, 8
0, 1, 1528043018, 9
0, 1, 1528043018, 10
0, 1, 1528043018, 11
7, 1, 1528043019, 87
7, 1, 1528043019, 88
7, 1, 1528043019, 89
7, 1, 1528043019, 90
7, 1, 1528043019, 91
7, 1, 1528043019, 92

In the output, the first column is the partition and the second is the version, followed by the data produced by the Rate source. It's interesting to note that the data is ordered within a partition, like partition 0 in our example, but there are no ordering guarantees among different partitions. Partitions are processed in parallel on different machines of the cluster, so there is no guarantee which one comes first.

Finally, to end the query execution, we call the stop method:

queryExecution.stop()

The Moral of this Example

In this example, you have seen how to correctly use a minimalistic socket client to output the data of a streaming query with the Foreach sink. Socket communication is the underlying interaction mechanism of most database drivers and many other application clients in the wild. The method that we have illustrated here is a common pattern that you can effectively apply to write to a variety of external systems that offer a JVM-based client library. In a nutshell, we can summarize this pattern as follows (a generic sketch follows the list):

  1. Create a @transient mutable reference to our driver class in the body of the ForeachWriter.

  2. In the open method, initialize a connection to the external system. Assign this connection to the mutable reference. It’s guaranteed that this reference will be used by a single thread.

  3. In process, publish the provided data element to the external system.

  4. Finally, in close, we terminate all connections and clean up any state.
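Here is a generic sketch of that pattern, using a hypothetical, nonserializable ExternalClient class standing in for a real client library (all names here are placeholders):

import org.apache.spark.sql.ForeachWriter

// Placeholder for a real, nonserializable client library
class ExternalClient(endpoint: String) {
  def send(payload: String): Unit = ???  // publish one record
  def disconnect(): Unit = ???           // release the connection
}

class ExternalSystemWriter(endpoint: String) extends ForeachWriter[String] {

  // Step 1: transient mutable reference, never serialized with the writer
  @transient private var client: ExternalClient = _

  // Step 2: open the connection on the executor, once per partition and version
  override def open(partitionId: Long, version: Long): Boolean = {
    client = new ExternalClient(endpoint)
    true
  }

  // Step 3: publish each element to the external system
  override def process(value: String): Unit = client.send(value)

  // Step 4: terminate the connection and clean up
  override def close(errorOrNull: Throwable): Unit =
    if (client != null) client.disconnect()
}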

Troubleshooting ForeachWriter Serialization Issues

In Example 11-8, we saw how we needed an uninitialized mutable reference to the TCPWriter: @transient var writer:TCPWriter = _. This seemingly elaborate construct is required to ensure that we instantiate the nonserializable class only when the ForeachWriter is already deserialized and running remotely, in an executor.

If we want to explore what happens when we attempt to include a nonserializable reference in a ForeachWriter implementation, we could declare our TCPWriter instance like this, instead:

import org.apache.spark.sql.ForeachWriter
class TCPForeachWriter(host: String, port: Int) extends ForeachWriter[RateTick] {

  val nonSerializableWriter:TCPWriter = new TCPWriter(host,port)
  // ... same code as before ...
}

Although this looks simpler and more familiar, when we attempt to run our query with this ForeachWriter implementation, we get an org.apache.spark.SparkException: Task not serializable. This produces a very long stack trace that contains a best-effort attempt at pointing out the offending class. We must follow the stack trace until we find the Caused by statement, such as the one shown in the following trace:

Caused by: java.io.NotSerializableException: $line17.$read$$iw$$iw$TCPWriter
Serialization stack:
  - object not serializable (class: $line17.$read$$iw$$iw$TCPWriter,
      value: $line17.$read$$iw$$iw$TCPWriter@4f44d3e0)
  - field (class: $line20.$read$$iw$$iw$TCPForeachWriter,
      name: nonSerializableWriter, type: class $line17.$read$$iw$$iw$TCPWriter)
  - object (class $line20.$read$$iw$$iw$TCPForeachWriter,
      $line20.$read$$iw$$iw$TCPForeachWriter@54832ad9)
  - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name:
      org$apache$spark$sql$execution$streaming$ForeachSink$$writer,
      type: class org.apache.spark.sql.ForeachWriter)

As this example was running in the spark-shell, we find some weird $$-notation, but when we remove that noise, we can see that the nonserializable object is the TCPWriter instance (object not serializable (class: TCPWriter)) and that the reference to it is the field nonSerializableWriter of type TCPWriter.

Serialization issues are common in ForeachWriter implementations. Hopefully, with the tips in this section, you will be able to avoid any trouble in your own implementation. But for cases when this happens, Spark makes a best-effort attempt at determining the source of the problem. This information, provided in the stack trace, is very valuable to debug and solve these serialization issues.
