How to do it...

  1. Create a Twitter account if you do not already have one.
  2. Go to
  3. Click on Create New App.
  4. Fill out the Name, Description, Website, and Callback URL fields, and then click on Create your Twitter Application. You will see a screen like this:
  5. You will reach the Application Management screen.
  6. Navigate to Keys and Access Tokens | Create my access token:
  7. Note down the four values on this screen; we will use them in step 14:
  • Consumer key (the API Key)
  • Consumer secret (the API Secret)
  • Access token
  • Access token secret
  8. We will fill in these values shortly; for now, download the third-party libraries needed from Maven Central:
$ wget
$ wget
$ wget
  9. Open the Spark shell, supplying the three libraries as dependencies: the two twitter4j artifacts through --packages and the Twitter DStream connector JAR through --jars:
$ spark-shell --packages org.twitter4j:twitter4j-core:4.0.5,org.twitter4j:twitter4j-stream:4.0.5 --jars dstream-twitter_2.11-0.1.0.jar
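You may later want to package this recipe as a standalone application rather than typing it into spark-shell. In that case, the same dependencies can be declared in an sbt build. The fragment below is a rough sketch that rests on several assumptions. The Scala version is inferred from the _2.11 suffix of the connector JAR. The Spark version is a placeholder that you should match to your cluster. The connector JAR is expected in the project's lib/ directory because its Maven coordinates are not given in this recipe.

```scala
// build.sbt (sketch): the Scala and Spark versions are assumptions, not taken from this recipe
scalaVersion := "2.11.12" // dstream-twitter_2.11-0.1.0.jar implies Scala 2.11

val sparkVersion = "2.1.0" // assumption: use the Spark 2.x version of your cluster

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.twitter4j"     % "twitter4j-core"   % "4.0.5",
  "org.twitter4j"     % "twitter4j-stream" % "4.0.5"
)
// dstream-twitter_2.11-0.1.0.jar is not declared above: sbt puts unmanaged JARs found in lib/ on the classpath
```

The "provided" scope keeps Spark itself out of the packaged JAR, since spark-submit supplies it at runtime.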
  10. Perform the Twitter-specific imports:
scala> import org.apache.spark.streaming.twitter._
scala> import twitter4j.auth._
scala> import twitter4j.conf._
  11. Perform the streaming-specific imports:
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
  12. Perform the remaining Spark imports:
scala> import org.apache.spark._
scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
  13. Create a StreamingContext with a 10-second batch interval:
scala> val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
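The batch interval controls how often Spark Streaming cuts the incoming tweets into a new batch, producing one RDD per interval. The plain-Scala sketch below is a simplified model and not Spark's actual scheduler. It assumes that an event belongs to the batch whose half-open window [k * interval, (k + 1) * interval) contains its arrival time, and it ignores details such as receiver block intervals. The names BatchModel, batchIndex, and toBatches are hypothetical.

```scala
object BatchModel {
  // Simplified model (assumption): an event arriving at tMillis belongs to the batch
  // whose half-open window [k * interval, (k + 1) * interval) contains tMillis.
  def batchIndex(tMillis: Long, intervalMillis: Long): Long = {
    require(intervalMillis > 0, "batch interval must be positive")
    tMillis / intervalMillis
  }

  // Group (arrivalTimeMillis, payload) pairs into batches, keyed by batch index,
  // keeping events in arrival order inside each batch.
  def toBatches[A](events: Seq[(Long, A)], intervalMillis: Long): Map[Long, Seq[A]] =
    events
      .groupBy { case (t, _) => batchIndex(t, intervalMillis) }
      .map { case (k, evs) => k -> evs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val events = Seq(1000L -> "a", 9999L -> "b", 10000L -> "c", 25000L -> "d")
    // With Seconds(10), i.e. 10000 ms, these events fall into batches 0, 0, 1, and 2.
    println(toBatches(events, 10000L))
  }
}
```

A shorter interval yields smaller, more frequent batches and lower latency, at the cost of more scheduling overhead per tweet.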
  14. Create a ConfigurationBuilder holding your OAuth credentials, and build an OAuthAuthorization from it:
scala> val cb = new ConfigurationBuilder
scala> cb.setDebugEnabled(true).setOAuthConsumerKey("FKNryYEKeCrKzGV7zuZW4EKeN")
scala> cb.setOAuthConsumerSecret("x6Y0zcTBOwVxpvekSCnGzbi3NYNrM5b8ZMZRIPI1XRC3pDyOs1")
scala> cb.setOAuthAccessToken("31548859-DHbESdk6YoghCLcfhMF88QEFDvEjxbM6Q90eoZTGl")
scala> cb.setOAuthAccessTokenSecret("wjcWPvtejZSbp9cgLejUdd6W1MJqFzm5lByUFZl1NYgrV")
scala> val auth = new OAuthAuthorization(cb.build)
These are sample values; replace them with your own.
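You may prefer not to paste secrets into the shell. One option is to read the four values from environment variables. The sketch below is an assumption-based illustration. The environment-variable names (TWITTER_CONSUMER_KEY and so on) and the object name TwitterCredentials are hypothetical. The target keys (twitter4j.oauth.consumerKey, twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken, twitter4j.oauth.accessTokenSecret) are the Twitter4J system-property names that Twitter4J's default configuration reads.

```scala
object TwitterCredentials {
  // Hypothetical environment-variable names, paired with Twitter4J's system-property keys.
  val EnvToProperty: Seq[(String, String)] = Seq(
    "TWITTER_CONSUMER_KEY"        -> "twitter4j.oauth.consumerKey",
    "TWITTER_CONSUMER_SECRET"     -> "twitter4j.oauth.consumerSecret",
    "TWITTER_ACCESS_TOKEN"        -> "twitter4j.oauth.accessToken",
    "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret"
  )

  // Returns Right(property -> value) when all four variables are set and non-blank,
  // otherwise Left(names of the missing or blank variables).
  def fromEnv(env: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    val missing = EnvToProperty.collect { case (e, _) if env.get(e).forall(_.trim.isEmpty) => e }
    if (missing.nonEmpty) Left(missing)
    else Right(EnvToProperty.map { case (e, p) => p -> env(e) }.toMap)
  }

  def main(args: Array[String]): Unit =
    fromEnv(sys.env) match {
      case Left(missing) =>
        System.err.println(s"Missing environment variables: ${missing.mkString(", ")}")
      case Right(props) =>
        // Copy the values into JVM system properties so Twitter4J's default configuration can see them.
        props.foreach { case (k, v) => System.setProperty(k, v) }
        println(s"Set ${props.size} twitter4j.oauth.* system properties")
    }
}
```

With the resulting map, you could call cb.setOAuthConsumerKey(props("twitter4j.oauth.consumerKey")) and similar setters instead of hard-coding strings. Spark's original TwitterUtils documents that passing None instead of Some(auth) falls back to Twitter4J's default OAuth configuration built from these system properties. Check that the dstream-twitter connector behaves the same way before relying on it.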
  15. Create the Twitter DStream (createStream takes the authorization wrapped in an Option):
scala> val tweets = TwitterUtils.createStream(ssc, Some(auth))
  16. Keep only the English tweets:
scala> val englishTweets = tweets.filter(_.getLang() == "en")
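  17. Extract the text of each tweet and print a sample from every batch (Spark Streaming will not start unless at least one output operation, such as print, is registered):
scala> val status = englishTweets.map(status => status.getText)
scala> status.print()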
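DStream transformations such as filter and map are applied independently to the RDD of every batch. The plain-Scala sketch below models that behavior without Spark by treating the stream as a sequence of batches. Tweet is a hypothetical stand-in for twitter4j.Status that keeps only the two fields this recipe uses. EnglishTextModel and englishText are illustrative names.

```scala
object EnglishTextModel {
  // Hypothetical stand-in for twitter4j.Status: only the language code and the text.
  final case class Tweet(lang: String, text: String)

  // Model a DStream as Seq[Seq[_]]: one inner Seq per batch interval.
  // filter and map run on each batch separately, mirroring DStream.filter and DStream.map.
  def englishText(batches: Seq[Seq[Tweet]]): Seq[Seq[String]] =
    batches.map(batch => batch.filter(_.lang == "en").map(_.text))

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(Tweet("en", "hello spark"), Tweet("fr", "bonjour")),
      Seq(Tweet("es", "hola")),
      Seq(Tweet("en", "streaming"), Tweet("en", "twitter"))
    )
    // Print each batch's surviving texts, as DStream.print does per batch (there, up to 10 elements).
    englishText(batches).zipWithIndex.foreach { case (texts, i) =>
      println(s"batch $i: ${texts.mkString(" | ")}")
    }
  }
}
```

A batch with no English tweets simply produces an empty result. It does not stop the stream, which matches how an empty RDD flows through a DStream.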
  18. Set the checkpoint directory:
scala> ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
  19. Start the StreamingContext:
scala> ssc.start
scala> ssc.awaitTermination
  20. You can also run all of these commands together in :paste mode:
scala> :paste
import org.apache.spark.streaming.twitter._
import twitter4j.auth._
import twitter4j.conf._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey("FKNryYEKeCrKzGV7zuZW4EKeN")
.setOAuthConsumerSecret("x6Y0zcTBOwVxpvekSCnGzbi3NYNrM5b8ZMZRIPI1XRC3pDyOs1")
.setOAuthAccessToken("31548859-DHbESdk6YoghCLcfhMF88QEFDvEjxbM6Q90eoZTGl")
.setOAuthAccessTokenSecret("wjcWPvtejZSbp9cgLejUdd6W1MJqFzm5lByUFZl1NYgrV")
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc,Some(auth))
val englishTweets = tweets.filter(_.getLang()=="en")
val status = englishTweets.map(status => status.getText)
