Real-time streaming applications are meant to be long-running and resilient to failures of all sorts. Spark Streaming implements a checkpointing mechanism that maintains enough information to recover from failures.
There are two types of data that need to be checkpointed:
- Metadata checkpointing
- Data checkpointing
Checkpointing can be enabled by calling the checkpoint() function on the StreamingContext as follows:
def checkpoint(directory: String)
Specifies the directory where the checkpoint data will be reliably stored.
Note that the directory must reside on a fault-tolerant file system such as HDFS.
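The checkpoint directory is also what a restarted driver reads in order to recover. The following is a minimal sketch of that recovery pattern using StreamingContext.getOrCreate; it is not part of the Twitter example. The object name, the local master setting, the socket source on localhost:9999, and the word count logic are assumptions chosen only for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Assumption: a relative local path for illustration; in practice this
  // should point to a fault-tolerant file system such as HDFS.
  val checkpointDir = "checkpoints"

  // Builds a brand-new context. getOrCreate calls this only when no
  // usable checkpoint is found in checkpointDir.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("CheckpointRecoverySketch")
      .setMaster("local[2]") // assumption: local run with two threads
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Hypothetical input source: lines of text from a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuild the context from checkpoint data if present, else create it.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

In this pattern, all DStream setup lives inside the creating function so that a recovered context does not define its streams a second time.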
Once the checkpoint directory is set, any DStream can be checkpointed into that directory at a specified interval. Returning to the Twitter example, we can checkpoint each DStream every 10 seconds into the checkpoints directory:
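import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// sc is an existing SparkContext; the batch interval is 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
val twitterStream = TwitterUtils.createStream(ssc, None)
val wordStream = twitterStream.flatMap(x => x.getText.split(" "))
// Count hashtags over a 15-second window that slides every 10 seconds
val aggStream = twitterStream
  .flatMap(x => x.getText.split(" ")).filter(_.startsWith("#"))
  .map(x => (x, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(15), Seconds(10), 5)
// Set the checkpoint directory, then checkpoint each DStream every 10 seconds
ssc.checkpoint("checkpoints")
aggStream.checkpoint(Seconds(10))
wordStream.checkpoint(Seconds(10))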
After a few seconds, the checkpoints directory looks something like the following, showing that the metadata, the RDDs, and the log files are all maintained as part of checkpointing: