Chapter 10. Structured Streaming Sources

The previous chapters provided a good overview of the Structured Streaming programming model and how you can apply it in a practical way. You also saw how sources are the starting point of each Structured Streaming program. In this chapter, we study the general characteristics of a source and review the available sources in greater detail, including their different configuration options and modes of operation.

Understanding Sources

In Structured Streaming, a source is an abstraction that represents streaming data providers. The concept behind the source interface is that streaming data is a continuous flow of events over time that can be seen as a sequence, indexed with a monotonically increasing counter.

Figure 10-1 illustrates how each event in the stream is considered to have an ever-increasing offset.

Figure 10-1. A stream seen as an indexed sequence of events

Offsets, as shown in Figure 10-2, are used to request data from the external source and to indicate what data has already been consumed. Structured Streaming knows when there is data to process by asking the external system for its current offset and comparing it to the last processed offset. The data to be processed is requested by getting a batch between two offsets, start and end. The source is informed that data has been processed by committing a given offset. The source contract guarantees that all data with an offset less than or equal to the committed offset has been processed and that subsequent requests will stipulate only offsets greater than that committed offset. Given these guarantees, sources might opt to discard the processed data to free up system resources.

Figure 10-2. Offset process sequence

Let’s take a closer look at the dynamics of the offset-based processing shown in Figure 10-2:

  1. At t1, the system calls getOffset and obtains the current offset for the source.

  2. At t2, the system obtains the batch up to the last known offset by calling getBatch(start, end). Note that new data might have arrived in the meantime.

  3. At t3, the system commits the offset and the source drops the corresponding records.

This process repeats constantly, ensuring the acquisition of streaming data. To recover from a potential failure, offsets are regularly checkpointed to external storage.
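
To make the contract concrete, here is a minimal Scala sketch of the interaction described above. It mirrors only the shape of the getOffset/getBatch/commit exchange with simplified types; it is not Spark's internal Source trait.

trait StreamingSource[Record] {
  // Current (latest) offset known to the source, if any data is available
  def getOffset: Option[Long]
  // Records in the (start, end] offset range; `start` is None on the very first batch
  def getBatch(start: Option[Long], end: Long): Seq[Record]
  // All records up to `end` have been processed; the source may now discard them
  def commit(end: Long): Unit
}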

Besides the offset-based interaction, sources must fulfill two requirements: to be reliable, sources must be replayable in the same order; and sources must provide a schema.

Reliable Sources Must Be Replayable

In Structured Streaming, replayability is the capacity to request again a part of the stream that has already been delivered but not yet committed. Just as we can rewind a Netflix series to rewatch a piece we missed because of a distraction, sources must offer the capability to replay a portion of the stream that was already requested but not committed. This is done by calling getBatch with the offset range that we want to receive again.

A source is considered to be reliable when it can produce an uncommitted offset range even after a total failure of the Structured Streaming process. In this failure recovery process, offsets are restored from their last known checkpoint and are requested again from the source. This requires the actual streaming system that is backing the source implementation to store data safely outside the streaming process. By requiring replayability from the source, Structured Streaming delegates recovery responsibility to the source. This implies that only reliable sources work with Structured Streaming to create strong end-to-end delivery guarantees.

Sources Must Provide a Schema

A defining characteristic of the structured APIs of Spark is that they rely on schema information to handle the data at different levels. As opposed to processing opaque Strings or Byte Array blobs, schema information provides insights on how the data is shaped in terms of fields and types. We can use schema information to drive optimizations at different levels in the stack, from query planning to the internal binary representation of data, storage, and access to it.

Sources must provide schema information that describes the data they produce. Some source implementations allow this schema to be configured and use this configuration information to automatically parse incoming data and transform it into valid records. In fact, many file-based streaming sources such as JSON or comma-separated values (CSV) files follow this model, in which the user must provide the schema used by the file format to ensure proper parsing. Some other sources use a fixed internal schema that expresses the metadata information of every record and leaves the parsing of the payload to the application.

From an architectural perspective, creating schema-driven streaming applications is desirable because it facilitates the global understanding of how data flows through the system and drives the formalization of the different stages of a multiprocess streaming pipeline.

Defining schemas

In Structured Streaming, we reuse the Spark SQL API for creating schema definitions. There are several methods we can use to define the schema that describes the content of the stream. It can be built programmatically, inferred from a case class definition, or loaded from an existing dataset:

Programmatically

We use the StructType and StructField classes to build up a representation of the schema. For example, to represent a tracked vehicle with id, type, and location coordinates, we can construct the corresponding schema structure as follows:

import org.apache.spark.sql.types._

val schema = StructType(
  List(
    StructField("id", StringType, true),
    StructField("type", StringType, true),
    StructField("location", StructType(List(
        StructField("latitude", DoubleType, false),
        StructField("longitude", DoubleType, false)
        )), false)
    )
  )

StructField can contain nested StructTypes, making it possible to create schemas of arbitrary depth and complexity.

By inference

In Scala, the schema also can be represented using arbitrary combinations of case classes. Given a single case class or a case class hierarchy, the schema representation can be computed by creating an Encoder for the case class and obtaining the schema from that encoder instance.

Using this method, the same schema definition used in the preceding example can be obtained like so:

import org.apache.spark.sql.Encoders

// Define the case class hierarchy
case class Coordinates(latitude: Double, longitude: Double)
case class Vehicle(id: String, `type`: String, location: Coordinates )
// Obtain the Encoder, and the schema from the Encoder
val schema = Encoders.product[Vehicle].schema

Extract from a dataset

A practical method of obtaining a schema definition is by maintaining a sample data file in a schema-aware format, such as Parquet. To obtain our schema definition, we load the sample dataset and get the schema definition from the loaded DataFrame:

val sample = spark.read.parquet(<path-to-sample>)
val schema = sample.schema

The programmatic way of defining schemas is powerful but requires effort and is complex to maintain, often leading to errors. Loading a dataset might be practical at the prototyping stage, but it requires keeping the sample dataset up-to-date, which in some cases can lead to accidental complexity.

Although the best method to choose might be different from one use case to the other, in general, when working with Scala, we prefer to use the inference method, when possible.

Available Sources

The following are the sources currently available in the Spark distribution of Structured Streaming:

File

Allows the ingestion of data stored as files. In most cases, the data is transformed into records that are further processed in streaming mode. The supported formats are JSON, CSV, Parquet, ORC, and plain text.

Kafka

Allows the consumption of streaming data from Apache Kafka.

Socket

A TCP socket client able to connect to a TCP server and consume a text-based data stream. The stream must be encoded in the UTF-8 character set.

Rate

Produces an internally generated stream of (timestamp, value) records with a configurable production rate. This is normally used for learning and testing purposes.

As we discussed in “Understanding Sources”, sources are considered reliable when they provide replay capabilities from an offset, even when the Structured Streaming process fails. Using this criterion, we can classify the available sources as follows:

Reliable

File source, Kafka source

Unreliable

Socket source, Rate source

The unreliable sources may be used in a production system only when the loss of data can be tolerated.

Warning

The streaming source API is currently undergoing evolution. As of this writing, there is no stable public API to develop custom sources. This is expected to change in the near future.

In the next part of this chapter, we explore the currently available sources in detail. As production-ready sources, the File and Kafka sources have many options, which we discuss in detail. The Socket and Rate sources are limited in features, which will be evident from their concise coverage.

The File Source

The File source is a simple streaming data source that reads files from a monitored directory in a filesystem. A file-based handover is a commonly used method to bridge a batch-based process with a streaming system. The batch process produces its output in a file format and drops it in a common directory where a suitable implementation of the File source can pick these files up and transform their contents into a stream of records for further processing in streaming mode.

Specifying a File Format

The files are read using a specified format, which is provided with the .format(<format_name>) method in the readStream builder, or by using the dedicated methods in the DataStreamReader that indicate the format to use; for example, readStream.parquet("/path/to/dir/"). When using the dedicated methods corresponding to each supported format, the method call should be done as the last call of the builder.

For example, the three forms in Example 10-1 are equivalent.

Example 10-1. Building a FileStream
// Use format and load path
val fileStream = spark.readStream
  .format("parquet")
  .schema(schema)
  .load("hdfs://data/exchange")

// Use format and path options
val fileStream = spark.readStream
  .format("parquet")
  .option("path", "hdfs://data/exchange")
  .schema(schema)
  .load()

// Use dedicated method
val fileStream = spark.readStream
  .schema(schema)
  .parquet("hdfs://data/exchange")

As of Spark v2.3.0, the following file-based formats are supported by Structured Streaming. These are the same file formats supported by the static DataFrame, Dataset, and SQL APIs:

  • CSV

  • JSON

  • Parquet

  • ORC

  • text

  • textFile

Common Options

Regardless of the specific format, the general functionality of the File source is to monitor a directory in a shared filesystem identified by its specific URL. All file formats support a common set of options that control the file inflow and define the aging criteria of the files.

Warning

As Apache Spark is a fast-evolving project, APIs and their options might change in future versions. Also, in this section, we cover only the most relevant options that apply to streaming workloads. For the most up-to-date information, always check the API documentation corresponding to your Spark version.

These options can be set for all file-based sources:

maxFilesPerTrigger (Default: unset)

Indicates how many files will be consumed at each query trigger. This setting limits the number of files processed at each trigger, and in doing so, it helps to control the data inflow in the system.

latestFirst (Default: false)

When this flag is set to true, newer files are elected for processing first. Use this option when the most recent data has higher priority over older data.

maxFileAge (Default: 7 days)

Defines an age threshold for the files in the directory. Files older than the threshold will not be eligible for processing and will be effectively ignored. This threshold is relative to the most recent file in the directory, not to the system clock. For example, if maxFileAge is 2 days and the most recent file is from yesterday, then any file with a timestamp older than three days ago is considered too old to process. This dynamic is similar to watermarks on event time.

fileNameOnly (Default: false)

When set to true, two files will be considered the same if they have the same name; otherwise, the full path will be considered.

Note

When latestFirst is set to true and the maxFilesPerTrigger option is configured, maxFileAge is ignored: because the system gives priority to the most recent files, files that would otherwise be valid for processing might become older than the threshold while waiting their turn. In such cases, no aging policy can be set.
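
To make these options concrete, the following sketch limits the inflow of a JSON File source. The option values and the monitored directory are illustrative choices, and schema is assumed to be defined as shown earlier.

val limitedFileStream = spark.readStream
  .format("json")
  .schema(schema)                      // schema must be provided for file-based sources
  .option("maxFilesPerTrigger", 10)    // consume at most 10 files per query trigger
  .option("latestFirst", "true")       // give priority to the most recent files
  .option("fileNameOnly", "true")      // identify files by name only, ignoring the path
  .load("hdfs://data/exchange")        // monitored directory (illustrative)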

Common Text Parsing Options (CSV, JSON)

Some file formats, such as CSV and JSON, use a configurable parser to transform the text data in each file into structured records. It’s possible for upstream processes to create records that do not fulfill the expected format. These records are considered corrupted.

Streaming systems are characterized by their continuous running. A streaming process should not fail when bad data is received. Depending on the business requirements, we can either drop the invalid records or route the data considered corrupted to a separate error-handling flow.

Handling parsing errors

The following options allow for the configuration of the parser behavior to handle those records that are considered corrupted:

mode (default: PERMISSIVE)

Controls the way that corrupted records are handled during parsing. Allowed values are PERMISSIVE, DROPMALFORMED, and FAILFAST.

  • PERMISSIVE: The value of the corrupted record is inserted in a special field configured by the option columnNameOfCorruptRecord that must exist in the schema. All other fields are set to null. If the field does not exist, the record is dropped (same behavior as DROPMALFORMED).

  • DROPMALFORMED: Corrupted records are dropped.

  • FAILFAST: An exception is thrown when a corrupted record is found. This method is not recommended in a streaming process, because the propagation of the exception will potentially make the streaming process fail and stop.

columnNameOfCorruptRecord (default: “_corrupt_record”)

Permits the configuration of the special field that contains the string value of malformed records. This field can also be configured by setting spark.sql.columnNameOfCorruptRecord in the Spark configuration. If both spark.sql.columnNameOfCorruptRecord and this option are set, this option takes precedence.
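
As an illustration, the following sketch keeps malformed records in a dedicated column instead of dropping them. The column name, directory, and base schema are assumptions for the example.

import org.apache.spark.sql.types._

// PERMISSIVE mode requires the corrupt-record column to be present in the schema
val schemaWithCorrupt = schema.add(StructField("_corrupt_record", StringType, true))

val jsonStream = spark.readStream
  .format("json")
  .schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .load("/data/incoming/json")         // monitored directory (illustrative)

// Malformed lines arrive with all fields set to null and the raw text in _corrupt_record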

Schema inference

inferSchema (default: false)

Schema inference is not supported. Setting this option is ignored. Providing a schema is mandatory.

Date and time formats

dateFormat (default: "yyyy-MM-dd")

Configures the pattern used to parse date fields. Custom patterns follow the formats defined in java.text.SimpleDateFormat.

timestampFormat (default: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

Configures the pattern used to parse timestamp fields. Custom patterns follow the formats defined in java.text.SimpleDateFormat.
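
For instance, a CSV source whose upstream system writes European-style dates could be configured as in the following sketch; the patterns and path are assumptions chosen for illustration.

val csvStream = spark.readStream
  .format("csv")
  .schema(schema)
  .option("dateFormat", "dd/MM/yyyy")                // parses dates such as 01/04/2018
  .option("timestampFormat", "dd/MM/yyyy HH:mm:ss")  // parses timestamps such as 01/04/2018 16:32:56
  .load("/data/incoming/csv")                        // monitored directory (illustrative)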

JSON File Source Format

The JSON format support for the File source lets us consume text files encoded as JSON, in which each line in the file is expected to be a valid JSON object. The JSON records are parsed using the provided schema. Records that do not follow the schema are considered invalid, and there are several options available to control the handling of invalid records.

JSON parsing options

By default, the JSON File source expects the file contents to follow the JSON Lines specification. That is, each independent line in the file corresponds to a valid JSON document that complies with the specified schema. Lines are separated by a newline (\n) character. The carriage return plus newline sequence (\r\n) is also supported, because trailing whitespace is ignored.

We can tweak the tolerance of the JSON parser to process data that does not fully comply with the standard. It’s also possible to change the behavior to handle those records that are considered corrupted. The following options allow for the configuration of the parser behavior:

allowComments (default: false)

When enabled, comments in Java/C++ style are allowed in the file and the corresponding line will be ignored; for example:

// Timestamps are in ISO 8601 compliant format
{"id":"x097abba", "timestamp": "2018-04-01T16:32:56+00:00"}
{"id":"x053ba0bab", "timestamp": "2018-04-01T16:35:02+00:00"}

Otherwise, comments in the JSON file are considered corrupted records and handled following the mode setting.

allowNumericLeadingZeros (default: false)

When enabled, leading zeros in numbers are allowed (e.g., 00314). Otherwise, the leading zeros are considered invalid numeric values, the corresponding record is deemed corrupted, and it is handled following the mode setting.

allowSingleQuotes (default: true)

Allows the use of single quotes to delimit fields. When enabled, both single quotes and double quotes are allowed. Regardless of this setting, the quote character cannot be nested and must be properly escaped when used within a value; for example:

// valid record
{"firstname":"Alice", 'lastname': 'Wonderland'}
// invalid nesting
{"firstname":'Elvis "The King"', 'lastname': 'Presley'}
// correct escaping
{"firstname":'Elvis "The King"', 'lastname': 'Presley'}
allowUnquotedFieldNames (default: false)

Allows unquoted JSON field names (e.g., {firstname:"Alice"}). Note that it’s not possible to have spaces in field names when using this option (e.g., {first name:"Alice"} is considered corrupted even when the field name matches the schema). Use with caution.

multiLine (default: false)

When enabled, instead of parsing JSON Lines, the parser will consider the contents of each file as a single valid JSON document and will attempt to parse its contents as records following the defined schema.

Use this option when the producer of the file can output only complete JSON documents as files. In such cases, use a top-level array to group the records, as shown in Example 10-2.

Example 10-2. Using a top-level array to group records
[
  {"firstname":"Alice", "last name":"Wonderland", "age": 7},
  {"firstname":"Coraline", "last name":"Spin"   , "age":15}
]

primitivesAsString (default: false)

When enabled, primitive value types are considered strings. This allows you to read documents having fields of mixed types, but all values are read as a String.

In Example 10-3 the resulting age field is of type String, containing values age="15" for “Coraline” and age="unknown" for “Diana”.

Example 10-3. primitivesAsString usage
{"firstname":"Coraline", "last name":"Spin", "age": 15}
{"firstname":"Diana", "last name":"Prince", "age": "unknown"}

CSV File Source Format

CSV is a popular tabular data storage and exchange format that is widely supported by enterprise applications. The CSV format support of the File source allows us to ingest and process the output of such applications in a Structured Streaming application. Although the name “CSV” originally indicated that values are separated by commas, the separator character can often be freely configured. There are many configuration options available to control the way data is transformed from plain text to structured records.

In the rest of this section, we cover the most common options and, in particular, those that are relevant for the streaming process. For formatting-related options, refer to the latest documentation.

CSV parsing options

comment (default: “” [disabled])

Configures a character that marks lines considered as comments; for example, when using option("comment","#"), we can parse the following CSV with a comment in it:

#Timestamps are in ISO 8601 compliant format
x097abba, 2018-04-01T16:32:56+00:00, 55
x053ba0bab, 2018-04-01T16:35:02+00:00, 32

header (default: false)

Given that a schema must be provided, the header line is ignored and has no effect.

multiline (default: false)

Consider each file as one record spanning all of the lines in the file.

quote (default: " [double quote])

Configures the character used to enclose values that contain a column separator.

sep (default: , [comma])

Configures the character that separates the fields in each line.
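
Putting these options together, a sketch of a CSV source able to read the commented sample above could be the following; the path is illustrative and schema is assumed to be defined beforehand.

val csvStream = spark.readStream
  .format("csv")
  .schema(schema)
  .option("comment", "#")       // lines starting with # are treated as comments
  .option("sep", ",")           // field separator (the default)
  .option("quote", "\"")        // character enclosing values that contain the separator
  .load("/data/incoming/csv")   // monitored directory (illustrative)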

Parquet File Source Format

Apache Parquet is a column-oriented, file-based data storage format. The internal representation splits original rows into chunks of columns that are stored using compression techniques. As a consequence, queries that require specific columns do not need to read the complete file. Instead, the relevant pieces can be independently addressed and retrieved. Parquet supports complex nested data structures and preserves the schema structure of the data. Due to its enhanced query capabilities, efficient use of storage space, and preservation of schema information, Parquet is a popular format for storing large, complex datasets.

Schema definition

To create a streaming source from Parquet files, it is sufficient to provide the schema of the data and the directory location. The schema provided during the streaming declaration is fixed for the duration of the streaming source definition.

Example 10-4 shows the creation of a Parquet-based File source from a folder in hdfs://data/folder using the provided schema.

Example 10-4. Building a Parquet source example
// Use dedicated method with schema and path
val fileStream = spark.readStream
  .schema(schema)
  .parquet("hdfs://data/folder")

Text File Source Format

The text format for the File source supports the ingestion of plain-text files. Using configuration options, it’s possible to ingest the text either line by line or the entire file as a single text blob. The schema of the data produced by this source is naturally StringType and does not need to be specified. This is a generic format that we can use to ingest arbitrary text for further programmatic processing, from the famous word count to custom parsing of proprietary text formats.

Text ingestion options

In addition to the common options for the File source that we saw in “Common Options”, the text file format supports reading text files as a whole using the wholetext option:

wholetext (default: false)

If true, read the complete file as a single text blob. Otherwise, split the text into lines using the standard line separators (\r\n, \r, \n) and consider each line a record.
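
For example, to treat each dropped file as a single record (say, one complete report per file), a sketch could be the following; the directory is illustrative.

val documents = spark.readStream
  .format("text")
  .option("wholetext", "true")   // one record per file instead of one record per line
  .load("/data/incoming/docs")   // monitored directory (illustrative)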

text and textFile

The text format supports two API alternatives:

text

Returns a dynamically typed DataFrame with a single value field of type StringType

textFile

Returns a statically typed Dataset[String]

We can use the text format specification as a terminating method call or as a format option. To obtain a statically typed Dataset, we must use textFile as the last call of the stream builder. The examples in Example 10-5 illustrate the specific API usage.

Example 10-5. Text format API usage
// Text specified as format
>val fileStream = spark.readStream.format("text").load("hdfs://data/folder")
fileStream: org.apache.spark.sql.DataFrame = [value: string]

// Text specified through dedicated method
>val fileStream = spark.readStream.text("hdfs://data/folder")
fileStream: org.apache.spark.sql.DataFrame = [value: string]

// TextFile specified through dedicated method
>val fileStream = spark.readStream.textFile("/tmp/data/stream")
fileStream: org.apache.spark.sql.Dataset[String] = [value: string]

The Kafka Source

Apache Kafka is a Publish/Subscribe (pub/sub) system based on the concept of a distributed log. Kafka is highly scalable and offers high-throughput, low-latency handling of the data at the consumer and producer sides. In Kafka, the unit of organization is a topic. Publishers send data to a topic and subscribers receive data from the topic to which they subscribed. This data delivery happens in a reliable way. Apache Kafka has become a popular choice of messaging infrastructure for a wide range of streaming use cases.

The Structured Streaming source for Kafka implements the subscriber role and can consume data published to one or several topics. This is a reliable source: as discussed in “Understanding Sources”, this means that data delivery semantics are guaranteed even in case of partial or total failure and restart of the streaming process.

Setting Up a Kafka Source

To create a Kafka source, we use the format("kafka") method on the readStream builder of the Spark Session. We need two mandatory parameters to connect to Kafka: the addresses of the Kafka brokers, and the topic(s) to which we want to subscribe.

The address of the Kafka brokers to connect to is provided through the option kafka.bootstrap.servers as a String containing a comma-separated list of host:port pairs.

Example 10-6 presents a simple Kafka source definition. It subscribes to a single topic, topic1, by connecting to the brokers located at host1:port1, host2:port2, and host3:port3.

Example 10-6. Creating a Kafka source
>val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2,host3:port3")
  .option("subscribe", "topic1")
  .option("checkpointLocation", "hdfs://spark/streaming/checkpoint")
  .load()

kafkaStream: org.apache.spark.sql.DataFrame =
  [key: binary, value: binary ... 5 more fields]

>kafkaStream.printSchema

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

>val dataStream = kafkaStream.selectExpr("CAST(key AS STRING)",
                                         "CAST(value AS STRING)")
                             .as[(String, String)]

dataStream: org.apache.spark.sql.Dataset[(String, String)] =
  [key: string, value: string]

The result of that call is a DataFrame with seven fields: key, value, topic, partition, offset, timestamp, and timestampType. This schema is fixed by the Kafka source: it provides the raw key and value from Kafka along with the metadata of each consumed record.

Usually, we are interested in only the key and value of the message. Both the key and the value contain a binary payload, represented internally as a Byte Array. When the data is written to Kafka using a String serializer, we can read that data back by casting the values to String, as we did in the last expression in the example. Although text-based encoding is a common practice, it's not the most space-efficient way of exchanging data. Other encodings, such as the schema-aware Avro format, might offer better space efficiency with the added benefit of embedding the schema information.

The additional metadata in the message, such as topic, partition, or offset, can be used in more complex scenarios. For example, the topic field contains the topic that produced the record and could be used as a label or discriminator in case we subscribe to several topics at the same time.
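
For instance, when subscribing to several topics at once, a sketch that keeps the topic and offset metadata next to the decoded payload could look like this; it assumes the producer wrote String-encoded values.

import org.apache.spark.sql.functions.col

val labeledStream = kafkaStream.select(
  col("topic"),                                // originating topic, usable as a discriminator
  col("partition"),
  col("offset"),
  col("value").cast("string").as("payload")    // decode the binary payload as text
)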

Selecting a Topic Subscription Method

There are three different ways to specify the topic or topics that we want to consume:

  • subscribe

  • subscribePattern

  • assign

The Kafka source setup must contain one and only one of these subscription options. They provide different levels of flexibility to select the topic(s), and even the partitions, to subscribe to, as illustrated in the sketch after this list:

subscribe

Takes a single topic or a list of comma-separated topics: topic1, topic2, ..., topicn. This method subscribes to each topic and creates a single, unified stream with the data of the union of all the topics; for example, .option("subscribe", "topic1,topic3").

subscribePattern

This is similar to subscribe in behavior, but the topics are specified with a regular expression pattern. For example, if we have topics 'factory1Sensors', 'factory2Sensors', 'street1Sensors', 'street2Sensors', we can subscribe to all “factory” sensors with the expression .option("subscribePattern", "factory[\\d]+Sensors").

assign

Allows the fine-grained specification of specific partitions per topic to consume. This is known as TopicPartitions in the Kafka API. The partitions per topic are indicated using a JSON object, in which each key is a topic and its value is an array of partitions. For example, the option definition .option("assign", """{"sensors":[0,1,3]}""") would subscribe to the partitions 0, 1, and 3 of topic sensors. To use this method we need information about the topic partitioning. We can obtain the partition information programmatically by using the Kafka API or through configuration.
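
The following sketch shows the three subscription alternatives side by side; the broker addresses, topic names, and partition numbers are placeholder values.

// Explicit list of topics
val fromList = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic3")
  .load()

// Topics matching a regular expression
val fromPattern = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "factory[\\d]+Sensors")
  .load()

// Specific partitions of a single topic
val fromPartitions = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"sensors":[0,1,3]}""")
  .load()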

Configuring Kafka Source Options

There are two categories of configuration options for the Structured Streaming source for Kafka: dedicated source configuration, and pass-through options that are given directly to the underlying Kafka consumer.

Kafka source-specific options

The following options configure the behavior of the Kafka source. They relate in particular to how offsets are consumed:

startingOffsets (default: latest)

Accepted values are earliest, latest, or a JSON object representing an association between topics, their partitions, and a given offset. Actual offset values are always positive numbers. There are two special offset values: -2 denotes earliest and -1 means latest; for example, """ {"topic1": { "0": -1, "1": -2, "2":1024 }} """

startingOffsets are only used the first time a query is started. All subsequent restarts will use the stored checkpoint information. To restart a streaming job from a specific offset, we need to remove the contents of the checkpoint.

failOnDataLoss (default: true)

This flag indicates whether to fail the restart of a streaming query when data might have been lost, which usually happens when offsets are out of range, topics are deleted, or topics are rebalanced. We recommend setting this option to false during the develop/test cycle, because stopping and restarting the query while a producer keeps writing will often trigger a failure. Set it back to true for production deployments.

kafkaConsumer.pollTimeoutMs (default: 512)

The poll timeout (in milliseconds) to wait for data from Kafka in the distributed consumers running on the Spark executors.

fetchOffset.numRetries (default: 3)

The number of retries before failing the fetch of Kafka offsets.

fetchOffset.retryIntervalMs (default: 10)

The delay between offset fetch retries in milliseconds.

maxOffsetsPerTrigger (default: not set)

This option allows us to set a rate limit to the number of total records to be consumed at each query trigger. The limit configured will be equally distributed among the set of partitions of the subscribed topics.
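
Combining several of these options, a development-time configuration could be sketched as follows; all values are illustrative.

val kafkaDevStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")    // honored only on the first start of the query
  .option("maxOffsetsPerTrigger", 5000)     // cap the total records consumed per trigger
  .option("failOnDataLoss", "false")        // recommended only during develop/test
  .load()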

Kafka Consumer Options

It’s possible to pass configuration options through to the underlying Kafka consumer of this source. We do this by adding a 'kafka.' prefix to the configuration key that we want to set.

For example, to configure Transport Layer Security (TLS) options for the Kafka source, we can set the Kafka consumer configuration option security.protocol by setting kafka.security.protocol in the source configuration.

Example 10-7 demonstrates how to configure TLS for the Kafka source using this method.

Example 10-7. Kafka source TLS configuration example
val tlsKafkaSource = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
  .option("subscribe", "topsecret")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "password")
  .load()
Note

For an exhaustive listing of the Kafka consumer configuration options, refer to the official Kafka documentation.

Banned configuration options

Not all options from the standard consumer configuration can be used because they conflict with the internal process of the source, which is controlled with the settings we saw in “Kafka source-specific options”.

These options are prohibited, as shown in Table 10-1. This means that attempting to use any of them will result in an IllegalArgumentException.

Table 10-1. Banned Kafka options

Option: auto.offset.reset
Reason: Offsets are managed within Structured Streaming
Alternative: Use startingOffsets instead

Option: enable.auto.commit
Reason: Offsets are managed within Structured Streaming

Option: group.id
Reason: A unique group ID is managed internally per query

Option: key.deserializer
Reason: The payload is always represented as a Byte Array
Alternative: Deserialization into specific formats is done programmatically

Option: value.deserializer
Reason: The payload is always represented as a Byte Array
Alternative: Deserialization into specific formats is done programmatically

Option: interceptor.classes
Reason: A consumer interceptor might break the internal data representation

The Socket Source

The Transmission Control Protocol (TCP) is a connection-oriented protocol that enables bidirectional communication between a client and a server. This protocol underpins many of the higher-level communication protocols over the internet, such as FTP, HTTP, MQTT, and many others. Although application-layer protocols like HTTP add additional semantics on top of a TCP connection, many applications deliver data over a plain, text-based TCP socket connection.

The Socket source is a TCP socket client able to connect to a TCP server that offers a UTF-8 encoded text-based data stream. It connects to a TCP server using a host and port provided as mandatory options.

Configuration

To connect to a TCP server, we need the address of the host and a port number. It’s also possible to configure the Socket source to add the timestamp at which each line of data is received.

These are the configuration options:

host (mandatory)

The DNS hostname or IP address of the TCP server to which to connect.

port (mandatory)

The port number of the TCP server to which to connect.

includeTimestamp (default: false)

When enabled, the Socket source adds the timestamp of arrival to each line of data. It also changes the schema produced by this source, adding the timestamp as an additional field.

In Example 10-8, we observe the two modes of operation that this source offers. With the host, port configuration, the resulting streaming DataFrame has a single field named value of type String. When we add the flag includeTimestamp set to true, the schema of the resulting streaming DataFrame contains the fields value and timestamp, where value is of type String as before, and timestamp is of type Timestamp. Also, observe the log warning this source prints at creation.

Example 10-8. Socket source examples
// Only using host and port

>val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9876)
  .load()

18/04/14 17:02:34 WARN TextSocketSourceProvider:
The socket source should not be used for production applications!
It does not support recovery.

stream: org.apache.spark.sql.DataFrame = [value: string]

// With added timestamp information

val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9876)
  .option("includeTimestamp", true)
  .load()

18/04/14 17:06:47 WARN TextSocketSourceProvider:
The socket source should not be used for production applications!
It does not support recovery.

stream: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]

Operations

The Socket source creates a TCP client that connects to a TCP server specified in the configuration. This client runs on the Spark driver. It keeps the incoming data in memory until it is consumed by the query and the corresponding offset is committed. The data from committed offsets is evicted, keeping the memory usage stable under normal circumstances.

Recall from the discussion in “Understanding Sources” that a source is considered reliable if it can replay an uncommitted offset range even in the event of failure and restart of the streaming process. This source is not considered reliable, because a failure of the Spark driver loses all uncommitted data held in memory.

This source should be used only in cases where data loss is acceptable.

Note

A common architectural alternative to directly connecting to a TCP server using the Socket source is to use Kafka as a reliable intermediate storage. A robust microservice can be used to bridge the TCP server and Kafka. This microservice collects the data from the TCP server and delivers it atomically to Kafka. Then, we can use the reliable Kafka source to consume the data and further process it in Structured Streaming.

The Rate Source

The Rate source is an internal stream generator that produces a sequence of records at a configurable frequency, given in records/second. The output is a stream of records (timestamp, value) where timestamp corresponds to the moment of generation of the record, and value is an ever-increasing counter:

> val stream = spark.readStream.format("rate").load()

stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]

This source is intended for benchmarks and for exploring Structured Streaming, given that it does not rely on any external system. As the previous example shows, it is very easy to create and completely self-contained.

The code in Example 10-9 creates a rate stream of 100 rows per second with a ramp-up time of 60 seconds. The schema of the resulting DataFrame contains two fields: timestamp of type Timestamp, and value, which is a BigInt at schema level, and a Long in the internal representation.

Example 10-9. Rate source example
> val stream = spark.readStream.format("rate")
  .option("rowsPerSecond", 100)
  .option("rampUpTime",60)
  .load()
stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]

Options

The Rate source supports a few options that control the throughput and level of parallelism:

rowsPerSecond (default: 1)

The number of rows to generate each second.

rampUpTime (default: 0)

At the start of the stream, the record generation rate increases progressively until this time has been reached. The increase is linear.

numPartitions (default: default Spark parallelism level)

The number of partitions to generate. More partitions increase the parallelism level of the record generation and downstream query processing.
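
For example, a sketch that spreads a higher rate over several partitions; the values are arbitrary.

val fastStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1000)   // total rows generated per second
  .option("numPartitions", 8)      // spread generation over 8 partitions
  .load()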
