In the previous chapter, you learned about sources, the abstraction that allows Structured Streaming to acquire data for processing. After that data has been processed, we would want to do something with it. We might want to write it to a database for later querying, to a file for further (batch) processing, or to another streaming backend to keep the data in motion.
In Structured Streaming, sinks are the abstraction that represents how to produce data to an external system. Structured Streaming comes with several built-in sinks and defines an API that lets us create custom sinks to other systems that are not natively supported.
In this chapter, we look at how a sink works, review the details of the sinks provided by Structured Streaming, and explore how to create custom sinks to write data to systems not supported by the default implementations.
Sinks serve as output adaptors between the internal data representation in Structured Streaming and external systems. They provide a write path for the data resulting from the stream processing. Additionally, they must also close the loop of reliable data delivery.
To participate in the end-to-end reliable data delivery, sinks must provide an idempotent write operation. Idempotent means that the result of executing the operation two or more times is equal to executing the operation once. When recovering from a failure, Spark might reprocess some data that was partially processed at the time the failure occurred. On the source side, this is done by using the replay functionality. Recall from “Understanding Sources” that reliable sources must provide a means of replaying uncommitted data, based on a given offset. Likewise, sinks must provide the means of removing duplicated records before those are written to the external system.
The combination of a replayable source and an idempotent sink is what grants Structured Streaming its effectively exactly once data delivery semantics. Sinks that are not able to implement the idempotent requirement will result in end-to-end delivery guarantees of at most “at least once” semantics. Sinks that are not able to recover from the failure of the streaming process are deemed “unreliable” because they might lose data.
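To make the idempotence requirement concrete, here is a minimal sketch in plain Scala, with no Spark involved. The class `IdempotentBatchSink` is hypothetical; its `addBatch(batchId, records)` method only mirrors the shape of the batch-ID-based write that Structured Streaming performs, and the in-memory map stands in for an external system that remembers which batches it has already stored.

```scala
import scala.collection.mutable

// Hypothetical sink used only to illustrate idempotent writes keyed by a batch ID.
final class IdempotentBatchSink {
  // Stand-in for the external system plus its record of already written batches.
  private val written = mutable.LinkedHashMap.empty[Long, Seq[String]]

  /** Writes the records of `batchId`; replaying an already written batch is a no-op.
    * Returns true if this call actually wrote data. */
  def addBatch(batchId: Long, records: Seq[String]): Boolean =
    if (written.contains(batchId)) false
    else {
      written.put(batchId, records)
      true
    }

  /** Everything visible in the "external system", in write order. */
  def contents: Seq[String] = written.values.flatten.toSeq
}

object IdempotentSinkDemo extends App {
  val sink = new IdempotentBatchSink
  sink.addBatch(0L, Seq("a", "b"))
  sink.addBatch(1L, Seq("c"))
  // Simulated recovery: the replayable source re-delivers batch 1 after a crash.
  val rewrote = sink.addBatch(1L, Seq("c"))
  println(s"rewrote=$rewrote contents=${sink.contents.mkString(",")}") // rewrote=false contents=a,b,c
}
```

Executing the write for batch 1 twice leaves the same contents as executing it once, which is exactly the property the replay-based recovery relies on. A real sink would keep the batch bookkeeping in durable storage, ideally updated atomically with the data.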
In the next section, we go over the available sinks in Structured Streaming in detail.
Third-party vendors might provide custom Structured Streaming sinks for their products. When integrating one of these external sinks in your project, consult their documentation to determine the data delivery guarantees they support.
Structured Streaming comes with several sinks that match the supported sources as well as sinks that let us output data to temporary storage or to the console. In rough terms, we can divide the provided sinks into reliable sinks and sinks meant for learning and experimentation. In addition, it also offers a programmable interface that allows us to work with arbitrary external systems.
The sinks considered reliable or production ready provide well-defined data delivery semantics and are resilient to total failure of the streaming process.
The following are the provided reliable sinks:
File sink
This writes data to files in a directory in the filesystem. It supports the same file formats as the File source: JSON, Parquet, ORC, comma-separated values (CSV), and Text.
Kafka sink
This writes data to Kafka, effectively keeping the data “on the move.” This is an interesting option to integrate the results of our process with other streaming frameworks that rely on Kafka as the data backbone.
The following sinks are provided to support interaction and experimentation with Structured Streaming. They do not provide failure recovery and therefore their use in production is discouraged because it can result in data loss.
The following are nonreliable sinks:
Memory sink
This creates a temporary table with the results of the streaming query. The resulting table can be queried within the same Java virtual machine (JVM) process, which allows in-cluster queries to access the results of the streaming process.
Console sink
This prints the results of the query to the console. This is useful at development time to visually inspect the results of the stream process.
Next to the built-in sinks, we also have the option to create a sink programmatically. This is achieved with the foreach operation that, as its name implies, offers access to each individual resulting record of the output stream. Finally, it is possible to develop our own custom sinks using the sink API directly.
In the rest of this chapter, we explore the configuration and options available for each sink. We present in-depth coverage of the reliable sinks that should provide a thorough view of their applicability and can serve as a reference when you begin developing your own applications.
The experimental sinks are limited in scope, and that is also reflected in the level of coverage that follows.
Toward the end of this chapter, we look at the custom sink API options and review the considerations we need to take when developing our own sinks.
If you are in your initial exploration phase of Structured Streaming, you might want to skip this section and come back to it later, when you are busy developing your own Structured Streaming jobs.
Files are a common intersystem boundary. When used as a sink for a streaming process, they allow the data to become at rest after the stream-oriented processing. Those files can become part of a data lake or can be consumed by other (batch) processes as part of a larger processing pipeline that combines streaming and batch modes.
Scalable, reliable, and distributed filesystems—such as HDFS or object stores like Amazon Simple Storage Service (Amazon S3)—make it possible to store large datasets as files in arbitrary formats. When running in local mode, at exploration or development time, it’s possible to use the local filesystem for this sink.
The File sink supports the same formats as the File source:
CSV
JSON
Parquet
ORC
Text
Structured Streaming shares the same File Data Source implementation used in batch mode. The write options offered by the DataFrameWriter for each File format are also available in streaming mode. In this section, we highlight the most commonly used options. For the most up-to-date list, always consult the online documentation for your specific Spark version.
Before going into the details of each format, let’s explore a general File sink example that’s presented in Example 11-1.
// assume an existing streaming DataFrame `stream`
import org.apache.spark.sql.streaming.Trigger

val query = stream.writeStream
  .format("csv")
  .option("sep", "\t") // TAB as the field separator
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("path", "<dest/path>")
  .option("checkpointLocation", "<checkpoint/path>")
  .start()
In this example, we are using the csv format to write the stream results to the <dest/path> destination directory using TAB as the custom separator. We also specify a checkpointLocation, where the checkpoint metadata is stored at regular intervals.
The File sink supports only append as outputMode, and it can be safely omitted in the writeStream declaration. Attempting to use another mode will result in the following exception when the query starts:
org.apache.spark.sql.AnalysisException: Data source ${format} does not support ${output_mode} output mode;
One additional parameter that we see in Example 11-1 is the use of a trigger. When no trigger is specified, Structured Streaming starts the processing of a new batch as soon as the previous one is finished. In the case of the File sink, and depending on the throughput of the input stream, this might result in the generation of many small files. This might be detrimental to the filesystem storage capacity and performance.
Consider Example 11-2.
val stream = spark.readStream.format("rate").load()

val query = stream.writeStream
  .format("json")
  .option("path", "/tmp/data/rate")
  .option("checkpointLocation", "/tmp/data/rate/checkpoint")
  .start()
If we let this query run for a little while and then we check the target directory, we should observe a large number of small files:
$ ls -1
part-00000-03a1ed33-3203-4c54-b3b5-dc52646311b2-c000.json
part-00000-03be34a1-f69a-4789-ad65-da351c9b2d49-c000.json
part-00000-03d296dd-c5f2-4945-98a8-993e3c67c1ad-c000.json
part-00000-0645a678-b2e5-4514-a782-b8364fb150a6-c000.json
...
# Count the files in the directory
$ ls -1 | wc -l
562
# A moment later
$ ls -1 | wc -l
608
# What's the content of a file?
$ cat part-00007-e74a5f4c-5e04-47e2-86f7-c9194c5e85fa-c000.json
{"timestamp":"2018-05-13T19:34:45.170+02:00","value":104}
As we learned in Chapter 10, the Rate source generates one record per second by default. When we look at the data contained in one file, we can indeed see that single record. The query is, in fact, generating one file each time new data is available. Although the contents of that file are not large, filesystems incur some overhead in keeping track of the number of files in the filesystem. Even more, in the Hadoop Distributed File System (HDFS) each file is stored in at least one block that is replicated n times, and every file and block is tracked as an object in the NameNode's memory regardless of its contents. A small file does not consume a full block (typically 128 MB) of disk space, but given the number of files it produces, our naive query that uses a File sink can quickly strain the NameNode's metadata capacity and slow down the jobs that later read those files.
The trigger configuration is there to help us avoid this situation. By providing time triggers for the production of files, we can ensure that we have a reasonably sufficient amount of data in each file. We can observe the effect of a time trigger by modifying our previous example as follows:
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream.format("rate").load()

val query = stream.writeStream
  .format("json")
  .trigger(Trigger.ProcessingTime("1 minute")) // <-- Add Trigger configuration
  .option("path", "/tmp/data/rate")
  .option("checkpointLocation", "/tmp/data/rate/checkpoint")
  .start()
Let’s issue the query and wait for a couple of minutes. When we inspect the target directory, we should see considerably fewer files than before, and each file should contain more records. The number of records per file depends on the DataFrame partitioning:
$ ls -1
part-00000-2ffc26f9-bd43-42f3-93a7-29db2ffb93f3-c000.json
part-00000-3cc02262-801b-42ed-b47e-1bb48c78185e-c000.json
part-00000-a7e8b037-6f21-4645-9338-fc8cf1580eff-c000.json
part-00000-ca984a73-5387-49fe-a864-bd85e502fd0d-c000.json
...
# Count the files in the directory
$ ls -1 | wc -l
34
# Some seconds later
$ ls -1 | wc -l
42
# What's the content of a file?
$ cat part-00000-ca984a73-5387-49fe-a864-bd85e502fd0d-c000.json
{"timestamp":"2018-05-13T22:02:59.275+02:00","value":94}
{"timestamp":"2018-05-13T22:03:00.275+02:00","value":95}
{"timestamp":"2018-05-13T22:03:01.275+02:00","value":96}
{"timestamp":"2018-05-13T22:03:02.275+02:00","value":97}
{"timestamp":"2018-05-13T22:03:03.275+02:00","value":98}
{"timestamp":"2018-05-13T22:03:04.275+02:00","value":99}
{"timestamp":"2018-05-13T22:03:05.275+02:00","value":100}
If you are trying this example on a personal computer, the number of partitions defaults to the number of cores present. In our case, we have eight cores, and we observe seven or eight records per partition. That’s still very few records, but it shows the principle that can be extrapolated to real scenarios.
Even though a trigger based on the number of records or the size of the data would arguably be more interesting in this scenario, currently, only time-based triggers are supported. This might change in the future, as Structured Streaming evolves.
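Because the number of files per trigger follows the number of partitions of the streaming DataFrame, one common way to obtain fewer, larger files is to reduce the partition count right before the sink. The following is a sketch under that assumption, using `coalesce(1)` on the Rate source; the object name `CoalescedFileSinkSketch` and the paths passed to it are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object CoalescedFileSinkSketch {
  /** Starts a JSON File sink that writes at most one file per micro-batch. */
  def start(spark: SparkSession, path: String, checkpoint: String): StreamingQuery =
    spark.readStream.format("rate").load()
      .coalesce(1)                                  // a single partition => a single file per micro-batch
      .writeStream
      .format("json")
      .trigger(Trigger.ProcessingTime("1 minute"))  // at most one micro-batch per minute
      .option("path", path)
      .option("checkpointLocation", checkpoint)
      .start()
}
```

The trade-off is parallelism: with `coalesce(1)` a single task writes each batch. That is fine for modest volumes like the Rate example, but at higher throughput a small partition count greater than one may be a better balance between file size and write speed.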
We have already seen in previous examples the use of the option method, which takes a key and a value to set configuration options in a sink.
All supported file formats share the following configuration options:
path
A directory in a target filesystem where the streaming query writes the data files.
checkpointLocation
A directory in a resilient filesystem where the checkpointing metadata is stored. New checkpoint information is written with each query execution interval.
compression (default: None)
All supported file formats share the capability to compress the data, although the available compression codecs might differ from format to format. The specific compression algorithms are shown for each format in their corresponding section.
When configuring options of the File sink, it’s often useful to remember that any file written by the File sink can be read back using the corresponding File source. For example, when we discussed “JSON File Source Format”, we saw that it normally expects each line of the file to be a valid JSON document. Likewise, the JSON sink format will produce files containing one record per line.
Text-based file formats such as CSV and JSON accept custom formatting for date and timestamp data types:
dateFormat (default: yyyy-MM-dd)
Configures the pattern used to format date fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.
timestampFormat (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX)
Configures the pattern used to format timestamp fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.
timeZone (default: local timezone)
Configures the time zone to use to format timestamps.
In Spark 3.0 and later, these patterns follow Spark's own datetime pattern definitions, based on java.time.format.DateTimeFormatter, rather than java.text.SimpleDateFormat.
Using the CSV File format, we can write our data in a ubiquitous tabular format that can be read by many programs, from spreadsheet applications to a wide range of enterprise software.
Spark's CSV support offers many options to control the field separator, quoting behavior, and the inclusion of a header. In addition to that, the common File sink options and the date formatting options apply to the CSV sink.
In this section, we list the most commonly used options. For a comprehensive list, check the online documentation.
Following are the commonly used options for the CSV sink:
header (default: false)
A flag to indicate whether we should include a header in the resulting file. The header consists of the names of the fields in this streaming DataFrame.
quote (default: " [double quote])
Sets the character used to quote values. Quoting is necessary when values might contain the separator character, which, without quoting, would result in corrupt records.
quoteAll (default: false)
A flag used to indicate whether all values should be quoted or only those that contain a separator character. Some external systems require all values to be quoted. When using the CSV format to import the resulting files in an external system, check that system’s import requirements to correctly configure this option.
sep (default: , [comma])
Configures the separator character used between fields. The separator must be a single character. Otherwise, the query will throw an IllegalArgumentException at runtime when it starts.
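Because the File sink reuses the batch File Data Source, a low-cost way to see the effect of these options is to write a tiny static DataFrame with the same `.option(...)` calls and inspect the raw text; on a streaming query, the same calls go on `writeStream`. This is a sketch assuming a local SparkSession; the object name `CsvOptionsSketch` and the sample rows are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CsvOptionsSketch {
  /** Writes two rows with the CSV options discussed above and returns the raw output lines. */
  def writeAndReadBack(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val out = Files.createTempDirectory("csv-options").resolve("out").toString
    Seq(("sensor-1", "cold; dry"), ("sensor-2", "warm"))
      .toDF("id", "note")
      .coalesce(1)                  // one file, so the header is the first line we read back
      .write
      .option("header", "true")     // first line holds the field names
      .option("sep", ";")           // single-character separator
      .option("quoteAll", "true")   // quote every value, not only those containing ';'
      .csv(out)
    spark.read.textFile(out).collect().toSeq
  }
}
```

Note how the value `cold; dry` contains the separator: without quoting, a reader would split it into two fields.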
The JSON File sink lets us write output data to files using the JSON Lines format. This format transforms each record in the output dataset into a valid JSON document written in a line of text. The JSON File sink is symmetrical to the JSON source. As we would expect, files written with this format can be read again using the JSON source.
When using third-party JSON libraries to read the resulting files, we should first read the file(s) as lines of text and then parse each line as a JSON document representing one record.
In addition to the common file and text format options, the JSON sink supports these specific configurations:
encoding (default: UTF-8)
Configures the charset encoding used to write the JSON files.
lineSep (default: \n)
Sets the line separator to be used between JSON records.
Supported compression options (default: none): none, bzip2, deflate, gzip, lz4, and snappy.
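The compression option works the same way on `writeStream`; the sketch below uses a static DataFrame so that the produced files are easy to inspect. It assumes a local SparkSession, and the object name `JsonCompressionSketch` is hypothetical. Spark appends the codec extension to each part file, and the JSON reader decompresses such files transparently.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object JsonCompressionSketch {
  /** Writes `n` numbers as gzip-compressed JSON Lines and returns the output directory. */
  def write(spark: SparkSession, n: Int): String = {
    val out = Files.createTempDirectory("json-gzip").resolve("out").toString
    spark.range(n).toDF("value")
      .write
      .option("compression", "gzip")  // one of: none, bzip2, deflate, gzip, lz4, snappy
      .json(out)
    out
  }

  /** Names of the data files Spark produced, for example part-...-c000.json.gz. */
  def dataFiles(dir: String): Seq[String] =
    new File(dir).listFiles().map(_.getName).filter(_.startsWith("part-")).toSeq
}
```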
The text file sink writes plain-text files. Whereas the other file formats perform a conversion from the streaming DataFrame or Dataset schema to the particular file format structure, the text sink expects either a flattened streaming Dataset[String] or a streaming DataFrame with a schema containing a single value field of StringType.
The typical use of the text file format is to write custom text-based formats not natively supported in Structured Streaming. To achieve this goal, we first programmatically transform the data into the desired text representation. After that, we use the text format to write the data to files. Attempting to write any complex schema to a text sink will result in an error.
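The two steps described above can be sketched as follows: first flatten every row into a single `value` string column, then hand the result to the text format. The pipe-delimited layout, the helper `toTextLines`, and the object name `TextSinkSketch` are hypothetical choices for illustration; because `toTextLines` only uses column expressions, it works on static and streaming DataFrames alike.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.streaming.StreamingQuery

object TextSinkSketch {
  /** Flattens every row into one `value: String` column, joining all fields with '|'. */
  def toTextLines(df: DataFrame): DataFrame =
    df.select(concat_ws("|", df.columns.map(c => col(c).cast("string")): _*).as("value"))

  /** Writes the Rate source as pipe-delimited text lines. */
  def start(spark: SparkSession, path: String, checkpoint: String): StreamingQuery =
    toTextLines(spark.readStream.format("rate").load())
      .writeStream
      .format("text")
      .option("path", path)
      .option("checkpointLocation", checkpoint)
      .start()
}
```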
As we discussed in “The Kafka Source”, Kafka is a Publish/Subscribe (pub/sub) system. Although the Kafka source functions as a subscriber, the Kafka sink is the publisher counterpart. The Kafka sink allows us to write data (back) to Kafka, which then can be consumed by other subscribers to continue a chain of streaming processors.
Downstream consumers might be other streaming processors, implemented using Structured Streaming or any of the other streaming frameworks available, or (micro) services that consume the streaming data to fuel applications in the enterprise ecosystem.
In Kafka, data is represented as key–value records exchanged over a topic. Topics are composed of distributed partitions. Each partition maintains the messages in the order in which they were received. This ordering is indexed by an offset, which, in turn, is used by the consumer to indicate the record(s) to read. When a record is published to a topic, it’s placed in a partition of a topic. The choice of the partition depends on the key. The ruling principle is that records with the same key will land in the same partition. As a result, the ordering in Kafka is partial. The sequence of records from a single partition will be ordered sequentially by arrival time, but there are no ordering guarantees among partitions.
This model is highly scalable, and Kafka's implementation ensures low-latency reads and writes, making it an excellent carrier for streaming data.
Now that you’ve learned about the Kafka publishing model, we can look at the practical side of producing data to Kafka. We just saw that Kafka records are structured as key–value pairs. We need to structure our data in the same shape.
In a minimal implementation, we must ensure that our streaming DataFrame or Dataset has a value field of BinaryType or StringType. The implication of this requirement is that we usually need to encode our data into a transport representation before sending it to Kafka.
When the key is not specified, Structured Streaming will replace the key with null. This makes the Kafka sink use a round-robin assignment of the partition for the corresponding topic. If we want to preserve control over the key assignment, we must have a key field, also of BinaryType or StringType. This key is used for the partition assignment, resulting in a guaranteed ordering between records with equal keys.
Optionally, we can control the destination topic at the record level by adding a topic field. If present, the topic value must correspond to a Kafka topic, and the related record will be published to that topic. Setting the topic option on the writeStream overrides the value in the topic field. Per-record topics are useful when implementing a fan-out pattern in which incoming records are sorted in different dedicated topics for further processing. Think, for example, about classifying incoming support tickets into dedicated sales, technical, and troubleshooting topics that are consumed downstream by their corresponding (micro) service.
After we have the data in the right shape, we also need the address of the target bootstrap servers in order to connect to the brokers.
In practical terms, this generally involves two steps:
1. Transform each record into a single field called value and optionally assign a key and a topic to each record.
2. Declare our stream sink using the writeStream builder.
Example 11-3 shows these steps in use.
// Assume an existing streaming DataFrame 'sensorData'
// with schema: id: String, timestamp: Long, sensorType: String, value: Double
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._ // enables the $"column" syntax

// Step one: create a key and a value from each record:
val kafkaFormattedStream = sensorData.select(
  $"id" as "key",
  to_json(struct($"id", $"timestamp", $"sensorType", $"value")) as "value"
)

// In step two, we declare our streaming query:
val kafkaWriterQuery = kafkaFormattedStream.writeStream
  .queryName("kafkaWriter")
  .outputMode("append")
  .format("kafka") // determines that the kafka sink is used
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("topic", targetTopic)
  .option("checkpointLocation", "/path/checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()
When we add topic information at the record level, we must omit the topic configuration option. In Example 11-4, we modify the previous code to write each record to a dedicated topic matching the sensorType. That is, all humidity records go to the humidity topic, all radiation records go to the radiation topic, and so on.
// Assume an existing streaming DataFrame 'sensorData'
// with schema: id: String, timestamp: Long, sensorType: String, value: Double
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._ // enables the $"column" syntax

// Create a key, value, and topic from each record:
val kafkaFormattedStream = sensorData.select(
  $"id" as "key",
  $"sensorType" as "topic",
  to_json(struct($"id", $"timestamp", $"value")) as "value"
)

// In step two, we declare our streaming query:
val kafkaWriterQuery = kafkaFormattedStream.writeStream
  .queryName("kafkaWriter")
  .outputMode("append")
  .format("kafka") // determines that the kafka sink is used
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("checkpointLocation", "/path/checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()
Note that we have removed the setting option("topic", targetTopic) and added a topic field to each record. This results in each record being routed to the topic corresponding to its sensorType. If we leave the setting option("topic", targetTopic) in place, the value of the topic field would have no effect, because the option("topic", targetTopic) setting takes precedence.
When we look closely at the code in Example 11-3, we see that we create a single value field by converting the existing data into its JSON representation. In Kafka, each record consists of a key and a value. The value field contains the payload of the record. To send or receive a record of arbitrary complexity to and from Kafka, we need to convert said record into a single-field representation that we can fit into this value field. In Structured Streaming, the conversion to and from this transport value representation to the actual record must be done through user code. Ideally, the encoding we choose can be easily transformed into a structured record to take advantage of the Spark capabilities to manipulate data.
A common encoding format is JSON. JSON has native support in the structured APIs of Spark, and that extends to Structured Streaming. As we saw in Example 11-4, we write JSON by using the SQL function to_json: to_json(struct($"id", $"timestamp", $"value")) as "value".
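The consumer side of this JSON transport encoding is the mirror image: Kafka delivers `value` as binary, so a downstream Structured Streaming job casts it to a string and parses it with `from_json` against the expected schema. The sketch below pairs both directions; the object name `JsonTransportSketch` and the payload schema (taken from Example 11-4) are assumptions, and both functions are exercised on a static DataFrame so no Kafka broker is needed. In a real pipeline, `decode` would be applied to `spark.readStream.format("kafka")...load()`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

object JsonTransportSketch {
  // Schema the consumer expects inside the `value` field.
  val payloadSchema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("timestamp", LongType),
    StructField("value", DoubleType)))

  /** Producer side: key by id and pack the payload into a JSON string `value`. */
  def encode(sensorData: DataFrame): DataFrame =
    sensorData.select(
      col("id").as("key"),
      to_json(struct(col("id"), col("timestamp"), col("value"))).as("value"))

  /** Consumer side: cast `value` to string (Kafka delivers bytes) and parse it back. */
  def decode(records: DataFrame): DataFrame =
    records
      .select(from_json(col("value").cast("string"), payloadSchema).as("payload"))
      .select("payload.*")
}
```

Keeping the schema in one shared definition is what makes the round trip safe; any divergence between producer and consumer silently yields null fields from `from_json`.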
Binary representations such as Avro and Protocol Buffers are also possible. In such cases, we treat the value field as a BinaryType and use third-party libraries to do the encoding/decoding. As of this writing, there is no built-in support for binary encodings, but Avro support has been announced for an upcoming version.
An important factor to consider when choosing an encoding format is schema support. In a multiservice model that uses Kafka as the communications backbone, it’s typical to find services producing data that use a different programming model, language, and/or framework than a streaming processor or other service that consumes it.
To ensure interoperability, schema-oriented encodings are the preferred choice. Having a schema definition allows for the creation of artifacts in different languages and ensures that produced data can be consumed later on.
The Memory sink is a nonreliable sink that saves the results of the stream processing in an in-memory temporary table. It is considered nonreliable because all data will be lost if the streaming process ends, but it’s certainly useful in scenarios for which low-latency access to the streaming results is required.
The temporary table created by this sink is named after the query name. This table is backed by the streaming query and will be updated at each trigger following the semantics of the chosen outputMode.
The resulting table contains an up-to-date view of the query results and can be queried using classical Spark SQL operations. The query must be executed in the same process (JVM) where the Structured Streaming query is started.
The table maintained by the Memory sink can be accessed interactively. That property makes it an ideal interface with interactive data exploration tools like the Spark REPL or a notebook.
Another common use is to provide a query service on top of the streaming data. This is done by combining a server module, like an HTTP server, with the Spark driver. Calls to specific HTTP endpoints can then be served with data from this in-memory table.
Example 11-5 assumes a sensorData streaming dataset. The result of the stream processing is materialized in this in-memory table, which is available in the SQL context as sample_memory_query.
val sampleMemoryQuery = sensorData.writeStream
  .queryName("sample_memory_query") // this query name will be the SQL table name
  .outputMode("append")
  .format("memory")
  .start()

// After the query starts we can access the data in the temp table
val memData = spark.sql("select * from sample_memory_query")
memData.count() // show how many elements we have in our table
The Memory sink supports all output modes: Append, Update, and Complete. Hence, we can use it with all queries, including aggregations. The combination of the Memory sink with the Complete mode is particularly interesting because it provides a fast, in-memory queryable store for the up-to-date computed complete state. Note that for a query to support Complete mode, it must aggregate over a bounded-cardinality key. This is to ensure that the memory requirements to handle the state are likewise bounded within the system resources.
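The Memory sink with Complete mode can be sketched as follows: a per-sensor-type count, a bounded key, is kept fully up to date in an in-memory table and queried with SQL. To feed data deterministically, the sketch uses `MemoryStream` from `org.apache.spark.sql.execution.streaming`, which is an internal Spark testing utility rather than a public source; the object name `MemoryCompleteSketch` and the table name `sensor_type_counts` are hypothetical.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQuery

object MemoryCompleteSketch {
  /** Counts readings per sensor type and returns a snapshot of the in-memory table. */
  def run(spark: SparkSession): Map[String, Long] = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    val readings = MemoryStream[String]                    // internal test source
    val counts = readings.toDF().groupBy("value").count()  // bounded key: a few sensor types

    val query: StreamingQuery = counts.writeStream
      .queryName("sensor_type_counts")                     // becomes the temp table name
      .outputMode("complete")                              // the table holds the full result
      .format("memory")
      .start()

    readings.addData("humidity", "radiation", "humidity")
    query.processAllAvailable()                            // block until the batch is in the table
    val snapshot = spark.sql("select * from sensor_type_counts")
      .as[(String, Long)].collect().toMap
    query.stop()
    snapshot
  }
}
```

Each trigger replaces the table contents with the complete aggregate, so any SQL query issued between triggers sees a consistent, current view.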
For all of us who love to print “Hello, world!” to the screen output, we have the Console sink. Indeed, the Console sink lets us print a small sample of the results of the query to the standard output. Its use is limited to debugging and data exploration in an interactive shell-based environment, such as the spark-shell. As we would expect, this sink is not reliable given that it does not commit any data to another system.
You should avoid using the Console sink in production environments, much like printlns are frowned upon in an operational code base.
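During exploration, it helps to control how much the Console sink prints. The sketch below uses the sink's `numRows` option (rows shown per trigger, 20 by default) and `truncate` option (whether long cell values are cut, true by default); the object name `ConsoleSinkSketch` and the 5-second trigger are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object ConsoleSinkSketch {
  def start(spark: SparkSession): StreamingQuery =
    spark.readStream.format("rate").load()
      .writeStream
      .format("console")
      .option("numRows", 5)                          // rows printed per trigger
      .option("truncate", false)                     // print full cell contents
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
}
```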
There are times when we need to integrate our stream-processing applications with legacy systems in the enterprise. Also, because Structured Streaming is a young project, its range of available sinks is rather limited.
The foreach sink consists of an API and sink definition that provides access to the results of the query execution. It extends the writing capabilities of Structured Streaming to any external system that provides a Java virtual machine (JVM) client library.
To use the Foreach sink, we must provide an implementation of the ForeachWriter interface. The ForeachWriter controls the life cycle of the writer operation. Its execution takes place distributed on the executors, and the methods are called for each partition of the streaming DataFrame or Dataset, as shown by the API definition in Example 11-6.
abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}
As we can see in Example 11-6, the ForeachWriter is bound to a type [T] that corresponds to the type of the streaming Dataset or to spark.sql.Row in case of a streaming DataFrame. Its API consists of three methods: open, process, and close:
open
This is called at every trigger interval with the partitionId and a unique version number. Using these two parameters, the ForeachWriter must decide whether to process the partition being offered. Returning true will lead to the processing of each element using the logic in the process method. If the method returns false, the partition will be skipped for processing.
process
This provides access to the data, one element at a time. The function applied to the data must produce a side effect, such as inserting the record in a database, calling a REST API, or using a networking library to communicate the data to another system.
close
This is called to notify the end of writing a partition. The errorOrNull object will be null when the output operation terminated successfully for this partition or will contain a Throwable otherwise. close is called at the end of every partition writing operation, even when open returned false (to indicate that the partition should not be processed).
This contract is part of the data delivery semantics because it allows us to remove duplicated partitions that might already have been sent to the sink but are reprocessed by Structured Streaming as part of a recovery scenario. For that mechanism to properly work, the sink must implement some persistent way to remember the partition/version combinations that it has already seen.
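The deduplication contract can be sketched as a ForeachWriter that refuses a partition/version pair it has already committed. The objects `PartitionCommitLog` and `ExternalStore` are hypothetical and JVM-local stand-ins: a real sink must keep the commit log in durable storage shared by all executors, ideally written in the same transaction as the data. The sketch buffers records and writes them in `close` only on success. Note that Spark 2.4 and later name the second parameter of `open` `epochId` instead of `version`.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.sql.ForeachWriter

// Hypothetical commit log of (partitionId, version) pairs; JVM-local stand-in only.
object PartitionCommitLog {
  private val committed = TrieMap.empty[(Long, Long), Boolean]
  def contains(partitionId: Long, version: Long): Boolean = committed.contains((partitionId, version))
  def add(partitionId: Long, version: Long): Unit = committed.put((partitionId, version), true)
}

// Hypothetical external system receiving the records.
object ExternalStore {
  @volatile private var rows: Vector[String] = Vector.empty
  def write(batch: Seq[String]): Unit = synchronized { rows = rows ++ batch }
  def contents: Vector[String] = rows
}

class DedupForeachWriter extends ForeachWriter[String] {
  // Per-partition state, (re)initialized in open(); never shipped with the serialized writer.
  @transient private var partitionId: Long = -1L
  @transient private var version: Long = -1L
  @transient private var buffer: Vector[String] = Vector.empty

  override def open(partitionId: Long, version: Long): Boolean = {
    this.partitionId = partitionId
    this.version = version
    buffer = Vector.empty
    // false => Spark skips process() for a partition/version that was already written
    !PartitionCommitLog.contains(partitionId, version)
  }

  override def process(value: String): Unit = buffer :+= value

  override def close(errorOrNull: Throwable): Unit =
    // close() also runs when open() returned false; only new, successful partitions are written.
    if (errorOrNull == null && !PartitionCommitLog.contains(partitionId, version)) {
      ExternalStore.write(buffer)
      PartitionCommitLog.add(partitionId, version)
    }
}
```

Calling the three methods directly, as Spark would on an executor, shows a replayed partition being skipped.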
After we have our ForeachWriter implementation, we use the customary writeStream method of declaring a sink, and we call the dedicated foreach method with the ForeachWriter instance.
The ForeachWriter implementation must be Serializable. This is mandatory because the ForeachWriter is executed distributedly on each node of the cluster that contains a partition of the streaming Dataset or DataFrame being processed. At runtime, a new deserialized copy of the provided ForeachWriter instance will be created for each partition of the Dataset or DataFrame. As a consequence, we cannot pass non-serializable state, such as an open connection, through the initial constructor of the ForeachWriter. Serializable configuration values, like a host name and a port, are fine; resources such as connections must be created in open.
Let’s put this all together in a small example that shows how the Foreach sink works and illustrates the subtle intricacies of dealing with the state handling and serialization requirements.
For this example, we are going to develop a text-based TCP sink that transmits the results of the query to an external TCP socket receiving server.
In this example, we will be using the spark-shell utility that comes with the Spark installation.
In Example 11-7, we create a simple TCP client that can connect and write text to a server socket, provided its host and port. Note that this class is not Serializable. Sockets are inherently nonserializable because they are dependent on the underlying system I/O ports.
class TCPWriter(host: String, port: Int) {
  import java.io.PrintWriter
  import java.net.Socket

  val socket = new Socket(host, port)
  val printer = new PrintWriter(socket.getOutputStream, true) // autoflush on println

  def println(str: String): Unit = printer.println(str)

  def close(): Unit = {
    printer.flush()
    printer.close()
    socket.close()
  }
}
Next, in Example 11-8, we use this TCPWriter in a ForeachWriter implementation.
import org.apache.spark.sql.ForeachWriter

class TCPForeachWriter(host: String, port: Int) extends ForeachWriter[RateTick] {

  @transient var writer: TCPWriter = _ // not serialized; created in open()
  var localPartition: Long = 0
  var localVersion: Long = 0

  override def open(partitionId: Long, version: Long): Boolean = {
    writer = new TCPWriter(host, port)
    localPartition = partitionId
    localVersion = version
    println(s"Writing partition [$partitionId] and version[$version]")
    true // we always accept to write
  }

  override def process(value: RateTick): Unit = {
    val tickString = s"${value.timestamp}, ${value.value}"
    writer.println(s"$localPartition, $localVersion, $tickString")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      println(s"Closing partition [$localPartition] and version[$localVersion]")
    } else {
      println("Query failed with: " + errorOrNull)
    }
    if (writer != null) writer.close() // release the socket in both cases
  }
}
Pay close attention to how we have declared the TCPWriter variable: @transient var writer:TCPWriter = _. @transient means that this field is skipped when the object is serialized. The initial value is null (using the default-value initialization syntax _). It’s only in the call to open that we create an instance of TCPWriter and assign it to our variable for later use.
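To see why the annotation matters, the following self-contained sketch (plain Scala, no Spark needed) runs Java serialization, which Spark uses by default to ship closures to executors, on two hypothetical holder classes. EagerHolder and LazyHolder are illustrative stand-ins, not part of the example. Both are Serializable and keep their serializable constructor arguments. The difference is that one creates a Socket eagerly, while the other declares it as a @transient var that starts out null, just like writer in TCPForeachWriter.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.net.Socket

// Hypothetical stand-ins for a ForeachWriter. Both are Serializable and keep
// their (serializable) constructor arguments, but treat the Socket differently.
class EagerHolder(val host: String, val port: Int) extends Serializable {
  // Created eagerly, so it becomes part of the serialized state.
  // An unconnected Socket is enough to show the problem; no server is needed.
  val socket: Socket = new Socket()
}

class LazyHolder(val host: String, val port: Int) extends Serializable {
  // Skipped by serialization; stays null until something assigns it.
  @transient var socket: Socket = _
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(serializes(new EagerHolder("localhost", 9876))) // false
    println(serializes(new LazyHolder("localhost", 9876)))  // true
  }
}
```

Serialization fails only for the class whose Socket is part of its state. The host and port values serialize without trouble, so passing them through the constructor is safe.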
Also, note how the process method takes an object of type RateTick. Implementing a ForeachWriter is easier when we have a typed Dataset to start with, because we deal with a specific object structure instead of spark.sql.Rows, which are the generic data container for streaming DataFrames. In this case, we transformed the initial streaming DataFrame to a typed Dataset[RateTick] before proceeding to the sink phase.
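Now, to complete our example, we create a simple Rate source and write the produced stream directly to our newly developed TCPForeachWriter. In the spark-shell, the RateTick case class must be defined before TCPForeachWriter because the writer refers to it: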
```scala
case class RateTick(timestamp: Long, value: Long)

val stream = spark.readStream.format("rate").option("rowsPerSecond", 100).load().as[RateTick]

val writerInstance = new TCPForeachWriter("localhost", 9876)

val query = stream.writeStream.foreach(writerInstance).outputMode("append")
```
Before starting our query, we run a simple TCP server to observe the results. For this purpose, we use nc (netcat), a useful *nix command for creating TCP/UDP clients and servers from the command line. In this case, we use a TCP server listening on port 9876:
```shell
# Tip: the syntax of netcat is system-specific.
# The following command should work on *nix and on macOS with the nc installed by Homebrew.
# Check your system documentation for the proper syntax.
nc -lk 9876
```
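If netcat is unavailable, or its flags differ on your system, a small JVM program can play the same role. This is a minimal sketch and is not part of the original example. LineServer is a hypothetical helper, and the default port of 9876 is assumed to match the writer configured earlier. Like nc -lk, it keeps accepting connections and prints every line it receives. Each connection runs on its own thread because TCPForeachWriter opens one connection per partition, so several connections can be open at the same time.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets

object LineServer {
  // Reads lines from one client until it disconnects, passing each to onLine.
  def handle(client: Socket, onLine: String => Unit): Unit = {
    val in = new BufferedReader(
      new InputStreamReader(client.getInputStream, StandardCharsets.UTF_8))
    try {
      var line = in.readLine()
      while (line != null) {
        onLine(line)
        line = in.readLine()
      }
    } finally client.close()
  }

  // Accepts connections until the server socket is closed, one thread each.
  def serve(server: ServerSocket, onLine: String => Unit): Unit = {
    while (!server.isClosed) {
      val client = server.accept()
      val t = new Thread(() => handle(client, onLine))
      t.setDaemon(true)
      t.start()
    }
  }

  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9876
    serve(new ServerSocket(port), line => println(line))
  }
}
```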
Finally, we start our query:

```scala
val queryExecution = query.start()
```
In the shell running the nc command, we should see output like the following:

```text
5, 1, 1528043018, 72
5, 1, 1528043018, 73
5, 1, 1528043018, 74
0, 1, 1528043018, 0
0, 1, 1528043018, 1
0, 1, 1528043018, 2
0, 1, 1528043018, 3
0, 1, 1528043018, 4
0, 1, 1528043018, 5
0, 1, 1528043018, 6
0, 1, 1528043018, 7
0, 1, 1528043018, 8
0, 1, 1528043018, 9
0, 1, 1528043018, 10
0, 1, 1528043018, 11
7, 1, 1528043019, 87
7, 1, 1528043019, 88
7, 1, 1528043019, 89
7, 1, 1528043019, 90
7, 1, 1528043019, 91
7, 1, 1528043019, 92
```
In the output, the first column is the partition and the second is the version. These are followed by the data produced by the Rate source (timestamp and value).
It’s interesting to note that the data is ordered within a partition, like partition 0 in our example, but there are no ordering guarantees among different partitions. Partitions are processed in parallel on different machines of the cluster, so there’s no guarantee which one comes first.
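The ordering property described above can be checked against captured output. The following sketch assumes that each line has exactly the four comma-separated fields written by TCPForeachWriter.process. Record and OutputCheck are hypothetical names used only for illustration. The check verifies that values increase within each partition and says nothing about order across partitions. The sample is a subset of the output shown earlier.

```scala
object OutputCheck {
  // One line written by TCPForeachWriter: partition, version, timestamp, value.
  final case class Record(partition: Long, version: Long, timestamp: Long, value: Long)

  def parse(line: String): Record = line.split(",").map(_.trim.toLong) match {
    case Array(p, v, ts, x) => Record(p, v, ts, x)
    case _ => throw new IllegalArgumentException(s"Unexpected line: $line")
  }

  // True if, within every partition, values appear in strictly increasing order.
  // groupBy keeps the original relative order of the elements inside each group.
  def orderedWithinPartitions(records: Seq[Record]): Boolean =
    records.groupBy(_.partition).values.forall { rs =>
      rs.map(_.value).sliding(2).forall {
        case Seq(a, b) => a < b
        case _ => true
      }
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "5, 1, 1528043018, 72", "5, 1, 1528043018, 73", "5, 1, 1528043018, 74",
      "0, 1, 1528043018, 0", "0, 1, 1528043018, 1", "0, 1, 1528043018, 2",
      "7, 1, 1528043019, 87", "7, 1, 1528043019, 88")
    val records = sample.map(parse)
    println(orderedWithinPartitions(records))  // true
    println(records.map(_.partition).distinct) // List(5, 0, 7): arrival order, not sorted
  }
}
```

Passing this check only shows that one particular run is consistent with per-partition ordering. The guarantee itself comes from how each partition is processed by a single task.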
Finally, to end the query execution, we call the stop method:

```scala
queryExecution.stop()
```
In this example, you have seen how to correctly use a minimalistic socket client to output the data of a streaming query with the Foreach sink.
Socket communication is the underlying interaction mechanism of most database drivers and many other application clients in the wild.
The method that we have illustrated here is a common pattern that you can effectively apply to write to a variety of external systems that offer a JVM-based client library.
In a nutshell, we can summarize this pattern as follows:

1. Create a @transient mutable reference to our client class (for example, a database driver) in the body of the ForeachWriter.
2. In the open method, initialize a connection to the external system and assign it to the mutable reference. It’s guaranteed that this reference will be used by a single thread.
3. In process, publish the provided data element to the external system.
4. Finally, in close, terminate all connections and clean up any state.
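The four steps above can be exercised without a cluster. The following is a local simulation built on several assumptions. PartitionWriter is a hypothetical trait that only mirrors the shape of ForeachWriter's open/process/close methods; it is not Spark's API. FakeClient stands in for a nonserializable client library, and FakeBackend stands in for the external system. Lifecycle.runPartition approximates what Spark does by serializing the writer, deserializing a fresh copy for each partition, and then driving open, process, and close on that copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical "external system": a global buffer, reachable from any copy.
object FakeBackend {
  val received: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// Hypothetical client library: deliberately NOT Serializable, like TCPWriter.
class FakeClient(endpoint: String) {
  def send(msg: String): Unit = FakeBackend.synchronized {
    FakeBackend.received += s"$endpoint|$msg"
  }
  def close(): Unit = ()
}

// Hypothetical trait with the same shape as ForeachWriter (not Spark's API).
trait PartitionWriter[T] extends Serializable {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

class ClientWriter(endpoint: String) extends PartitionWriter[Long] {
  @transient private var client: FakeClient = _ // 1. transient, uninitialized
  private var partition: Long = 0L

  def open(partitionId: Long, version: Long): Boolean = {
    client = new FakeClient(endpoint)             // 2. connect on the "executor"
    partition = partitionId
    true
  }
  def process(value: Long): Unit =
    client.send(s"$partition,$value")             // 3. publish each element
  def close(errorOrNull: Throwable): Unit =
    if (client != null) client.close()            // 4. release resources
}

object Lifecycle {
  // Serialize and deserialize, roughly what happens when a task is shipped.
  def roundTrip[A <: Serializable](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Simulates one task: a fresh deserialized copy of the writer per partition.
  def runPartition[T](writer: PartitionWriter[T], partitionId: Long,
                      version: Long, data: Seq[T]): Unit = {
    val copy = roundTrip(writer)
    var error: Throwable = null
    try {
      if (copy.open(partitionId, version)) data.foreach(copy.process)
    } catch {
      case e: Throwable => error = e; throw e
    } finally copy.close(error)
  }
}
```

For example, Lifecycle.runPartition(new ClientWriter("localhost:9876"), 0L, 1L, Seq(0L, 1L, 2L)) leaves three entries in FakeBackend.received, each tagged with partition 0. The original writer instance is never opened; only its deserialized copy is.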
In Example 11-8, we saw how we needed an uninitialized mutable reference to the TCPWriter: @transient var writer:TCPWriter = _. This seemingly elaborate construct is required to ensure that we instantiate the nonserializable class only when the ForeachWriter is already deserialized and running remotely, in an executor.
If we want to explore what happens when we attempt to include a nonserializable reference in a ForeachWriter implementation, we could declare our TCPWriter instance like this, instead:
```scala
import org.apache.spark.sql.ForeachWriter

class TCPForeachWriter(host: String, port: Int) extends ForeachWriter[RateTick] {
  val nonSerializableWriter: TCPWriter = new TCPWriter(host, port)
  // ... same code as before ...
}
```
Although this looks simpler and more familiar, when we attempt to run our query with this ForeachWriter implementation, we get an org.apache.spark.SparkException: Task not serializable. This produces a very long stack trace that contains a best-effort attempt at pointing out the offending class. We must follow the stack trace until we find the Caused by statement, such as the one shown in the following trace:
```text
Caused by: java.io.NotSerializableException: $line17.$read$$iw$$iw$TCPWriter
Serialization stack:
  - object not serializable (class: $line17.$read$$iw$$iw$TCPWriter, value: $line17.$read$$iw$$iw$TCPWriter@4f44d3e0)
  - field (class: $line20.$read$$iw$$iw$TCPForeachWriter, name: nonSerializableWriter, type: class $line17.$read$$iw$$iw$TCPWriter)
  - object (class $line20.$read$$iw$$iw$TCPForeachWriter, $line20.$read$$iw$$iw$TCPForeachWriter@54832ad9)
  - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
```
Because this example was running in the spark-shell, the trace contains some odd $$-notation. Once we remove that noise, we can see that the nonserializable object is reported as object not serializable (class: TCPWriter). The reference to it is the nonSerializableWriter field of our writer: field (class: TCPForeachWriter, name: nonSerializableWriter, type: class TCPWriter).
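Removing that noise by hand gets tedious with long traces. The following small helper is a hypothetical convenience and not a Spark utility. It strips the wrapper prefixes with a regular expression, under the assumption that the wrappers follow the $lineNN.$read$$iw...$ pattern seen in the trace above. Other Scala or Spark versions may generate different wrapper names, so the pattern might need adjusting.

```scala
object ReplNoise {
  // Matches spark-shell wrapper prefixes such as "$line17.$read$$iw$$iw$".
  // Assumption: the wrapper naming seen in the trace above.
  private val ReplPrefix = """\$line\d+\.\$read(\$\$iw)*\$""".r

  def strip(text: String): String = ReplPrefix.replaceAllIn(text, "")

  def main(args: Array[String]): Unit = {
    val field = "- field (class: $line20.$read$$iw$$iw$TCPForeachWriter, " +
      "name: nonSerializableWriter, type: class $line17.$read$$iw$$iw$TCPWriter)"
    println(strip(field))
    // - field (class: TCPForeachWriter, name: nonSerializableWriter, type: class TCPWriter)
  }
}
```

Names such as org$apache$spark$sql$execution$streaming$ForeachSink$$writer are left untouched because they do not start with the $line wrapper.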
Serialization issues are common in ForeachWriter implementations. Hopefully, with the tips in this section, you will be able to avoid any trouble in your own implementation. But when such an issue does occur, Spark makes a best-effort attempt at determining the source of the problem. This information, provided in the stack trace, is very valuable for debugging and solving these serialization issues.