Application monitoring is an integral part of any robust deployment. Monitoring provides insights into the application’s performance characteristics over time by collecting and processing metrics that quantify different aspects of its performance, such as responsiveness, resource usage, and task-specific indicators.
Streaming applications have strict requirements regarding response times and throughput. In the case of distributed applications like Spark, the number of variables that we need to account for during the application’s lifetime is multiplied by the complexities of running on a cluster of machines. In the context of a cluster, we need to keep tabs on resource usage, like CPU, memory, and secondary storage, across different hosts, both from the perspective of each host and as a consolidated view of the running application.
For example, imagine an application running on 10 different executors. The total memory usage indicator shows a 15% increase, which might be within the expected tolerance for this application, but then we notice that the increase comes from a single node. Such an imbalance needs investigation because it could cause a failure when that node runs out of memory. It also suggests an unbalanced distribution of work that might be causing a bottleneck. Without proper monitoring, we would not observe such behavior in the first place.
The operational metrics of Structured Streaming can be exposed through three different channels:
- The Spark metrics subsystem
- The StreamingQuery instance returned by the writeStream.start operation
- The StreamingQueryListener interface
As we detail in the following sections, these interfaces offer different levels of detail and exposure to cater for different monitoring needs.
Available through the Spark core engine, the Spark metrics subsystem offers a configurable metrics collection and reporting API with a pluggable sink interface—not to be confused with the streaming sinks that we discussed earlier in this book. Spark comes with several such sinks, including HTTP, JMX, and comma-separated values (CSV) files. In addition to that, there’s a Ganglia sink that needs additional compilation flags due to licensing restrictions.
The HTTP sink is enabled by default. It’s implemented by a servlet that registers an endpoint on the driver host on the same port as the Spark UI. The metrics are accessible at the /metrics/json endpoint. Other sinks can be enabled through configuration. The choice of a given sink is driven by the monitoring infrastructure with which we want to integrate. For example, the JMX sink is a common option to integrate with Prometheus, a metrics collector popular in Kubernetes-based deployments.
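As a minimal sketch, assuming Spark 3.0 or later (where metrics-system properties can also be passed as Spark configuration entries prefixed with spark.metrics.conf., instead of through a metrics.properties file), the following session enables the CSV and JMX sinks for every instance. The object name MetricsSinkSetup, the local master, and the /tmp/spark-metrics directory are illustrative choices, not requirements.

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

object MetricsSinkSetup {
  // Hypothetical output directory for the CSV sink.
  val CsvDir = "/tmp/spark-metrics"

  /** Builds a session whose metrics system reports through the CSV and JMX sinks,
    * configured with the `spark.metrics.conf.` prefix (assumes Spark 3.0+). */
  def buildSession(): SparkSession = {
    // Create the directory up front so the CSV reporter can write its files.
    Files.createDirectories(Paths.get(CsvDir))
    SparkSession.builder()
      .appName("MetricsSinkExample")
      .master("local[*]") // local mode, for illustration only
      // "*" applies each sink to every instance (driver, executors, ...)
      .config("spark.metrics.conf.*.sink.csv.class", "org.apache.spark.metrics.sink.CsvSink")
      .config("spark.metrics.conf.*.sink.csv.period", "10")
      .config("spark.metrics.conf.*.sink.csv.unit", "seconds")
      .config("spark.metrics.conf.*.sink.csv.directory", CsvDir)
      // JMX sink: exposes metrics as MBeans, e.g., for a Prometheus JMX exporter
      .config("spark.metrics.conf.*.sink.jmx.class", "org.apache.spark.metrics.sink.JmxSink")
      .getOrCreate()
  }
}
```

Because the metrics system is set up when the SparkContext is created, these entries must be in place before the session is built; changing them on a running session has no effect.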
To acquire metrics from a Structured Streaming job, we first must enable the internal reporting of such metrics. We achieve that by setting the configuration flag spark.sql.streaming.metricsEnabled to true, as demonstrated here:
```scala
import org.apache.spark.sql.SparkSession

// The flag is read when a streaming query starts, so set it before calling start().

// at session creation time
val spark = SparkSession.builder()
  .appName("SparkSessionExample")
  .config("spark.sql.streaming.metricsEnabled", true)
  // .config(...) additional settings, as needed
  .getOrCreate()

// by setting the config value
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

// or by using the SQL configuration
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
```
With this configuration in place, the reported metrics will contain three additional metrics for each streaming query running in the same SparkSession context:
- The rate at which records are ingested during the trigger interval
- The processing time for the trigger interval
- The rate at which records are processed
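Once the flag is on, these metrics appear in every configured sink, including the default HTTP sink. The sketch below assumes a driver UI on localhost:4040 and that streaming gauges are named like <appId>.driver.spark.streaming.<queryName>.<metric> (for example, inputRate-total, processingRate-total, and latency). It pulls the /metrics/json payload and lists those names with a regular expression rather than a JSON library; StreamingGauges is a hypothetical helper.

```scala
import scala.io.Source

object StreamingGauges {
  // Matches JSON keys that look like Structured Streaming gauges, assuming the
  // naming scheme "<appId>.driver.spark.streaming.<queryName>.<metric>".
  private val StreamingKey = "\"([^\"]*\\.spark\\.streaming\\.[^\"]*)\"\\s*:".r

  /** Extracts streaming gauge names from a /metrics/json payload (regex-based). */
  def streamingMetricNames(json: String): List[String] =
    StreamingKey.findAllMatchIn(json).map(_.group(1)).toList

  /** Fetches the payload served by the HTTP sink on the driver's UI port. */
  def fetch(driverHost: String = "localhost", uiPort: Int = 4040): String = {
    val source = Source.fromURL(s"http://$driverHost:$uiPort/metrics/json")
    try source.mkString finally source.close()
  }
}
```

Against a running driver, StreamingGauges.streamingMetricNames(StreamingGauges.fetch()).foreach(println) would print the gauge names for every query started after the flag was set.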
As we have seen through previous Structured Streaming examples, the call to start a streaming query produces a StreamingQuery result. Let’s zoom in on the weatherEventsMovingAverage from Example 13-1:
```scala
val query = scoredStream.writeStream
  .format("memory")
  .queryName("memory_predictions")
  .start()

query: org.apache.spark.sql.streaming.StreamingQuery =
  org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7875ee2b
```
The result we obtain from that call in the query value is a StreamingQuery instance. A StreamingQuery is a handler to the actual streaming query that is running continuously in the engine. This handler contains methods to inspect the execution of the query and control its life cycle. Some interesting methods are:
query.awaitTermination()
Blocks the current thread until the query ends, either because it’s stopped or because it encountered an error. This method is useful to block the main thread and prevent it from terminating early.
query.stop()
Stops the execution of the query.
query.exception
Retrieves any fatal exception encountered by the execution of the query. This method returns None when the query is operating normally. After a query stops, inspecting this value tells us whether it failed and for what reason.
query.status
Shows a brief snapshot of what the query is currently doing.
For example, retrieving the query.status of a running query shows a result similar to this:

```
query.status
res: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Processing new data",
  "isDataAvailable" : true,
  "isTriggerActive" : false
}
```
Even though the status information is not very revealing when everything is working correctly, it can be useful when developing a new job. The call to writeStream.start() returns immediately and is silent when an error occurs during the query’s execution. Consulting query.status might reveal that there is a problem, in which case query.exception will return the cause.
In Example 14-1, we used an incorrect schema as input for a Kafka sink. If we recall from “The Kafka Sink”, a Kafka sink requires a mandatory value field in the output stream (the key field is optional). In this case, query.status provided relevant feedback to solve that issue.
Example 14-1. query.status shows the reason for a stream failure

```
res: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Terminated with exception: Required attribute 'value' not found",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
```
The methods in StreamingQueryStatus
are thread-safe, meaning that they can be called concurrently from another thread without risking corruption of the query state.
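To tie these methods together, here is a sketch of a hypothetical supervise helper. It keeps the main thread alive with the timed variant awaitTermination(timeoutMs), which returns false while the query is still running, prints query.status at each timeout, and finally reports query.exception. Because awaitTermination rethrows a StreamingQueryException when the query fails, the sketch catches it and reads the cause from query.exception instead.

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

object QuerySupervisor {
  /** Blocks until `query` ends, printing its status every `pollMs` milliseconds.
    * Returns the error message if the query terminated with a failure. */
  def supervise(query: StreamingQuery, pollMs: Long = 10000L): Option[String] = {
    try {
      // awaitTermination(timeoutMs) returns false while the query is still running
      while (!query.awaitTermination(pollMs)) {
        val status = query.status
        println(s"[${query.name}] ${status.message} " +
          s"(dataAvailable=${status.isDataAvailable}, triggerActive=${status.isTriggerActive})")
      }
    } catch {
      // the failure is rethrown here, but it is also retained in query.exception
      case _: StreamingQueryException => ()
    }
    query.exception.map(_.getMessage)
  }
}
```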
For the purpose of monitoring, we are more interested in a set of methods that provide insights into the query execution metrics. The StreamingQuery handler offers two such methods:
query.lastProgress
Retrieves the most recent StreamingQueryProgress report.
query.recentProgress
Retrieves an array of the most recent StreamingQueryProgress reports. The maximum number of progress objects retrieved can be set using the configuration parameter spark.sql.streaming.numRecentProgressUpdates in the SparkSession. If you do not set this configuration, it defaults to the last 100 reports.
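A small sketch of how these two methods might be consumed from a running query follows. Two details matter: lastProgress returns null until the first trigger completes, so the sketch wraps it in an Option, and recentProgress is bounded by spark.sql.streaming.numRecentProgressUpdates. ProgressSummary and its methods are hypothetical names.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object ProgressSummary {
  /** Batch id of the latest report; lastProgress is null before the first trigger. */
  def lastBatchId(query: StreamingQuery): Option[Long] =
    Option(query.lastProgress).map(_.batchId)

  /** Mean (input, processed) rows per second over the retained reports,
    * skipping triggers that received no data. */
  def meanRates(query: StreamingQuery): Option[(Double, Double)] = {
    val reports = query.recentProgress.filter(_.numInputRows > 0)
    if (reports.isEmpty) None
    else {
      val n = reports.length.toDouble
      Some((reports.map(_.inputRowsPerSecond).sum / n,
            reports.map(_.processedRowsPerSecond).sum / n))
    }
  }
}
```

To keep a shorter history, one could call spark.conf.set("spark.sql.streaming.numRecentProgressUpdates", "20") before starting the query.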
As Example 14-2 shows, each StreamingQueryProgress instance offers a comprehensive snapshot of the query performance produced at each trigger.
Example 14-2. StreamingQueryProgress sample

```json
{
  "id" : "639503f1-b6d0-49a5-89f2-402eb262ad26",
  "runId" : "85d6c7d8-0d93-4cc0-bf3c-b84a4eda8b12",
  "name" : "memory_predictions",
  "timestamp" : "2018-08-19T14:40:10.033Z",
  "batchId" : 34,
  "numInputRows" : 37,
  "inputRowsPerSecond" : 500.0,
  "processedRowsPerSecond" : 627.1186440677966,
  "durationMs" : {
    "addBatch" : 31,
    "getBatch" : 3,
    "getOffset" : 1,
    "queryPlanning" : 14,
    "triggerExecution" : 59,
    "walCommit" : 10
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[sensor-office-src]]",
    "startOffset" : {
      "sensor-office-src" : {
        "0" : 606580
      }
    },
    "endOffset" : {
      "sensor-office-src" : {
        "0" : 606617
      }
    },
    "numInputRows" : 37,
    "inputRowsPerSecond" : 500.0,
    "processedRowsPerSecond" : 627.1186440677966
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
```
From the perspective of monitoring the job’s performance, we are particularly interested in numInputRows, inputRowsPerSecond, and processedRowsPerSecond. These self-describing fields provide key indicators about the job performance. If we have more data than our query can process, inputRowsPerSecond will be higher than processedRowsPerSecond for sustained periods of time. This might indicate that the cluster resources allocated for this job should be increased to reach a sustainable long-term performance.
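The sustained-imbalance check described above can be expressed as a pure function. This is a sketch under assumptions: RateSample is a hypothetical stand-in for the two rate fields of StreamingQueryProgress, and the window length (how many consecutive reports count as "sustained") is a tuning choice, not a Spark setting.

```scala
object BacklogDetector {
  /** Hypothetical stand-in for the rate fields of one progress report. */
  final case class RateSample(inputRowsPerSecond: Double, processedRowsPerSecond: Double)

  /** True when each of the last `window` samples shows input outpacing processing. */
  def isFallingBehind(samples: Seq[RateSample], window: Int): Boolean = {
    require(window > 0, "window must be positive")
    samples.length >= window &&
      samples.takeRight(window).forall(s => s.inputRowsPerSecond > s.processedRowsPerSecond)
  }
}
```

Samples could be built from a running query with query.recentProgress.map(p => BacklogDetector.RateSample(p.inputRowsPerSecond, p.processedRowsPerSecond)). Requiring several consecutive reports, rather than one, avoids flagging a single slow trigger as a capacity problem.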
Monitoring is a “day 2 operations” concern, and we require an automated collection of performance metrics to enable other processes such as capacity management, alerting, and operational support.
The inspection methods made available by the StreamingQuery handler that we saw in the previous section are useful when we work in an interactive environment such as the Spark shell or a notebook, like the ones we use in the exercises of this book. In an interactive setting, we have the opportunity to manually sample the output of the StreamingQueryProgress to get an initial idea about the performance characteristics of our job. Yet, the StreamingQuery methods are not automation friendly. Given that a new progress record becomes available at each streaming trigger, any automated collection from this interface would need to be coupled to the internal scheduling of the streaming job.
Luckily, Structured Streaming provides the StreamingQueryListener, a listener-based interface that provides asynchronous callbacks to report updates in the life cycle of a streaming job. To hook up to the internal event bus, we must provide an implementation of the StreamingQueryListener interface and register it with the running SparkSession.
StreamingQueryListener consists of three methods:
onQueryStarted(event: QueryStartedEvent)
Called when a streaming query starts. The event provides a unique id for the query and a runId that changes if the query is stopped and restarted. This callback is called synchronously with the start of the query and must not block.
onQueryTerminated(event: QueryTerminatedEvent)
Called when a streaming query is stopped. The event contains id and runId fields that correlate with the start event. It also provides an optional exception field that contains the error message if the query failed due to an error.
onQueryProgress(event: QueryProgressEvent)
Called at each query trigger. The event contains a progress field that encapsulates a StreamingQueryProgress instance that we know already from “Getting Metrics with StreamingQueryProgress”. This callback provides us with the events that we need to monitor the query performance.
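As a sketch of all three callbacks working together, the hypothetical LifecycleListener below logs starts and terminations (including the error message carried by QueryTerminatedEvent.exception) and records the last completed batch per runId in a concurrent map that other threads, such as a health check, could read. The callbacks run on Spark’s listener bus, so they only do cheap, non-blocking work.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class LifecycleListener extends StreamingQueryListener {
  // runId -> id of the last completed batch; safe to read from other threads
  val lastBatch = new ConcurrentHashMap[UUID, java.lang.Long]()

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: id=${event.id} runId=${event.runId} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    lastBatch.put(event.progress.runId, event.progress.batchId)

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    lastBatch.remove(event.runId)
    event.exception match {
      case Some(message) => println(s"failed: id=${event.id} error=$message")
      case None          => println(s"stopped: id=${event.id}")
    }
  }
}
```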
Example 14-3 illustrates the implementation of a simplified version of such a listener. This chartListener, when instantiated from a notebook, plots the input and processing rates per second.
```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Metric (a chart data point) and chart (a notebook plotting widget) are assumed
// to be defined elsewhere in the notebook; they are not shown here.
val chartListener = new StreamingQueryListener() {
  val MaxDataPoints = 100
  // a mutable reference to an immutable container to buffer n data points
  var data: List[Metric] = Nil

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val queryProgress = event.progress
    // ignore zero-valued events
    if (queryProgress.numInputRows > 0) {
      val time = queryProgress.timestamp
      val input = Metric("in", time, queryProgress.inputRowsPerSecond)
      val processed = Metric("proc", time, queryProgress.processedRowsPerSecond)
      // prepend the newest points and keep at most MaxDataPoints entries
      data = (input :: processed :: data).take(MaxDataPoints)
      chart.applyOn(data)
    }
  }
}
```
After a listener instance has been defined, it must be attached to the event bus using the addListener method of the StreamingQueryManager, which is accessible through sparkSession.streams:

```scala
sparkSession.streams.addListener(chartListener)
```
After running this chartListener against one of the notebooks included in this book’s online resources, we can visualize the input and processing rates, as shown in Figure 14-1.
Similar listener implementations can be used to send metric reports to popular monitoring systems, such as Prometheus, Graphite, or queryable databases like InfluxDB, which can be easily integrated with dashboard applications such as Grafana.
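As an illustration of that integration path, the sketch below formats progress rates in Graphite’s plaintext protocol (<path> <value> <epoch-seconds>, conventionally received on port 2003) and sends them over a TCP socket. GraphiteReporter, the path prefix, and the metric names are hypothetical; a production reporter would also batch, retry, and avoid blocking the listener callback on network I/O.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets
import java.time.Instant

object GraphiteReporter {
  /** One line of Graphite's plaintext protocol: "<path> <value> <epochSeconds>". */
  def line(path: String, value: Double, timestampIso: String): String =
    s"$path $value ${Instant.parse(timestampIso).getEpochSecond}"

  /** Lines for the two rates of one progress report, under a hypothetical prefix. */
  def rateLines(prefix: String, queryName: String, timestampIso: String,
                inputRowsPerSecond: Double, processedRowsPerSecond: Double): Seq[String] =
    Seq(
      line(s"$prefix.$queryName.inputRowsPerSecond", inputRowsPerSecond, timestampIso),
      line(s"$prefix.$queryName.processedRowsPerSecond", processedRowsPerSecond, timestampIso))

  /** Sends the lines to a Graphite (carbon) plaintext listener. */
  def send(host: String, lines: Seq[String], port: Int = 2003): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
      lines.foreach(l => out.print(l + "\n"))
      out.flush()
    } finally socket.close()
  }
}
```

Inside an onQueryProgress callback, the lines for a report p could be built with GraphiteReporter.rateLines("spark", p.name, p.timestamp, p.inputRowsPerSecond, p.processedRowsPerSecond); since p.name is null for unnamed queries, p.id.toString may be a safer path segment.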