Chapter 14. Monitoring Structured Streaming Applications

Application monitoring is an integral part of any robust deployment. Monitoring provides insights into an application's performance characteristics over time by collecting and processing metrics that quantify different aspects of its performance, such as responsiveness, resource usage, and task-specific indicators.

Streaming applications have strict requirements regarding response times and throughput. In the case of distributed applications like Spark, the number of variables that we need to account for during the application's lifetime is multiplied by the complexities of running on a cluster of machines. In the context of a cluster, we need to keep tabs on resource usage, such as CPU, memory, and secondary storage, both from the perspective of each host and as a consolidated view of the running application.

For example, imagine an application running on 10 different executors. The total memory usage indicator shows a 15% increase, which might be within the expected tolerance for this application, but then we notice that the increase comes from a single node. Such an imbalance needs investigation because it will potentially cause a failure when that node runs out of memory. It also implies that an unbalanced distribution of work is potentially causing a bottleneck. Without proper monitoring, we would not observe such behavior in the first place.

The operational metrics of Structured Streaming can be exposed through three different channels:

  • The Spark metrics subsystem

  • The StreamingQuery instance returned by the writeStream.start operation

  • The StreamingQueryListener interface

As we detail in the following sections, these interfaces offer different levels of detail and exposure to cater for different monitoring needs.

The Spark Metrics Subsystem

Available through the Spark core engine, the Spark metrics subsystem offers a configurable metrics collection and reporting API with a pluggable sink interface—not to be confused with the streaming sinks that we discussed earlier in this book. Spark comes with several such sinks, including HTTP, JMX, and comma-separated values (CSV) files. In addition to that, there’s a Ganglia sink that needs additional compilation flags due to licensing restrictions.

The HTTP sink is enabled by default. It's implemented by a servlet that registers an endpoint on the driver host on the same port as the Spark UI. The metrics are accessible at the /metrics/json endpoint. Other sinks can be enabled through configuration. The choice of a given sink is driven by the monitoring infrastructure with which we want to integrate. For example, the JMX sink is a common option to integrate with Prometheus, a popular metrics collector in the Kubernetes ecosystem.
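
For instance, an additional sink such as the built-in CSV sink can be enabled directly through the Spark configuration instead of a separate metrics.properties file. The sketch below follows the spark.metrics.conf.* property convention; the property names and the reporting period are illustrative and should be verified against the metrics.properties.template shipped with your Spark distribution:

// A minimal sketch: enabling the built-in CSV metrics sink through the Spark
// configuration. Verify the property names against metrics.properties.template.
val sparkWithCsvMetrics = SparkSession
   .builder()
   .appName("MetricsSinkExample")
   .config("spark.metrics.conf.*.sink.csv.class",
           "org.apache.spark.metrics.sink.CsvSink")
   .config("spark.metrics.conf.*.sink.csv.period", "10")
   .config("spark.metrics.conf.*.sink.csv.unit", "seconds")
   .config("spark.metrics.conf.*.sink.csv.directory", "/tmp/spark-metrics")
   .getOrCreate()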

Structured Streaming Metrics

To acquire metrics from a Structured Streaming job, we first must enable the internal reporting of such metrics. We achieve that by setting the configuration flag spark.sql.streaming.metricsEnabled to true, as demonstrated here:

// at session creation time
val spark = SparkSession
   .builder()
   .appName("SparkSessionExample")
   .config("spark.sql.streaming.metricsEnabled", true)
   .config(...)
   .getOrCreate()

// by setting the config value
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

// or by using the SQL configuration
spark.sql("SET spark.sql.streaming.metricsEnabled=true")

With this configuration in place, the report produced by the metrics subsystem will include three additional metrics for each streaming query running in the same SparkSession context:

inputRate-total

The total number of messages ingested per trigger interval

latency

The processing time for the trigger interval

processingRate-total

The speed at which the records are being processed
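
Once enabled, these metrics appear alongside the regular engine metrics in whichever sink is configured, including the default /metrics/json endpoint. During development, a quick way to confirm that they are being reported is to read that endpoint directly. The following is a minimal sketch that assumes a local driver with the Spark UI on its default port 4040:

// A rough sanity check: fetch the driver's JSON metrics report and keep only
// the entries related to the Structured Streaming rates.
import scala.io.Source

val metricsJson = Source.fromURL("http://localhost:4040/metrics/json").mkString
metricsJson
  .split(",")
  .filter(entry => entry.contains("inputRate-total") ||
                   entry.contains("processingRate-total"))
  .foreach(println)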

The StreamingQuery Instance

As we have seen through previous Structured Streaming examples, the call to start a streaming query produces a StreamingQuery result. Let’s zoom in on the weatherEventsMovingAverage from Example 13-1:

val query = scoredStream.writeStream
        .format("memory")
        .queryName("memory_predictions")
        .start()

query: org.apache.spark.sql.streaming.StreamingQuery =
  org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7875ee2b

The result we obtain from that call in the query value is a StreamingQuery instance. A StreamingQuery is a handler to the actual streaming query that is running continuously in the engine. This handler contains methods to inspect the execution of the query and control its life cycle. Some interesting methods are:

query.awaitTermination()

Blocks the current thread until the query ends, either because it’s stopped or because it encountered an error. This method is useful to block the main thread and prevent it from terminating early.

query.stop()

Stops the execution of the query.

query.exception()

Retrieves any fatal exception encountered by the execution of the query. This method returns None when the query is operating normally. After a query stops, inspecting this value informs us as to whether it failed and for what reason.

query.status()

Shows a brief snapshot of what the query is currently doing.

For example, retrieving the query.status of a running query shows a result similar to this:

query.status
res: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Processing new data",
  "isDataAvailable" : true,
  "isTriggerActive" : false
}

Even though the status information is not very revealing when everything is working correctly, it can be useful when developing a new job. The call to writeStream.start() is silent when an error occurs. Consulting query.status might reveal that there is a problem, in which case query.exception will return the cause.
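
In a standalone application, a common pattern is therefore to block on the query and inspect the termination cause. The following is a minimal sketch, reusing the query handler obtained earlier:

// A minimal sketch: block until the query ends and report why it stopped.
// `query` is the StreamingQuery handler returned by writeStream.start().
import org.apache.spark.sql.streaming.StreamingQueryException

try {
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    // the same information remains available afterward through query.exception
    println(s"Query ${query.name} terminated with an error: ${e.getMessage}")
}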

In Example 14-1, we used an incorrect schema as input for a Kafka sink. If we recall from “The Kafka Sink”, a Kafka sink requires a mandatory value field in the output stream (even the key is optional). In this case, query.status provided relevant feedback to solve that issue.

Example 14-1. query.status shows the reason for a stream failure
res: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message": "Terminated with exception: Required attribute 'value' not found",
  "isDataAvailable": false,
  "isTriggerActive": false
}
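
The fix is to make sure that the output stream exposes the columns the Kafka sink expects before calling start. One possible approach, serializing the whole record as JSON into the mandatory value field (the sensorId column used as the key here is illustrative):

// A sketch of projecting the mandatory `value` column (and an optional `key`)
// before writing to a Kafka sink. The `sensorId` column is illustrative.
import org.apache.spark.sql.functions.{col, struct, to_json}

val kafkaReadyStream = scoredStream.select(
  col("sensorId").cast("string").as("key"),   // optional
  to_json(struct(col("*"))).as("value")       // required by the Kafka sink
)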

The methods in StreamingQueryStatus are thread-safe, meaning that they can be called concurrently from another thread without risking corruption of the query state.

Getting Metrics with StreamingQueryProgress

For the purpose of monitoring, we are more interested in a set of methods that provide insights into the query execution metrics. The StreamingQuery handler offers two such methods:

query.lastProgress

Retrieves the most recent StreamingQueryProgress report.

query.recentProgress

Retrieves an array of the most recent StreamingQueryProgress reports. The maximum number of progress objects retained can be set using the configuration parameter spark.sql.streaming.numRecentProgressUpdates on the SparkSession. If you do not set this configuration, it defaults to the last 100 reports.
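
For example, in an interactive session we can print the latest report or walk through the retained history. A minimal sketch (note that lastProgress can be null before the first trigger completes):

// Increase the number of retained progress reports (defaults to 100)
spark.conf.set("spark.sql.streaming.numRecentProgressUpdates", "200")

// The most recent progress report, rendered as JSON
Option(query.lastProgress).foreach(progress => println(progress.prettyJson))

// The retained history: the processing rate observed at each recent trigger
query.recentProgress.foreach { progress =>
  println(s"batch ${progress.batchId}: ${progress.processedRowsPerSecond} rows/second")
}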

As we can appreciate in Example 14-2, each StreamingQueryProgress instance offers a comprehensive snapshot of the query performance produced at each trigger.

Example 14-2. StreamingQueryProgress sample
{
  "id": "639503f1-b6d0-49a5-89f2-402eb262ad26",
  "runId": "85d6c7d8-0d93-4cc0-bf3c-b84a4eda8b12",
  "name": "memory_predictions",
  "timestamp": "2018-08-19T14:40:10.033Z",
  "batchId": 34,
  "numInputRows": 37,
  "inputRowsPerSecond": 500.0,
  "processedRowsPerSecond": 627.1186440677966,
  "durationMs": {
    "addBatch": 31,
    "getBatch": 3,
    "getOffset": 1,
    "queryPlanning": 14,
    "triggerExecution": 59,
    "walCommit": 10
  },
  "stateOperators": [],
  "sources": [
    {
      "description": "KafkaSource[Subscribe[sensor-office-src]]",
      "startOffset": {
        "sensor-office-src": {
          "0": 606580
        }
      },
      "endOffset": {
        "sensor-office-src": {
          "0": 606617
        }
      },
      "numInputRows": 37,
      "inputRowsPerSecond": 500.0,
      "processedRowsPerSecond": 627.1186440677966
    }
  ],
  "sink": {
    "description": "MemorySink"
  }
}

From the perspective of monitoring the job's performance, we are particularly interested in numInputRows, inputRowsPerSecond, and processedRowsPerSecond. These self-describing fields provide key indicators about the job performance. If we have more data than our query can process, inputRowsPerSecond will be higher than processedRowsPerSecond for sustained periods of time. This might indicate that the cluster resources allocated for this job should be increased to reach sustainable long-term performance.
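
A simple way to detect such a sustained backlog is to compare both rates over the retained progress history. The following is a minimal sketch based on the recentProgress array; the 80% threshold is an arbitrary choice:

// A rough backlog check over the retained progress reports: if the input rate
// exceeds the processing rate in most recent triggers, the query is falling behind.
val reports = query.recentProgress.filter(_.numInputRows > 0)
val lagging = reports.count(p => p.inputRowsPerSecond > p.processedRowsPerSecond)

if (reports.nonEmpty && lagging.toDouble / reports.size > 0.8) {
  println("Warning: the input rate has exceeded the processing rate " +
          "for most of the recent triggers")
}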

The StreamingQueryListener Interface

Monitoring is a “day 2 operations” concern, and we require an automated collection of performance metrics to enable other processes such as capacity management, alerting, and operational support.

The inspection methods made available by the StreamingQuery handler that we saw in the previous section are useful when we work on an interactive environment such as the Spark shell or a notebook, like we use in the exercises of this book. In an interactive setting, we have the opportunity to manually sample the output of the StreamingQueryProgress to get an initial idea about the performance characteristics of our job.

Yet, the StreamingQuery methods are not automation friendly. Given that a new progress record becomes available at each streaming trigger, any automated collection of information from this interface would need to be coupled to the internal scheduling of the streaming job.

Luckily, Structured Streaming provides the StreamingQueryListener, a listener-based interface that provides asynchronous callbacks to report updates in the life cycle of a streaming job.

Implementing a StreamingQueryListener

To hook up to the internal event bus, we must provide an implementation of the StreamingQueryListener interface and register it with the running SparkSession.

StreamingQueryListener consists of three methods:

onQueryStarted(event: QueryStartedEvent)

Called when a streaming query starts. The event provides a unique id for the query and a runId that changes if the query is stopped and restarted. This callback is called synchronously with the start of the query, so its implementation should not block.

onQueryTerminated(event: QueryTerminatedEvent)

Called when a streaming query is stopped. The event contains id and runId fields that correlate with the start event. It also provides an exception field that contains an exception if the query failed due to an error.

onQueryProgress(event: QueryProgressEvent)

Called at each query trigger. The event contains a progress field that encapsulates the StreamingQueryProgress instance that we already know from “Getting Metrics with StreamingQueryProgress”. This callback provides us with the events that we need to monitor the query performance.

Example 14-3 illustrates the implementation of a simplified version of such a listener. This chartListener, when instantiated from a notebook, plots the input and processing rates per second.

Example 14-3. Plotting streaming job performance
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val chartListener = new StreamingQueryListener() {
  val MaxDataPoints = 100
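  // Metric and chart are helpers provided by the book's notebook environment:
  // Metric is assumed to be a simple container, for example
  //   case class Metric(name: String, time: String, value: Double)
  // and chart a plotting widget that redraws whenever applyOn(data) is called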
  // a mutable reference to an immutable container to buffer n data points
  var data: List[Metric] = Nil

  def onQueryStarted(event: QueryStartedEvent) = ()

  def onQueryTerminated(event: QueryTerminatedEvent) = ()

  def onQueryProgress(event: QueryProgressEvent) = {
    val queryProgress = event.progress
    // ignore zero-valued events
    if (queryProgress.numInputRows > 0) {
      val time = queryProgress.timestamp
      val input = Metric("in", time, queryProgress.inputRowsPerSecond)
      val processed = Metric("proc", time, queryProgress.processedRowsPerSecond)
      data = (input :: processed :: data).take(MaxDataPoints)
      chart.applyOn(data)
    }
  }
}

After a listener instance has been defined, it must be attached to the event bus, using the addListener method in the SparkSession:

sparkSession.streams.addListener(chartListener)
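
Conversely, a listener that is no longer needed, for example when redefining it during an interactive session, can be detached with the removeListener counterpart:

sparkSession.streams.removeListener(chartListener)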

After running this chartListener against one of the notebooks included in this book’s online resources, we can visualize the input and processing rates, as shown in Figure 14-1.

Figure 14-1. Input and processing streaming rates

Similar listener implementations can be used to send metric reports to popular monitoring systems, such as Prometheus or Graphite, or to time-series databases like InfluxDB, which can be easily integrated with dashboard applications such as Grafana.
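
As a sketch of that idea, the listener below forwards each progress report, already serialized as JSON by the progress API, to an external collector over HTTP. The endpoint URL is hypothetical, and a production implementation would add batching, retries, and authentication as required by the chosen backend:

// A minimal sketch: forward each progress report (as JSON) to an external collector.
// The collector URL is a placeholder for the actual monitoring backend.
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class MetricsForwardingListener(collectorUrl: String) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val payload = event.progress.json.getBytes(StandardCharsets.UTF_8)
    val connection = new URL(collectorUrl).openConnection().asInstanceOf[HttpURLConnection]
    try {
      connection.setRequestMethod("POST")
      connection.setRequestProperty("Content-Type", "application/json")
      connection.setDoOutput(true)
      connection.getOutputStream.write(payload)
      connection.getOutputStream.close()
      connection.getResponseCode // trigger the request; the response is ignored
    } finally {
      connection.disconnect()
    }
  }
}

// Registered in the same way as the chartListener:
// spark.streams.addListener(new MetricsForwardingListener("http://metrics-collector:8080/ingest"))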
