Using Spark Streaming to subscribe to a Twitter stream

Just like all the other components of Spark, Spark Streaming is scalable and fault-tolerant; the difference is that it manages a continuous stream of data rather than the static datasets that Spark generally processes. Spark Streaming's approach to streaming is unique in that it accumulates the incoming stream into small, time-bound batches and processes them as mini-batches, an approach usually called micro-batching. The component that receives the stream of data and splits it into these time-bound batches is called the receiver.

Once these batches are received, Spark converts them into RDDs and processes them in the same way as static datasets. The regular framework components, such as the driver and the executors, stay the same. In Spark Streaming terms, a DStream, or Discretized Stream, is simply a continuous sequence of these RDDs. And just as SQLContext serves as the entry point for using SQL in Spark, StreamingContext serves as the entry point for Spark Streaming.
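To see how these pieces fit together before we get to Twitter, here is a minimal sketch of a Spark Streaming program. The socket source on localhost:9999 is purely illustrative (it stands in for the Twitter receiver we build below); the 5-second batch interval matches the one used later in this recipe:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumes an existing SparkContext `sc` (for example, the one provided by the Spark shell).
// The StreamingContext is the entry point: it takes the SparkContext and a batch interval.
val ssc = new StreamingContext(sc, Seconds(5))

// A DStream created from a receiver; every 5 seconds the received data becomes one RDD.
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD(rdd => println(s"Received ${rdd.count()} lines in this batch"))

// Processing starts only when the context is started; awaitTermination keeps the driver alive.
ssc.start()
ssc.awaitTermination()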

In this recipe, we will subscribe to a Twitter stream and index (store) the tweets into ElasticSearch (https://www.elastic.co/).

How to do it...

The prerequisite to run this recipe is to have a running ElasticSearch instance on your machine.

  1. Running ElasticSearch: Running an instance of ElasticSearch is as simple as it gets. Just download the distribution from https://www.elastic.co/downloads/elasticsearch and run bin/elasticsearch. This recipe uses version 1.7.1, the latest at the time of writing.
  2. Creating a Twitter app: In order to subscribe to tweets, Twitter requires us to create a Twitter app. Let's quickly set up a Twitter app in order to get the consumer key and the consumer secret. Log in at https://apps.twitter.com/ and click Create New App.

    We will be using the consumer key, the consumer secret, the access token, and the access token secret in our application.

  3. Adding Spark Streaming and the Twitter dependency: There are two dependencies that need to be added here, the spark-streaming and the spark-streaming-twitter libraries:
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion
  4. Creating a Twitter stream: Creating a Twitter stream is super easy in Spark. We just need to use TwitterUtils.createStream for this. TwitterUtils wraps around the twitter4j library (http://twitter4j.org/en/index.html) to provide first-class support in Spark.

    TwitterUtils.createStream expects a few parameters. Let's construct them one by one.

    • StreamingContext: The StreamingContext can be constructed by passing in the SparkContext and the batch interval:
        val streamingContext = new StreamingContext(sc, Seconds(5))
    • OAuthAuthorization: The consumer and access keys that make up the OAuth credentials need to be passed in order to subscribe to the Twitter stream:
        val builder = new ConfigurationBuilder()
          .setOAuthConsumerKey(consumerKey)
          .setOAuthConsumerSecret(consumerSecret)
          .setOAuthAccessToken(accessToken)
          .setOAuthAccessTokenSecret(accessTokenSecret)
          .setUseSSL(true)

        val twitterAuth = Some(new OAuthAuthorization(builder.build()))
    • Filter criteria: You are free to skip this parameter if your intention is to subscribe to (a sample of) the universe of the tweets. For this recipe, we'll add some filter criteria to it:
          val filter=List("fashion", "tech", "startup", "spark")
    • StorageLevel: This specifies where the received batches of objects are stored; the default is memory, with the ability to spill over to disk. Once this is constructed, let's construct the Twitter stream itself:
        val stream = TwitterUtils.createStream(streamingContext, twitterAuth, filter, StorageLevel.MEMORY_AND_DISK)
  5. Saving the stream to ElasticSearch: Writing the tweets to ElasticSearch involves three steps:
    1. Adding the ElasticSearch-Spark dependency: Let's add the appropriate version of ElasticSearch Spark to our build.sbt:
      "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0"
    2. Configuring the ElasticSearch server location in the Spark configuration: ElasticSearch has a subproject called elasticsearch-spark that makes ElasticSearch a first-class citizen in the Spark world. The org.elasticsearch.spark package exposes some convenient functions that convert a case class to JSON (deriving the types) and index it into ElasticSearch. The package also provides some really cool implicits that let us save an RDD to ElasticSearch and load data from ElasticSearch as an RDD. We'll be looking at those functions shortly.

      The ElasticSearch target node URL can be specified in the Spark configuration. By default, it points to localhost and port 9200. If required, we can customize it:

      //Default is localhost. Point to ES node when required
      val conf = new SparkConf()
        .setAppName("TwitterStreaming")
        .setMaster("local[2]")
        .set(ConfigurationOptions.ES_NODES, "localhost")
        .set(ConfigurationOptions.ES_PORT, "9200")
    3. Converting the stream into a case class: If we are not interested in pushing the data to ElasticSearch and only want to print some values from twitter4j.Status, stream.foreachRDD lets us iterate through each RDD[Status]. However, in this recipe, we will be extracting some data from twitter4j.Status and pushing it to ElasticSearch. For this purpose, a case class SimpleStatus is created. The reason we extract the data into a case class is that twitter4j.Status holds far more information than we want to index:
      case class SimpleStatus(id: String, content: String, date: Date,
                              hashTags: Array[String] = Array[String](),
                              urls: Array[String] = Array[String](),
                              user: String, userName: String, userFollowerCount: Long)

The twitter4j.Status is converted to SimpleStatus using a convertToSimple function that extracts only the required information:

def convertToSimple(status: twitter4j.Status): SimpleStatus = {
  // Extract just the hashtag texts and the expanded URLs from the tweet
  val hashTags: Array[String] = status.getHashtagEntities().map(eachHT => eachHT.getText())
  val urlArray =
    if (status.getURLEntities != null) status.getURLEntities().foldLeft(Array[String]())((r, c) => r :+ c.getExpandedURL())
    else Array[String]()
  val user = status.getUser()
  // dateTimeZone is a Joda-Time DateTimeZone defined elsewhere in the program; it normalizes the tweet's creation time to UTC
  val utcDate = new Date(dateTimeZone.convertLocalToUTC(status.getCreatedAt.getTime, false))

  SimpleStatus(id = status.getId.toString, content = status.getText(), date = utcDate,
    hashTags = hashTags, urls = urlArray,
    user = user.getScreenName(), userName = user.getName, userFollowerCount = user.getFollowersCount)
}

Once we map each twitter4j.Status to SimpleStatus, we have a DStream[SimpleStatus]. We can now iterate over each RDD[SimpleStatus] in the stream and push it to ElasticSearch's "spark" index, using "twstatus" as the index type. In RDBMS terms, an index is like a database schema and the index type is like a table:

stream.map(convertToSimple).foreachRDD { statusRdd =>
    println(statusRdd)
    statusRdd.saveToEs("spark/twstatus")
}
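Note that nothing is actually received or indexed until the StreamingContext is started. As a minimal sketch, the driver would typically end with the standard start/await calls on the streamingContext we created earlier:

// Start receiving and processing tweets, and keep the driver alive
streamingContext.start()
streamingContext.awaitTermination()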

We could confirm the indexing by pointing to ElasticSearch's spark index using Sense, a must-have Chrome plugin for ElasticSearch, or simply by performing a curl request:

curl -XGET "http://localhost:9200/spark/_search" -d'
{
    "query": {
        "match_all": {}
    }
}'

Note

The Sense plugin for Chrome can be downloaded from the Chrome store at: https://chrome.google.com/webstore/detail/sense-beta/lhjgkmllcaadmopgmanpapmpjgmfcfig?hl=en.
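As mentioned earlier, the org.elasticsearch.spark package also provides implicits for loading data from ElasticSearch back into Spark as an RDD. Here is a minimal sketch, assuming the spark/twstatus index we just populated and a SparkContext named sc built from the configuration above; each document comes back as a pair of its ID and a field map:

import org.elasticsearch.spark._

// Each element is (documentId, Map of field name -> value)
val tweetsRdd = sc.esRDD("spark/twstatus")

tweetsRdd.take(10).foreach { case (docId, fields) =>
  println(s"$docId -> ${fields.get("content")}")
}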
