Chapter 18. The Spark Streaming Execution Model

When we began our Spark Streaming journey in Chapter 16, we discussed how the DStream abstraction embodies the programming and the operational models offered by this streaming API. After learning about the programming model in Chapter 17, we are ready to understand the execution model behind the Spark Streaming runtime.

In this chapter, you learn about the bulk synchronous architecture and how it provides us with a framework to reason about the microbatch streaming model. Then, we explore how Spark Streaming consumes data using the receiver model and the guarantees that this model provides in terms of data-processing reliability. Finally, we examine the direct API as an alternative to receivers for streaming data providers that are able to offer reliable data delivery.

The Bulk-Synchronous Architecture

In Chapter 5 we discussed bulk-synchronous parallelism, or BSP, as a theoretical framework that allows us to reason about how distributed stream processing can be done on microbatches of data from a stream.

Spark Streaming follows a processing model similar to bulk-synchronous parallelism:

  • All of the Spark executors on the cluster are assumed to have synchronized clocks; for example, synchronized through a Network Time Protocol (NTP) server.

  • In the case of a receiver-based source, one or several of the executors run a special Spark job, a receiver. The receiver is tasked with consuming new elements of the stream. It receives two clock ticks:

    • The most frequent clock tick is called the block interval. It signals when elements received from the stream should be allocated to a block; that is, the portion of the stream that should be processed by a single executor, for this current interval. Each such block becomes a partition of the Resilient Distributed Dataset (RDD) produced at each batch interval.

    • The second and less frequent tick is the batch interval. It marks when the data collected from the stream since the last batch interval should be assembled into an RDD for distributed processing on the cluster. Both intervals are configured as shown in the sketch following Figure 18-1.

  • When using the direct approach, only the batch interval tick is relevant.

  • During all processing, as is the case with a regular (batch) Spark job, blocks are signaled to the block manager, a component that ensures any block of data put into Spark is replicated according to the configured persistence level, for the purpose of fault tolerance.

  • On each batch interval, the RDD containing the data received during the previous batch interval becomes available, and it is thus scheduled for processing during this batch.

Figure 18-1 illustrates how these elements come together to conceptually form a DStream.

Figure 18-1. The DStream structure: blocks and batches
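To make these two cadences concrete, here is a minimal configuration sketch with illustrative values (the master, application name, and intervals are placeholders): the block interval is an ordinary Spark setting, whereas the batch interval is fixed when the StreamingContext is created.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cut a block (a future RDD partition) every 200 ms.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("streaming-execution-model")
  .set("spark.streaming.blockInterval", "200ms")

// Assemble the collected blocks into an RDD and schedule a job every 2 seconds.
val ssc = new StreamingContext(conf, Seconds(2))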

In a strict bulk-synchronous model, the synchronization barrier would be the arrival of a new RDD at the batch interval. In Spark Streaming, however, this is not really a barrier, because data delivery happens independently of the state of the cluster at the moment the new batch arrives: Spark’s receivers do not wait for the cluster to finish processing the previous batch before collecting data for the new one.

This is not a design fault; rather, it’s a consequence of Spark Streaming trying to do real stream processing at its most honest: despite having a microbatching model, Spark Streaming acknowledges that a stream has no predefined end, and that the system is required to receive data continuously.

The consequence of this relatively simple model is that the Spark Streaming job tasked with receiving data—the receiver—needs to be a job scheduled in the cluster to continuously work. If it ever were to crash, it would be restarted on another executor, continuing the ingestion of data without further interruption.

The Receiver Model

As we previously hinted, in Spark Streaming, the receiver is a process able to continuously collect data from the input stream regardless of the state of processing of the Spark cluster.

This component is an adaptor between the data delivered by the stream source and the data-processing engine in Spark Streaming. As an adaptor, it implements the specific API and semantics of the external stream and delivers that data using an internal contract. Figure 18-2 shows the role of the receiver in the DStream data flow.

Figure 18-2. Receiver model

The Receiver API

At its most fundamental, the receiver API consists of three methods:

def onStart()

Starts the reception of data from the external sources. In practice, onStart should asynchronously initiate the inbound data collection process and return immediately.

def store(...)

Delivers one or several data elements to Spark Streaming. store must be called from the asynchronous process initiated by onStart whenever there is new data available.

def stop(...)

Stops the receiving process. stop must take care of properly cleaning up any resources used by the receiving process started by onStart.

This model provides a generic interface that can be implemented to integrate a wide range of streaming data providers. Note how this generic definition abstracts over the data delivery method of the streaming system: we can implement an always-connected, push-based receiver, such as a TCP client socket, as well as a request-based pull connector, such as a REST/HTTP client polling some system.
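As an illustration, here is a minimal sketch of a push-based receiver that consumes text lines from a TCP socket, in the spirit of the custom receiver example from the Spark documentation. The host and port are illustrative; note that in the concrete Receiver base class, the cleanup hook to override is named onStop().

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A sketch of a custom receiver delivering lines read from a TCP socket.
class SocketLineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Start the collection process on its own thread and return immediately.
    new Thread("socket-line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the receiving thread checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // deliver each element to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask Spark to schedule the receiver again if the connection was dropped.
      restart("Connection closed, restarting receiver")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    }
  }
}

An instance of such a receiver is attached to a streaming context with ssc.receiverStream(new SocketLineReceiver(host, port)), which produces a DStream of the received elements.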

How Receivers Work

The task of the receiver is to collect data from the stream source and deliver it to Spark Streaming. Intuitively, it’s easy to understand: as data comes in, it is collected and packaged into blocks during the batch interval. As soon as each batch interval completes, the collected data blocks are handed to Spark for processing.

Figure 18-3 depicts how the timing of this sequence of events takes place. At the start of the streaming process, the receiver begins collecting data. At the end of the t0 interval, the first collected block, #0, is given to Spark for processing. At time t2, Spark is processing the block collected during t1, while the receiver is collecting the data corresponding to block #2.

Figure 18-3. Receiver in action

We can generalize that, at any point in time, Spark is processing the previous batch of data, while the receivers are collecting data for the current interval. After a batch has been processed by Spark (like #0 in Figure 18-3), it can be cleaned up. The time when that RDD will be cleaned up is determined by the spark.cleaner.ttl setting.

The Receiver’s Data Flow

Figure 18-4 illustrates the data flow of a Spark application in this case.

Figure 18-4. The data flow of Spark’s receiver

In this figure, we can see that data ingestion occurs as a job that is translated into a single task on one executor. This task connects to the data source and initiates the data transfer. It is managed from the Spark Context, a bookkeeping object that resides on the driver machine.

On each block interval tick (as measured on the executor that is running the receiver), this machine groups the data received during the previous block interval into a block. The block is then registered with the Block Manager, also part of the bookkeeping of the Spark Context on the driver. This process initiates the replication of the data that this block represents, to ensure that the source data in Spark Streaming is replicated the number of times indicated by the storage level.

On each batch interval tick (as measured on the driver), the driver groups the data received during the previous batch interval, which has been replicated the correct number of times, into an RDD. Spark registers this RDD with the JobScheduler, initiating the scheduling of a job on that particular RDD—in fact, the whole point of Spark Streaming’s microbatching model consists of repeatedly scheduling the user-defined program on successive batched RDDs of data.

The Internal Data Resilience

The fact that receivers are independent jobs has consequences, in particular for resource usage and data delivery semantics. To execute its data collection job, a receiver consumes one core on an executor, regardless of the amount of work it needs to do. Therefore, using a single streaming receiver results in data ingestion performed serially by a single core on one executor, which becomes the limiting factor on the amount of data that Spark Streaming can ingest.

The base unit of replication for Spark is a block: any block can be on one or several machines (up to the persistence level indicated in the configuration, so at most two by default), and it is only when a block has reached that persistence level that it can be processed. Therefore, it is only when an RDD has every block replicated that it can be taken into account for job scheduling.
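The persistence level that governs this replication is chosen when the receiver-based input DStream is created. A minimal sketch, assuming an existing StreamingContext named ssc and a socket text source:

import org.apache.spark.storage.StorageLevel

// The trailing "_2" requests two replicas of each received block,
// which is also the default for this source.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)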

At the Spark engine side, each block becomes a partition of the RDD. The combination of a partition of data and the work that needs to be applied to it becomes a task. Each task can be processed in parallel, usually on the executor that contains the data locally. Therefore, the level of parallelism we can expect for an RDD obtained from a single receiver is exactly the ratio of the batch interval to the block interval, as defined in Equation 18-1.

Equation 18-1. Single receiver partitioning
number of partitions = batch interval / block interval

In Spark, the usual rule of thumb for task parallelism is to have two to three tasks per available executor core. Taking a factor of three for our discussion, we should set the block interval as shown in Equation 18-2.

Equation 18-2. Block interval tuning
block interval = batch interval / (3 × Spark cores)
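For instance, assuming a batch interval of 2 seconds and 6 cores dedicated to processing, Equation 18-2 suggests a block interval of 2,000 ms / (3 × 6) ≈ 111 ms, which by Equation 18-1 yields roughly 18 partitions per batch: three tasks per processing core.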

Receiver Parallelism

We mentioned that a single receiver would be a limiting factor to the amount of data that we can process with Spark Streaming.

A simple way to increase incoming data throughput is to declare more DStreams at the code level. Each DStream will be attached to its own consumer—and therefore each will have its own core consumption—on the cluster. The DStream operation union allows us to merge those streams, ensuring that we produce a single pipeline of data from our various input streams.

Let’s assume that we create DStreams in parallel and put them in a sequence:

val inputDStreams: Seq[DStream[(K, V)]] = Seq.fill(parallelism) {
  ... // the input DStream creation function
}
val unifiedStream = ssc.union(inputDStreams)
Warning

The union of the created DStreams is important because this reduces the number of transformation pipelines on the input DStream to one. Not doing this will multiply the number of stages by the number of consumers, resulting in unnecessary overhead.

In this way, we can exploit receiver parallelism, here represented by the parallelism factor of concurrently created DStreams.

Balancing Resources: Receivers Versus Processing Cores

Given that each created receiver consumes its own core in a cluster, increasing consumer parallelism has consequences in the number of cores available for processing in our cluster.

Let’s imagine that we have a 12-core cluster that we want to dedicate to our streaming analytics application. When using a single receiver, we use one core for receiving, leaving 11 cores for processing data. The cluster might be underutilized because a single receiver might not receive enough data to give work to all of the available processing cores. Figure 18-5 illustrates that situation, in which the green nodes are being used for processing, and the gray nodes remain idle.

Figure 18-5. Single-receiver allocation

To improve cluster utilization, we increase the number of receivers, as we just discussed. In our hypothetical scenario, using four receivers provides us with a much better resource allocation, as shown in Figure 18-6.

Figure 18-6. Multiple-receiver allocation

The batch interval is fixed by the needs of the analysis and remains the same. What should the block interval be? Four DStreams ingesting in parallel will necessarily create four times as many blocks per block interval as a single DStream would. Therefore, with the same block interval, the number of partitions of the unified DStream would be four times what it was in the original case. Hence, we can’t use the same block interval. Instead, we should use the following:

block interval = (4 × batch interval) / (3 × Spark cores)

Because we want at least three partitions per core, we round this number down to the nearest millisecond.

Generalizing, with an arbitrary set of characteristics, we should use this:

block interval = (receivers × batch interval) / (partitions per core × Spark cores)

Where the total number of cores used in the system is as follows:

total system cores = # of receivers + # of Spark cores
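For instance, in the hypothetical 12-core cluster with four receivers, eight cores remain for processing. With a batch interval of 2 seconds and a target of three partitions per core, the formula gives a block interval of (4 × 2,000 ms) / (3 × 8) ≈ 333 ms, which amounts to roughly six blocks per receiver per batch, or 24 partitions in total across the unified DStream: three per processing core.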

Achieving Zero Data Loss with the Write-Ahead Log

The original receiver design, prior to Spark v1.2, had a major design flaw. While the receiver is collecting data for the current block, that data exists only in a memory buffer of the receiver process. Only when the block is completed and delivered does it get replicated in the cluster. If the receiver fails, the data in that buffer is lost and cannot be recovered, resulting in data loss.

To prevent data loss, data collected by the receiver is additionally appended to a log file on a reliable filesystem. This is known as the write-ahead log (WAL), a component commonly used in database design to guarantee reliable and durable data reception.

The WAL is an append-only structure in which data is written before it is delivered for processing. When data is known to be correctly processed, its entry in the log is marked as processed. In the database world, the equivalent process is the commit of the transaction in which the data is involved, making this log also known as the commit log.

In case of failure, data from the WAL is replayed from the record following the last registered commit, compensating in that way for the potential loss of data of the receiver. The combination of the WAL and the receiver is known as the reliable receiver. Streaming sources based on the reliable receiver model are known as reliable sources.

Enabling the WAL

To enable the WAL-backed data delivery to ensure zero data loss, we need to apply the following settings:

streamingContext.checkpoint(dir)

This directory is used for both checkpoints and the write-ahead logs.

spark.streaming.receiver.writeAheadLog.enable (default: false)

Set to true to enable the write-ahead process.
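A minimal sketch combining both settings, assuming a reliable filesystem such as HDFS is available (the directory, application name, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-enabled-streaming-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// The checkpoint directory, on a reliable filesystem, holds both the
// checkpoints and the write-ahead logs.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")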

Note that due to the increased work of writing to the log, the overall throughput of the streaming job might decrease and the overall resource usage might increase. Because the WAL writes to a reliable filesystem, the infrastructure of that filesystem needs to have enough resources to accept the continuous write stream to the log, in terms of both storage and processing capacity.

The Receiverless or Direct Model

Spark Streaming aims to be a generic stream-processing framework. Within that premise, the receiver model provides a general, source-agnostic contract that enables the integration of any streaming source. But some sources allow for a direct consumption model in which the role of the receiver as an intermediary in the data delivery process becomes unnecessary.

The increasing popularity of Kafka as a streaming backend for Spark Streaming jobs made it a focus of additional consideration. In the previous section, we learned about the WAL as a solution to achieve zero data loss for the receiver model in face of failure.

Kafka is, at its core, an implementation of a distributed commit log. When the Kafka reliable receiver was implemented, it became evident that the WAL was duplicating functionality already present in Kafka. Moreover, consuming data from Kafka into a receiver was not even necessary. Let us recall that the receiver takes care of data redundancy through duplication of the blocks in the Spark memory. Kafka already replicates data reliably and provides equivalent data durability guarantees. To consume data from Kafka, all that was required was to track the offset of the data already processed and compute the offset of the new data received in a batch interval. Using these two offsets for each partition consumed would be sufficient to launch a Spark job that would directly consume the data segment determined by these two offsets and operate on it. When the processing of the microbatch succeeds, the consumed offset is committed.

The direct connector model is more of a manager than a data broker. Its role is to compute the data segments to be processed by Spark and to maintain the bookkeeping of data to be processed versus data already handled. Given the high-performance and low-latency data delivery characteristics of Kafka, this method turned out to be much faster and to require fewer resources than the receiver-based implementation.
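Chapter 19 covers the Kafka direct connector in detail; as a quick preview, the following sketch (using the spark-streaming-kafka-0-10 API, assuming an existing StreamingContext named ssc, and with illustrative broker, group, and topic names) shows the offset-range bookkeeping at the heart of the model: offsets are read per partition for each batch and committed only after the microbatch has been processed.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "direct-model-example",
  "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed by the application
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

stream.foreachRDD { rdd =>
  // Each RDD partition maps to a Kafka partition and a [from, until) offset range.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the microbatch ...
  // Commit the consumed offsets only once processing has succeeded.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}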

Note

For the specific usage of the direct connector for Kafka, refer to Chapter 19.

Summary

So far, we have seen a primer on the Spark Streaming execution model and the fundamentals of how it treats stream processing:

  • A stream is data observed over time from a data source. On every block interval, a new block of data, which will become a partition of an RDD, is produced and replicated. On every batch interval (a multiple of the block interval), the collected blocks are assembled into an RDD and a job can be scheduled on it.

  • Job scheduling is driven by the user-defined functions of the streaming program, but jobs can also be the byproduct of some built-in functionality (e.g., checkpointing). The scheduling itself occurs at a fixed cadence: the batch interval.

  • The most generic way of creating a DStream is the receiver model, which creates a job connecting to the input source on an executor, consuming one core. Parallelism can be increased in some cases by creating several DStreams.

  • Factors such as resource allocation and configuration parameters affect the overall performance of a streaming job, and there are options to tune such behavior.

  • Enabling the WAL prevents potential data loss at the expense of additional resource usage.

  • For particular systems, such as Kafka, that provide high-performance and durable data delivery guarantees, it’s possible to reduce the responsibilities of the receiver to minimal bookkeeping that computes microbatch intervals in terms native to the streaming system. This model, known as the direct model, is both more resource efficient and performant than having to copy and replicate the data into the memory of the Spark cluster.
