The previous chapters provided a good overview of the Structured Streaming programming model and how you can apply it in a practical way. You also saw how sources are the starting point of each Structured Streaming program. In this chapter, we study the general characteristics of a source and review the available sources in greater detail, including their different configuration options and modes of operation.
In Structured Streaming, a source is an abstraction that represents streaming data providers. The concept behind the source interface is that streaming data is a continuous flow of events over time that can be seen as a sequence, indexed with a monotonically increasing counter.
Figure 10-1 illustrates how each event in the stream is considered to have an ever-increasing offset.
Offsets, as shown in Figure 10-2, are used to request data from the external source and to indicate what data has already been consumed.
Structured Streaming knows when there is data to process by asking the external system for its current offset and comparing it to the last processed offset.
The data to be processed is requested by getting a batch between two offsets, start and end.
The source is informed that data has been processed by committing a given offset.
The source contract guarantees that all data with an offset less than or equal to the committed offset has been processed and that subsequent requests will stipulate only offsets greater than that committed offset.
Given these guarantees, sources might opt to discard the processed data to free up system resources.
Let’s take a closer look at the dynamics of the offset-based processing shown in Figure 10-2:
At t1, the system calls getOffset and obtains the current offset for the source.
At t2, the system obtains the batch up to the last known offset by calling getBatch(start, end). Note that new data might have arrived in the meantime.
At t3, the system commits the offset and the source drops the corresponding records.
This process repeats constantly, ensuring the acquisition of streaming data. To recover from eventual failure, offsets are often checkpointed to external storage.
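The cycle described above can be sketched in plain Scala. This is a minimal in-memory model under stated assumptions, not Spark's internal source interface: the class InMemorySource, its append method, and the use of Long positions as offsets are hypothetical names chosen for illustration. Only getOffset, getBatch(start, end), and commit mirror the calls named in the text.

```scala
import scala.collection.mutable

// Hypothetical in-memory source: a record's offset is its 1-based position in the stream.
final class InMemorySource {
  private val buffer = mutable.ArrayBuffer.empty[String]
  private var committed = 0L // every offset <= committed has been processed and dropped

  // Simulates the arrival of new events at the external system.
  def append(records: String*): Unit = buffer ++= records

  // Highest offset currently available, or None when nothing has arrived yet.
  def getOffset: Option[Long] =
    if (committed + buffer.size == 0) None else Some(committed + buffer.size)

  // Records with offsets in (start, end]; the range stays replayable until it is committed.
  def getBatch(start: Long, end: Long): Seq[String] = {
    require(start >= committed, s"offsets up to $committed were already committed")
    buffer.slice((start - committed).toInt, (end - committed).toInt).toList
  }

  // Drops all records with an offset <= end.
  def commit(end: Long): Unit = {
    buffer.remove(0, (end - committed).toInt)
    committed = end
  }
}

val source = new InMemorySource
source.append("a", "b", "c")
val end = source.getOffset.get           // t1: ask for the current offset
source.append("d")                       // new data arrives in the meantime
val batch = source.getBatch(0L, end)     // t2: request the batch up to the known offset
val replayed = source.getBatch(0L, end)  // the same range can be requested again
source.commit(end)                       // t3: the source may now drop a, b, and c
val next = source.getBatch(end, source.getOffset.get) // next cycle picks up d
```

Because the batch is addressed by an offset range rather than by "whatever is new", requesting the same range twice yields the same records until that range is committed.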
Besides the offset-based interaction, sources must fulfill two requirements: to be reliable, sources must be replayable in the same order; and sources must provide a schema.
In Structured Streaming, replayability is the capacity to request a part of the stream that had already been requested but not yet committed.
Just like we can rewind that Netflix series we are watching to see a piece we just missed because of a distraction, sources must offer the capability to replay a piece of the stream that was already requested but not committed.
This is done by calling getBatch with the offset range that we want to receive again.
A source is considered to be reliable when it can produce an uncommitted offset range even after a total failure of the Structured Streaming process.
In this failure recovery process, offsets are restored from their last known checkpoint and are requested again from the source.
This requires the actual streaming system that is backing the source implementation to store data safely outside the streaming process.
By requiring replayability from the source, Structured Streaming delegates recovery responsibility to the source.
This implies that only reliable sources work with Structured Streaming to create strong end-to-end delivery guarantees.
A defining characteristic of the structured APIs of Spark is that they rely on schema information to handle the data at different levels. As opposed to processing opaque Strings or Byte Array blobs, schema information provides insights on how the data is shaped in terms of fields and types. We can use schema information to drive optimizations at different levels in the stack, from query planning to the internal binary representation of data, storage, and access to it.
Sources must provide schema information that describes the data they produce. Some source implementations allow this schema to be configured and use this configuration information to automatically parse incoming data and transform it into valid records. In fact, many file-based streaming sources such as JSON or comma-separated values (CSV) files follow this model, in which the user must provide the schema used by the file format to ensure proper parsing. Some other sources use a fixed internal schema that expresses the metadata information of every record and leaves the parsing of the payload to the application.
From an architectural perspective, creating schema-driven streaming applications is desirable because it facilitates the global understanding of how data flows through the system and drives the formalization of the different stages of a multiprocess streaming pipeline.
In Structured Streaming, we reuse the Spark SQL API for creating schema definitions.
There are several methods that we can use to define the schema that describes the content of the stream: programmatically, inferred from a case class definition, or loaded from an existing dataset.
To define a schema programmatically, we use the StructType and StructField classes to build up a representation of the schema.
For example, to represent a tracked vehicle with id, type, and location coordinates, we can construct the corresponding schema structure as follows:
    // StructType, StructField, and the data types live in the types package
    import org.apache.spark.sql.types._

    val schema = StructType(
      List(
        StructField("id", StringType, true),
        StructField("type", StringType, true),
        StructField("location", StructType(List(
          StructField("latitude", DoubleType, false),
          StructField("longitude", DoubleType, false)
        )), false)
      )
    )
StructField can contain nested StructTypes, making it possible to create schemas of arbitrary depth and complexity.
In Scala, the schema can also be represented using arbitrary combinations of case classes.
Given a single case class or a case class hierarchy, the schema representation can be computed by creating an Encoder for the case class and obtaining the schema from that encoder instance.
Using this method, the same schema definition used in the preceding example can be obtained like so:
    import org.apache.spark.sql.Encoders

    // Define the case class hierarchy
    case class Coordinates(latitude: Double, longitude: Double)
    case class Vehicle(id: String, `type`: String, location: Coordinates)

    // Obtain the Encoder, and the schema from the Encoder
    val schema = Encoders.product[Vehicle].schema
A practical method of obtaining a schema definition is by maintaining a sample data file in a schema-aware format, such as Parquet.
To obtain our schema definition, we load the sample dataset and get the schema definition from the loaded DataFrame:
    // Replace the placeholder with the location of the sample dataset
    val sample = spark.read.parquet("<path-to-sample>")
    val schema = sample.schema
The programmatic way of defining schemas is powerful but requires effort and is complex to maintain, often leading to errors. Loading a dataset might be practical at the prototyping stage, but it requires keeping the sample dataset up-to-date, which in some cases can lead to accidental complexity.
Although the best method to choose might be different from one use case to the other, in general, when working with Scala, we prefer to use the inference method, when possible.
The following are the sources currently available in the Spark distribution of Structured Streaming:
File: Allows the ingestion of data stored as files. In most cases, the data is transformed into records that are further processed in streaming mode. This source supports these formats: JSON, CSV, Parquet, ORC, and plain text.
Kafka: Allows the consumption of streaming data published to Apache Kafka topics.
Socket: A TCP socket client able to connect to a TCP server and consume a text-based data stream. The stream must be encoded in the UTF-8 character set.
Rate: Produces an internally generated stream of (timestamp, value) records with a configurable production rate. This is normally used for learning and testing purposes.
As we discussed in “Understanding Sources”, sources are considered reliable when they provide replay capabilities from an offset, even when the Structured Streaming process fails. Using this criterion, we can classify the available sources as follows:
Reliable: File source, Kafka source
Unreliable: Socket source, Rate source
The unreliable sources may be used in a production system only when the loss of data can be tolerated.
The streaming source API is currently undergoing evolution. As of this writing, there is no stable public API to develop custom sources. This is expected to change in the near future.
In the next part of this chapter, we explore in detail the sources currently available. As production-ready sources, the File and the Kafka sources have many options that we discuss in detail. The Socket and the Rate source are limited in features, which will be evident by their concise coverage.
The File source is a simple streaming data source that reads files from a monitored directory in a filesystem. A file-based handover is a commonly used method to bridge a batch-based process with a streaming system. The batch process produces its output in a file format and drops it in a common directory where a suitable implementation of the File source can pick these files up and transform their contents into a stream of records for further processing in streaming mode.
The files are read using a specified format, which is provided with the .format(<format_name>) method in the readStream builder, or by using the dedicated methods in the DataStreamReader that indicate the format to use; for example, readStream.parquet("/path/to/dir/").
When using the dedicated methods corresponding to each supported format, the method call should be done as the last call of the builder.
For example, the three forms in Example 10-1 are equivalent.
    // Use format and load path
    val fileStream = spark.readStream
      .format("parquet")
      .schema(schema)
      .load("hdfs://data/exchange")

    // Use format and path options
    val fileStream = spark.readStream
      .format("parquet")
      .option("path", "hdfs://data/exchange")
      .schema(schema)
      .load()

    // Use dedicated method
    val fileStream = spark.readStream
      .schema(schema)
      .parquet("hdfs://data/exchange")
As of Spark v2.3.0, the following file-based formats are supported by Structured Streaming. These are the same file formats supported by the static DataFrame, Dataset, and SQL APIs:
CSV
JSON
Parquet
ORC
text
textFile
Regardless of the specific format, the general functionality of the File source is to monitor a directory in a shared filesystem identified by its specific URL. All file formats support a common set of options that control the file inflow and define the aging criteria of the files.
As Apache Spark is a fast-evolving project, APIs and their options might change in future versions. Also, in this section, we cover only the most relevant options that apply to streaming workloads. For the most up-to-date information, always check the API documentation corresponding to your Spark version.
These options can be set for all file-based sources:
maxFilesPerTrigger (default: unset): Indicates how many files will be consumed at each query trigger. This setting limits the number of files processed at each trigger, and in doing so, it helps to control the data inflow in the system.
latestFirst (default: false): When this flag is set to true, newer files are elected for processing first. Use this option when the most recent data has higher priority over older data.
maxFileAge (default: 7 days): Defines an age threshold for the files in the directory. Files older than the threshold will not be eligible for processing and will be effectively ignored. This threshold is relative to the most recent file in the directory and not to the system clock. For example, if maxFileAge is 2 days and the most recent file is from yesterday, a file is considered too old when it is more than three days old. This dynamic is similar to watermarks on event time.
fileNameOnly (default: false): When set to true, two files will be considered the same if they have the same name; otherwise, the full path will be considered.
When latestFirst is set to true and the maxFilesPerTrigger option is configured, maxFileAge is ignored because there might be a condition in which files that are valid for processing become older than the threshold because the system gives priority to the most recent files found. In such cases, no aging policy can be set.
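The relative nature of the maxFileAge threshold can be illustrated with a few lines of plain Scala. This sketch only models the rule as described above; FileEntry and eligibleFiles are hypothetical names, and the code is not the File source's actual implementation.

```scala
import java.time.{Duration, Instant}

// Hypothetical description of a file found in the monitored directory.
final case class FileEntry(name: String, modified: Instant)

// Keeps the files that are not older than `maxFileAge`, measured from the
// most recent file in the listing rather than from the system clock.
def eligibleFiles(files: Seq[FileEntry], maxFileAge: Duration): Seq[FileEntry] =
  if (files.isEmpty) files
  else {
    val newest = files.map(_.modified).maxBy(_.toEpochMilli)
    val threshold = newest.minus(maxFileAge)
    files.filter(file => !file.modified.isBefore(threshold))
  }

// Assumption: a fixed reference instant keeps the example deterministic.
val now = Instant.parse("2018-04-10T00:00:00Z")
val files = Seq(
  FileEntry("yesterday.json", now.minus(Duration.ofDays(1))),
  FileEntry("two-days-ago.json", now.minus(Duration.ofDays(2))),
  FileEntry("four-days-ago.json", now.minus(Duration.ofDays(4)))
)

// With a 2-day maxFileAge and the newest file from yesterday, the cutoff is
// three days ago, so only the four-day-old file is ignored.
val eligible = eligibleFiles(files, Duration.ofDays(2)).map(_.name)
```

Note that the system clock never enters the computation: if no new files arrive for a week, the files already in the directory do not age out.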
Some file formats, such as CSV and JSON, use a configurable parser to transform the text data in each file into structured records. It’s possible for upstream processes to create records that do not fulfill the expected format. These records are considered corrupted.
Streaming systems are characterized by their continuous running. A streaming process should not fail when bad data is received. Depending on the business requirements, we can either drop the invalid records or route the data considered corrupted to a separate error-handling flow.
The following options allow for the configuration of the parser behavior to handle those records that are considered corrupted:
mode (default: PERMISSIVE): Controls the way that corrupted records are handled during parsing. Allowed values are PERMISSIVE, DROPMALFORMED, and FAILFAST.
PERMISSIVE: The value of the corrupted record is inserted in a special field configured by the option columnNameOfCorruptRecord that must exist in the schema. All other fields are set to null. If the field does not exist, the record is dropped (same behavior as DROPMALFORMED).
DROPMALFORMED: Corrupted records are dropped.
FAILFAST: An exception is thrown when a corrupted record is found. This method is not recommended in a streaming process, because the propagation of the exception will potentially make the streaming process fail and stop.
columnNameOfCorruptRecord (default: "_corrupt_record"): Permits the configuration of the special field that contains the string value of malformed records. This field can also be configured by setting spark.sql.columnNameOfCorruptRecord in the Spark configuration. If both spark.sql.columnNameOfCorruptRecord and this option are set, this option takes precedence.
dateFormat (default: "yyyy-MM-dd"): Configures the pattern used to parse date fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.
timestampFormat (default: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"): Configures the pattern used to parse timestamp fields. Custom patterns follow the formats defined at java.text.SimpleDateFormat.
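As a hedged illustration of routing corrupted records to a separate flow, the following sketch declares a JSON File source in PERMISSIVE mode. The local session, the temporary directory, and the id and timestamp fields are assumptions made for the example; the option names are the ones described above.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: a local session, used here only to make the sketch self-contained.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("corrupt-records-sketch")
  .getOrCreate()

// Assumption: a temporary directory stands in for the monitored location.
val inputDir = Files.createTempDirectory("sensor-json").toString

// The corrupt-record field must be part of the schema for PERMISSIVE mode to fill it.
val schema = StructType(List(
  StructField("id", StringType, true),
  StructField("timestamp", TimestampType, true),
  StructField("_corrupt_record", StringType, true)
))

val rawStream = spark.readStream
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(inputDir)

// Valid records: the special field is null, so it can be dropped.
val validRecords = rawStream
  .filter(rawStream("_corrupt_record").isNull)
  .drop("_corrupt_record")

// Corrupted records: keep the raw text for a separate error-handling flow.
val corruptRecords = rawStream.filter(rawStream("_corrupt_record").isNotNull)
```

Each of the two resulting streams can then be written to its own sink, so that bad data is preserved for inspection without stopping the main flow.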
The JSON format support for the File source lets us consume text files encoded as JSON, in which each line in the file is expected to be a valid JSON object. The JSON records are parsed using the provided schema. Records that do not follow the schema are considered invalid, and there are several options available to control the handling of invalid records.
By default, the JSON File source expects the file contents to follow the JSON Lines specification. That is, each independent line in the file corresponds to a valid JSON document that complies with the specified schema. Each line should be separated by a newline (\n) character. A CRLF sequence (\r\n) is also supported because trailing white spaces are ignored.
We can tweak the tolerance of the JSON parser to process data that does not fully comply with the standard. It’s also possible to change the behavior to handle those records that are considered corrupted. The following options allow for the configuration of the parser behavior:
allowComments (default: false): When enabled, comments in Java/C++ style are allowed in the file and the corresponding line will be ignored; for example:

    // Timestamps are in ISO 8601 compliant format
    {"id":"x097abba", "timestamp": "2018-04-01T16:32:56+00:00"}
    {"id":"x053ba0bab", "timestamp": "2018-04-01T16:35:02+00:00"}

Otherwise, comments in the JSON file are considered corrupted records and handled following the mode setting.
allowNumericLeadingZeros (default: false): When enabled, leading zeros in numbers are allowed (e.g., 00314). Otherwise, the leading zeros are considered invalid numeric values, the corresponding record is deemed corrupted, and it is handled following the mode setting.
allowSingleQuotes (default: true): Allows the use of single quotes to delimit fields. When enabled, both single quotes and double quotes are allowed. Regardless of this setting, quote characters cannot be nested and must be properly escaped when used within a value; for example:

    // valid record
    {"firstname":"Alice", 'lastname': 'Wonderland'}
    // invalid nesting
    {"firstname":'Elvis "The King"', 'lastname': 'Presley'}
    // correct escaping
    {"firstname":'Elvis \"The King\"', 'lastname': 'Presley'}

allowUnquotedFieldNames (default: false): Allows unquoted JSON field names (e.g., {firstname:"Alice"}). Note that it’s not possible to have spaces in field names when using this option (e.g., {first name:"Alice"} is considered corrupted even when the field name matches the schema). Use with caution.
multiLine (default: false): When enabled, instead of parsing JSON Lines, the parser will consider the contents of each file as a single valid JSON document and will attempt to parse its contents as records following the defined schema.
Use this option when the producer of the file can output only complete JSON documents as files. In such cases, use a top-level array to group the records, as shown in Example 10-2.

    [
      {"firstname":"Alice", "last name":"Wonderland", "age": 7},
      {"firstname":"Coraline", "last name":"Spin", "age": 15}
    ]

primitivesAsString (default: false): When enabled, primitive value types are considered strings. This allows you to read documents having fields of mixed types, but all values are read as a String.
In Example 10-3 the resulting age field is of type String, containing values age="15" for “Coraline” and age="unknown" for “Diana”.

    {"firstname":"Coraline", "last name":"Spin", "age": 15}
    {"firstname":"Diana", "last name":"Prince", "age": "unknown"}

CSV is a popular tabular data storage and exchange format that is widely supported by enterprise applications.
The File source CSV format support allows us to ingest and process the output of such applications in a Structured Streaming application.
Although the name “CSV” originally indicated that values are separated by commas, often the separation character can be freely configured.
There are many configuration options available to control the way data is transformed from plain text to structured records.
In the rest of this section, we cover the most common options and, in particular, those that are relevant for the streaming process. For formatting-related options, refer to the latest documentation. These are the most commonly used CSV parsing options:
comment (default: "" [disabled]): Configures a character that marks lines considered as comments; for example, when using option("comment","#"), we can parse the following CSV with a comment in it:

    #Timestamps are in ISO 8601 compliant format
    x097abba, 2018-04-01T16:32:56+00:00, 55
    x053ba0bab, 2018-04-01T16:35:02+00:00, 32

header (default: false): Given that a schema must be provided, the header line is ignored and has no effect.
multiLine (default: false): Consider each file as one record spanning all of the lines in the file.
quote (default: " [double quote]): Configures the character used to enclose values that contain a column separator.
sep (default: , [comma]): Configures the character that separates the fields in each line.
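The options above combine with the common File source options in the usual builder style. The following is a minimal sketch, assuming a local session, a temporary directory in place of the shared folder, and a hypothetical three-column schema (id, timestamp, value) loosely modeled on the commented CSV sample.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: a local session, used here only to make the sketch self-contained.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-source-sketch")
  .getOrCreate()

// Assumption: a temporary directory stands in for the monitored location.
val csvDir = Files.createTempDirectory("csv-exchange").toString

// Hypothetical schema for the sample data; the column names are illustrative.
val readingSchema = StructType(List(
  StructField("id", StringType, true),
  StructField("timestamp", TimestampType, true),
  StructField("value", IntegerType, true)
))

val csvStream = spark.readStream
  .schema(readingSchema)
  .option("sep", ",")               // field separator
  .option("comment", "#")           // lines starting with # are skipped
  .option("mode", "DROPMALFORMED")  // drop records that do not parse
  .option("maxFilesPerTrigger", 10) // common File source option
  .csv(csvDir)
```

The dedicated csv method is the last call of the builder, after the schema and all options have been set.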
Apache Parquet is a column-oriented, file-based data storage format. The internal representation splits original rows into chunks of columns that are stored using compression techniques. As a consequence, queries that require specific columns do not need to read the complete file. Instead, the relevant pieces can be independently addressed and retrieved. Parquet supports complex nested data structures and preserves the schema structure of the data. Due to its enhanced query capabilities, efficient use of storage space and the preservation of schema information, Parquet is a popular format for storing large, complex datasets.
To create a streaming source from Parquet files, it is sufficient to provide the schema of the data and the directory location. The schema provided during the streaming declaration is fixed for the duration of the streaming source definition.
Example 10-4 shows the creation of a Parquet-based File source from a folder in hdfs://data/folder using the provided schema.
    // Use the dedicated parquet method with the provided schema
    val fileStream = spark.readStream
      .schema(schema)
      .parquet("hdfs://data/folder")
The text format for the File source supports the ingestion of plain-text files.
Using configuration options, it’s possible to ingest the text either line by line or the entire file as a single text blob.
The schema of the data produced by this source is naturally StringType and does not need to be specified.
This is a generic format that we can use to ingest arbitrary text for further programmatic processing, from the famous word count to custom parsing of proprietary text formats.
Next to the common options for the File source that we saw in “Common Options”, the text file format supports reading text files as a whole using the wholetext option:
wholetext (default: false): If true, read the complete file as a single text blob. Otherwise, split the text into lines using the standard line separators (\n, \r\n, \r) and consider each line a record.
The text format supports two API alternatives:
text: Returns a dynamically typed DataFrame with a single value field of type StringType
textFile: Returns a statically typed Dataset[String]
We can use the text format specification as a terminating method call or as a format option.
To obtain a statically typed Dataset, we must use textFile as the last call of the stream builder call.
The examples in Example 10-5 illustrate the specific API usage.
    // Text specified as format
    > val fileStream = spark.readStream.format("text").load("hdfs://data/folder")
    fileStream: org.apache.spark.sql.DataFrame = [value: string]

    // Text specified through dedicated method
    > val fileStream = spark.readStream.text("hdfs://data/folder")
    fileStream: org.apache.spark.sql.DataFrame = [value: string]

    // TextFile specified through dedicated method
    > val fileStream = spark.readStream.textFile("/tmp/data/stream")
    fileStream: org.apache.spark.sql.Dataset[String] = [value: string]

Apache Kafka is a Publish/Subscribe (pub/sub) system based on the concept of a distributed log. Kafka is highly scalable and offers high-throughput, low-latency handling of the data at the consumer and producer sides. In Kafka, the unit of organization is a topic. Publishers send data to a topic and subscribers receive data from the topic to which they subscribed. This data delivery happens in a reliable way. Apache Kafka has become a popular choice of messaging infrastructure for a wide range of streaming use cases.
The Structured Streaming source for Kafka implements the subscriber role and can consume data published to one or several topics. This is a reliable source. Recall from our discussion in “Understanding Sources” that this means that data delivery semantics are guaranteed even in case of partial or total failure and restart of the streaming process.
To create a Kafka source, we use the format("kafka") method with the readStream builder on the Spark Session.
We need two mandatory parameters to connect to Kafka: the addresses of the Kafka brokers, and the topic(s) to which we want to connect.
The address of the Kafka brokers to connect to is provided through the option kafka.bootstrap.servers as a String containing a comma-separated list of host:port pairs.
Example 10-6 presents a simple Kafka source definition.
It subscribes to a single topic, topic1, by connecting to the brokers located at host1:port1, host2:port2, and host3:port3.
    > val kafkaStream = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2,host3:port3")
        .option("subscribe", "topic1")
        .option("checkpointLocation", "hdfs://spark/streaming/checkpoint")
        .load()

    kafkaStream: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

    > kafkaStream.printSchema

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)

    > val dataStream = kafkaStream
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)]

    dataStream: org.apache.spark.sql.Dataset[(String, String)] = [key: string, value: string]

The result of that call is a DataFrame with seven fields: key, value, topic, partition, offset, timestamp, and timestampType.
This seven-field schema is fixed for the Kafka source.
It provides the raw key and value from Kafka and the metadata of each consumed record.
Usually, we are interested in only the key and value of the message.
Both the key and the value contain a binary payload, represented internally as a Byte Array.
When the data is written to Kafka using a String serializer, we can read that data back by casting the values to String as we have done in the last expression in the example.
Although text-based encoding is a common practice, it’s not the most space-efficient way of exchanging data.
Other encodings, such as the schema-aware Avro format, might offer better space efficiency with the added benefit of embedding the schema information.
The additional metadata in the message, such as topic, partition, or offset, can be used in more complex scenarios. For example, the topic field will contain the topic that produced the record and could be used as a label or discriminator, in case we subscribe to several topics at the same time.
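The use of the topic field as a discriminator can be sketched as follows. The helper names labeledByTopic and fromTopic are hypothetical, and the small static DataFrame only imitates the key, value, and topic columns of the Kafka source so that the sketch runs without a Kafka cluster; the same functions would apply to a streaming DataFrame produced by the Kafka source.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Decodes the binary key and value as strings and keeps the topic as a label.
// `records` is assumed to carry the key, value, and topic columns of the Kafka source.
def labeledByTopic(records: DataFrame): Dataset[(String, String, String)] = {
  import records.sparkSession.implicits._
  records
    .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .as[(String, String, String)]
}

// Keeps only the records that were published to the given topic.
def fromTopic(records: DataFrame, topic: String): DataFrame =
  records.filter(records("topic") === topic)

// Assumption: a local session and hand-made sample data stand in for a Kafka stream.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("topic-label-sketch")
  .getOrCreate()
import spark.implicits._

val sample = Seq(
  ("k1".getBytes("UTF-8"), "v1".getBytes("UTF-8"), "factory1Sensors"),
  ("k2".getBytes("UTF-8"), "v2".getBytes("UTF-8"), "street1Sensors")
).toDF("key", "value", "topic")

val factoryOnly = labeledByTopic(fromTopic(sample, "factory1Sensors")).collect().toList
```

Keeping the topic next to the decoded payload lets downstream logic branch per topic without creating one source per topic.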
There are three different ways to specify the topic or topics that we want to consume:
subscribe
subscribePattern
assign
The Kafka source setup must contain one and only one of these subscription options. They provide different levels of flexibility to select the topic(s) and even the partitions to subscribe to:
subscribe: Takes a single topic or a list of comma-separated topics: topic1, topic2, ..., topicn. This method subscribes to each topic and creates a single, unified stream with the data of the union of all the topics; for example, .option("subscribe", "topic1,topic3").
subscribePattern: This is similar to subscribe in behavior, but the topics are specified with a regular expression pattern. For example, if we have topics 'factory1Sensors', 'factory2Sensors', 'street1Sensors', 'street2Sensors', we can subscribe to all “factory” sensors with the expression .option("subscribePattern", """factory[\d]+Sensors""").
assign: Allows the fine-grained specification of specific partitions per topic to consume. This is known as TopicPartitions in the Kafka API. The partitions per topic are indicated using a JSON object, in which each key is a topic and its value is an array of partitions. For example, the option definition .option("assign", """{"sensors":[0,1,3]}""") would subscribe to the partitions 0, 1, and 3 of topic sensors. To use this method we need information about the topic partitioning. We can obtain the partition information programmatically by using the Kafka API or through configuration.
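When the partition information is obtained programmatically, the JSON value for the assign option has to be built from it. A minimal sketch, assuming the partitions are available as a Scala Map and that topic names contain no characters that need JSON escaping (assignJson is a hypothetical helper name):

```scala
// Builds the JSON value expected by the "assign" option from topic -> partitions.
// Assumption: topic names contain no characters that require JSON escaping.
def assignJson(partitionsByTopic: Map[String, Seq[Int]]): String =
  partitionsByTopic.toSeq
    .sortBy { case (topic, _) => topic } // stable output order
    .map { case (topic, partitions) =>
      "\"" + topic + "\":[" + partitions.mkString(",") + "]"
    }
    .mkString("{", ",", "}")

val assignment = assignJson(Map("sensors" -> Seq(0, 1, 3)))

// The result can be passed to the source definition, for example:
//   spark.readStream.format("kafka").option("assign", assignment)
```

For topic names coming from untrusted input, a JSON library would be a safer choice than string concatenation.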
There are two categories of configuration options for the Structured Streaming source for Kafka: dedicated source configuration, and pass-through options that are given directly to the underlying Kafka consumer.
The following options configure the behavior of the Kafka source. They relate in particular to how offsets are consumed:
startingOffsets (default: latest): Accepted values are earliest, latest, or a JSON object representing an association between topics, their partitions, and a given offset. Actual offset values are always non-negative numbers. There are two special offset values: -2 denotes earliest and -1 means latest; for example, """ {"topic1": { "0": -1, "1": -2, "2":1024 }} """. startingOffsets are only used the first time a query is started. All subsequent restarts will use the checkpoint information stored. To restart a streaming job from a specific offset, we need to remove the contents of the checkpoint.
failOnDataLoss (default: true): This flag indicates whether to fail the restart of a streaming query in case data might be lost. This is usually when offsets are out of range, topics are deleted, or topics are rebalanced. We recommend setting this option to false during the develop/test cycle because stop/restart of the query side with a continuous producer will often trigger a failure. Set this back to true for production deployment.
kafkaConsumer.pollTimeoutMs (default: 512): The poll timeout (in milliseconds) to wait for data from Kafka in the distributed consumers running on the Spark executors.
fetchOffset.numRetries (default: 3): The number of retries before failing the fetch of Kafka offsets.
fetchOffset.retryIntervalMs (default: 10): The delay between offset fetch retries in milliseconds.
maxOffsetsPerTrigger (default: not set): This option allows us to set a rate limit to the number of total records to be consumed at each query trigger. The configured limit is split proportionally among the partitions of the subscribed topics, according to their volume.
It’s possible to pass configuration options through to the underlying Kafka consumer of this source.
We do this by adding a 'kafka.' prefix to the configuration key that we want to set.
For example, to configure Transport Layer Security (TLS) options for the Kafka source, we can set the Kafka consumer configuration option security.protocol by setting kafka.security.protocol in the source configuration.
Example 10-7 demonstrates how to configure TLS for the Kafka source using this method.
    val tlsKafkaSource = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
      .option("subscribe", "topsecret")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
      .option("kafka.ssl.truststore.password", "truststore-password")
      .option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
      .option("kafka.ssl.keystore.password", "keystore-password")
      .option("kafka.ssl.key.password", "password")
      .load()

For an exhaustive listing of the Kafka consumer configuration options, refer to the official Kafka documentation.
Not all options from the standard consumer configuration can be used because they conflict with the internal process of the source, which is controlled with the settings we saw in “Kafka source-specific options”.
These options are prohibited, as shown in Table 10-1. This means that attempting to use any of them will result in an IllegalArgumentException.
Option | Reason | Alternative |
---|---|---|
auto.offset.reset | Offsets are managed within Structured Streaming | Use startingOffsets instead |
enable.auto.commit | Offsets are managed within Structured Streaming | |
group.id | A unique group ID is managed internally per query | |
key.deserializer | Payload is always represented as a Byte Array | Deserialization into specific formats is done programmatically |
value.deserializer | Payload is always represented as a Byte Array | Deserialization into specific formats is done programmatically |
interceptor.classes | A consumer interceptor might break the internal data representation | |
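Because a prohibited option only surfaces as an exception when the source is created, it can be convenient to check a set of options up front. The following plain-Scala sketch is an illustration, not part of the Kafka source: rejectedOptions is a hypothetical helper, and the key list is an assumption based on the Spark 2.x Kafka integration documentation, so it should be verified against the Spark version in use.

```scala
// Kafka consumer settings that the Structured Streaming Kafka source manages itself.
// Assumption: list taken from the Spark 2.x Kafka integration guide.
val prohibitedConsumerKeys: Set[String] = Set(
  "auto.offset.reset",
  "enable.auto.commit",
  "group.id",
  "key.deserializer",
  "value.deserializer",
  "interceptor.classes"
)

// Returns the pass-through options (prefixed with "kafka.") that would be rejected.
def rejectedOptions(options: Map[String, String]): Set[String] =
  options.keySet.filter { key =>
    key.startsWith("kafka.") && prohibitedConsumerKeys.contains(key.stripPrefix("kafka."))
  }.toSet

val candidateOptions = Map(
  "kafka.bootstrap.servers" -> "host1:port1,host2:port2",
  "kafka.security.protocol" -> "SSL",
  "kafka.group.id" -> "my-group", // managed internally, would be rejected
  "subscribe" -> "topic1"
)

val rejected = rejectedOptions(candidateOptions)
```

Options without the kafka. prefix, such as subscribe, are source options and are not passed to the consumer, so the check ignores them.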
The Transmission Control Protocol (or TCP) is a connection-oriented protocol that enables bidirectional communication between a client and a server. This protocol underpins many of the higher-level communication protocols over the internet, such as FTP, HTTP, MQTT, and many others. Although application layer protocols like HTTP add additional semantics on top of a TCP connection, there are many applications that offer a vanilla, text-based TCP connection over a UNIX socket to deliver data.
The Socket source is a TCP socket client able to connect to a TCP server that offers a UTF-8 encoded text-based data stream.
It connects to a TCP server using a host and port provided as mandatory options.
To connect to a TCP server, we need the address of the host and a port number. It’s also possible to configure the Socket source to add the timestamp at which each line of data is received.
These are the configuration options:
host (mandatory): The DNS hostname or IP address of the TCP server to which to connect.
port (mandatory): The port number of the TCP server to which to connect.
includeTimestamp (default: false): When enabled, the Socket source adds the timestamp of arrival to each line of data. It also changes the schema produced by this source, adding the timestamp as an additional field.
In Example 10-8, we observe the two modes of operation that this source offers.
With the host and port configuration, the resulting streaming DataFrame has a single field named value of type String.
When we add the flag includeTimestamp set to true, the schema of the resulting streaming DataFrame contains the fields value and timestamp, where value is of type String as before, and timestamp is of type Timestamp.
Also, observe the log warning this source prints at creation.
// Only using host and port
> val stream = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9876)
    .load()

18/04/14 17:02:34 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
stream: org.apache.spark.sql.DataFrame = [value: string]

// With added timestamp information
> val stream = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9876)
    .option("includeTimestamp", true)
    .load()

18/04/14 17:06:47 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
stream: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
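To experiment with the Socket source locally, something has to listen on the configured port. The following is a minimal sketch of such a server that uses only the JDK socket API. The object LineServer and its start method are hypothetical names, and the server deliberately handles a single client: it sends a fixed list of lines and then closes the connection.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical test helper: serves UTF-8 text lines to one TCP client, then closes.
object LineServer {
  // Binds to `port` (0 lets the OS pick a free one), serves `lines` on a
  // background thread, and returns the port that was actually bound.
  def start(port: Int, lines: Seq[String]): Int = {
    val server = new ServerSocket(port)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        val client = server.accept() // blocks until a client connects
        val out = new PrintWriter(
          new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
        try lines.foreach(line => out.println(line)) // one record per line
        finally { out.close(); client.close(); server.close() }
      }
    })
    worker.setDaemon(true)
    worker.start()
    server.getLocalPort
  }
}
```

With such a server started on port 9876, the host and port configuration shown in Example 10-8 would receive each line as one row once the query is started.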
The Socket source creates a TCP client that connects to a TCP server specified in the configuration. This client runs on the Spark driver. It keeps the incoming data in memory until it is consumed by the query and the corresponding offset is committed. The data from committed offsets is evicted, keeping the memory usage stable under normal circumstances.
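The buffering behavior just described can be modeled in a few lines. This is a simplified, single-threaded sketch and not the Socket source's actual implementation. The class LineBuffer and its members are hypothetical names chosen to mirror the getOffset, getBatch, and commit calls described at the beginning of the chapter.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of an in-memory source buffer. Offsets start at 0;
// currentOffset is the offset of the latest line received (-1 when nothing arrived).
class LineBuffer {
  private val lines = ArrayBuffer.empty[String]
  private var committed = -1L // highest offset already committed

  def append(line: String): Unit = lines += line

  def currentOffset: Long = committed + lines.size

  // Returns the lines with offsets in (start, end]; they must still be buffered.
  def getBatch(start: Long, end: Long): Seq[String] = {
    require(start >= committed && end <= currentOffset, "range not buffered")
    lines.slice((start - committed).toInt, (end - committed).toInt).toList
  }

  // Evicts every line up to and including offset `end`, freeing memory.
  def commit(end: Long): Unit = {
    require(end >= committed && end <= currentOffset, "invalid offset")
    lines.remove(0, (end - committed).toInt)
    committed = end
  }

  // Number of lines currently held in memory.
  def buffered: Int = lines.size
}
```

In this model, memory use is bounded by the uncommitted lines only, and nothing survives the loss of the process that holds the buffer.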
Recall from the discussion in “Understanding Sources” that a source is considered reliable if it can replay an uncommitted offset even in the event of failure and restart of the streaming process. The Socket source is not considered reliable, because a failure of the Spark driver results in the loss of all uncommitted data held in memory.
Use this source only when data loss is acceptable.
A common architectural alternative to directly connecting to a TCP server using the Socket source is to use Kafka as a reliable intermediate storage. A robust microservice can be used to bridge the TCP server and Kafka. This microservice collects the data from the TCP server and delivers it atomically to Kafka. Then, we can use the reliable Kafka source to consume the data and further process it in Structured Streaming.
The Rate source is an internal stream generator that produces a sequence of records at a configurable frequency, given in records/second.
The output is a stream of records (timestamp, value) where timestamp corresponds to the moment of generation of the record, and value is an ever-increasing counter:
> val stream = spark.readStream.format("rate").load()

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
This is intended for benchmarks and for exploring Structured Streaming, given that it does not rely on external systems to work. As we can appreciate in the previous example, it is very easy to create and completely self-contained.
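As a starting point for that kind of exploration, the following sketch prints a few micro-batches of the Rate source to the console. It assumes Spark is available on the classpath and runs in local mode; the object name, the application name, and the ten-second run time are arbitrary choices.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical exploration program; names and timings are arbitrary.
object RateExploration {
  // A Rate source with default options: one row per second.
  def rateStream(spark: SparkSession): DataFrame =
    spark.readStream.format("rate").load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rate-exploration")
      .getOrCreate()

    // Print each micro-batch; the console sink is meant for debugging only
    val query = rateStream(spark).writeStream
      .format("console")
      .option("truncate", false)
      .start()

    query.awaitTermination(10000) // wait at most ten seconds
    query.stop()
    spark.stop()
  }
}
```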
The code in Example 10-9 creates a rate stream of 100 rows per second with a ramp-up time of 60 seconds.
The schema of the resulting DataFrame contains two fields: timestamp of type Timestamp, and value, which is a BigInt at schema level, and a Long in the internal representation.
> val stream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .option("rampUpTime", 60)
    .load()

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
The Rate source supports a few options that control the throughput and level of parallelism:
rowsPerSecond (default: 1): The number of rows to generate each second.
rampUpTime (default: 0): At the start of the stream, the generation of records will increase progressively until this time has been reached. The increase is linear.
numPartitions (default: the default Spark parallelism level): The number of partitions to generate. More partitions increase the parallelism level of the record generation and downstream query processing.
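To make the linear ramp-up concrete, the following sketch computes the generation rate a given number of seconds after the stream starts. It is an illustrative model of a linear increase, not the Rate source's internal algorithm, and the object and method names are hypothetical.

```scala
// Hypothetical model of a linear ramp-up; not Spark's internal implementation.
object RampUpModel {
  // Approximate rows generated per second, `elapsed` seconds after the start.
  def rateAt(elapsed: Long, rowsPerSecond: Long, rampUpTime: Long): Long =
    if (rampUpTime <= 0 || elapsed >= rampUpTime) rowsPerSecond
    else rowsPerSecond * math.max(elapsed, 0L) / rampUpTime
}
```

With the settings from Example 10-9 (100 rows per second and 60 seconds of ramp-up), this model yields 50 rows per second halfway through the ramp and the full 100 rows per second from second 60 onward.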