Just like the other components of Spark, Spark Streaming is scalable and fault-tolerant; the difference is that it manages a continuous stream of data rather than the static datasets that Spark generally processes. Spark Streaming's approach to streaming is unique in that it accumulates the incoming stream into small, time-bound batches and processes them as mini-batches, an approach usually called micro-batching. The component that receives the stream of data and splits it into these time-bound batches is called the receiver.
Once these batches are received, Spark converts them into RDDs and processes those RDDs in the same way as static datasets; the regular framework components, such as the driver and the executors, stay the same. In Spark Streaming terms, a DStream, or discretized stream, is just a continuous stream of RDDs. Also, just as SQLContext serves as the entry point for using SQL in Spark, StreamingContext serves as the entry point for Spark Streaming.
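To make the micro-batch model concrete, here is a minimal, self-contained sketch (not part of this recipe) in which text lines arriving on a socket are collected into 5-second batches and each batch is processed as an ordinary RDD. The socket source, host, and port are assumptions purely for illustration:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Assumes an existing SparkContext `sc` and a process writing text to
    // localhost:9999 (for example, `nc -lk 9999`)
    val ssc = new StreamingContext(sc, Seconds(5))      // 5-second batch window
    val lines = ssc.socketTextStream("localhost", 9999) // DStream[String]
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()   // runs once per micro-batch
    ssc.start()
    ssc.awaitTermination()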
In this recipe, we will subscribe to a Twitter stream and index (store) the tweets into ElasticSearch (https://www.elastic.co/).
The prerequisite for running this recipe is a running ElasticSearch instance on your machine, which can be started from the ElasticSearch installation directory with:

    bin/elasticsearch

This recipe uses version 1.7.1, the latest at the time of writing. We will be using the consumer key, consumer secret, access token, and access token secret in our application.
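How you supply these four credentials is up to you; as one possible approach (an assumption, not part of the recipe), they could be read from environment variables so they stay out of source control. The variable names below are hypothetical:

    val consumerKey       = sys.env("TWITTER_CONSUMER_KEY")
    val consumerSecret    = sys.env("TWITTER_CONSUMER_SECRET")
    val accessToken       = sys.env("TWITTER_ACCESS_TOKEN")
    val accessTokenSecret = sys.env("TWITTER_ACCESS_TOKEN_SECRET")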
We will need the spark-streaming and spark-streaming-twitter libraries:

    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion
We will use TwitterUtils.createStream for this. TwitterUtils wraps the twitter4j library (http://twitter4j.org/en/index.html) to provide first-class Twitter support in Spark. TwitterUtils.createStream expects a few parameters. Let's construct them one by one.
StreamingContext: A StreamingContext can be constructed by passing in the SparkContext and the time window of each batch:

    val streamingContext = new StreamingContext(sc, Seconds(5))
OAuth credentials: These need to be passed in order to subscribe to the Twitter stream:

    val builder = new ConfigurationBuilder()
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .setUseSSL(true)
    val twitterAuth = Some(new OAuthAuthorization(builder.build()))
Filter keywords: The stream can be restricted to tweets that match a list of keywords:

    val filter = List("fashion", "tech", "startup", "spark")
StorageLevel: This specifies where the received batches of objects should be stored. The default is memory, with the ability to overflow to disk. Once these parameters are constructed, we can create the Twitter stream itself:

    val stream = TwitterUtils.createStream(streamingContext, twitterAuth, filter, StorageLevel.MEMORY_AND_DISK)
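Before wiring up ElasticSearch, a quick sanity check (not part of the recipe itself) is to print the hashtags seen in each batch, confirming that tweets are actually flowing:

    // Each 5-second batch prints the first few hashtags it contains
    val hashTags = stream.flatMap(status => status.getHashtagEntities().map(_.getText()))
    hashTags.print()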
ElasticSearch-Spark dependency: Let's add the appropriate version of elasticsearch-spark to our build.sbt:

    "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0"
It is elasticsearch-spark that makes ElasticSearch a first-class citizen in the Spark world. The org.elasticsearch.spark package exposes convenient functions that convert a case class to JSON (deriving the types) and index it into ElasticSearch. The package also provides some really useful implicits that add functions to save an RDD into ElasticSearch and to load data from ElasticSearch as an RDD; we'll be looking at those functions shortly. The ElasticSearch target node URL can be specified in the Spark configuration. By default, it points to localhost on port 9200. If required, we can customize it:
    // Default is localhost. Point to the ES node when required
    val conf = new SparkConf()
      .setAppName("TwitterStreaming")
      .setMaster("local[2]")
      .set(ConfigurationOptions.ES_NODES, "localhost")
      .set(ConfigurationOptions.ES_PORT, "9200")
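As a minimal illustration of those implicits (using a hypothetical spark/docs index purely for this sketch), saveToEs writes an RDD out, and sc.esRDD reads an index back as an RDD of (id, field map) pairs:

    import org.elasticsearch.spark._

    case class Doc(title: String, tags: Array[String])

    // Index a tiny RDD of case classes; field names and types are derived
    sc.makeRDD(Seq(Doc("hello", Array("greeting")))).saveToEs("spark/docs")

    // Read it back: RDD[(String, Map[String, AnyRef])]
    val docs = sc.esRDD("spark/docs")
    docs.collect().foreach(println)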
Since the stream is a DStream of twitter4j.Status, stream.foreachRDD will let us iterate over each underlying RDD[Status]. However, in this recipe, we will be extracting some data from twitter4j.Status and pushing only that to ElasticSearch. For this purpose, a case class SimpleStatus is created. The reason we extract the data into a case class is that twitter4j.Status carries far more information than we want to index:

    case class SimpleStatus(id: String, content: String, date: Date,
      hashTags: Array[String] = Array[String](),
      urls: Array[String] = Array[String](),
      user: String, userName: String, userFollowerCount: Long)
The twitter4j.Status is converted to a SimpleStatus using a convertToSimple function that extracts only the required information:

    def convertToSimple(status: twitter4j.Status): SimpleStatus = {
      val hashTags: Array[String] = status.getHashtagEntities().map(eachHT => eachHT.getText())
      val urlArray =
        if (status.getURLEntities != null)
          status.getURLEntities().foldLeft(Array[String]())((r, c) => r :+ c.getExpandedURL())
        else Array[String]()
      val user = status.getUser()
      // dateTimeZone is a Joda-Time DateTimeZone (for example, DateTimeZone.getDefault);
      // the tweet's creation time is normalized to UTC before indexing
      val utcDate = new Date(dateTimeZone.convertLocalToUTC(status.getCreatedAt.getTime, false))
      SimpleStatus(id = status.getId.toString, content = status.getText(), date = utcDate,
        hashTags = hashTags, urls = urlArray, user = user.getScreenName(),
        userName = user.getName, userFollowerCount = user.getFollowersCount)
    }
Once we map each twitter4j.Status to a SimpleStatus, we have a DStream[SimpleStatus]. We can now iterate over its underlying RDD[SimpleStatus] instances and push every RDD to ElasticSearch's "spark" index, with "twstatus" as the index type. In RDBMS terms, an index is like a database schema and the index type is like a table:

    stream.map(convertToSimple).foreachRDD { statusRdd =>
      println(statusRdd)
      statusRdd.saveToEs("spark/twstatus")
    }
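Note that nothing is received or indexed until the streaming context is started. A minimal run loop, assuming we want to keep streaming until the application is stopped, looks like this:

    streamingContext.start()             // begins receiving and processing batches
    streamingContext.awaitTermination()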
We can confirm the indexing by pointing to ElasticSearch's spark index using Sense, a must-have Chrome plugin for ElasticSearch, or simply by performing a curl request:

    curl -XGET "http://localhost:9200/spark/_search" -d'
    {
      "query": {
        "match_all": {}
      }
    }'
The Sense plugin for Chrome can be downloaded from the Chrome store at: https://chrome.google.com/webstore/detail/sense-beta/lhjgkmllcaadmopgmanpapmpjgmfcfig?hl=en.