How to do it...

  1. Create a Twitter account if you do not already have one.
  2. Go to http://apps.twitter.com.
  3. Click on Create New App.
  4. Fill out the Name, Description, Website, and Callback URL fields and then click on Create your Twitter Application.
  5. You will reach the Application Management screen.
  6. Navigate to Keys and Access Tokens | Create my access Token.
  7. Note down the four values on this screen; we will use them in step 14:
  • Consumer key (the API Key)
  • Consumer secret (the API Secret)
  • Access token
  • Access token secret
  8. We will fill in these values shortly; for now, download the required third-party libraries from Maven Central:
$ wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.2.0/spark-streaming-twitter_2.10-1.2.0.jar
$ wget http://central.maven.org/maven2/org/twitter4j/twitter4j-stream/4.0.2/twitter4j-stream-4.0.2.jar
$ wget http://central.maven.org/maven2/org/twitter4j/twitter4j-core/4.0.2/twitter4j-core-4.0.2.jar
  9. Open the Spark shell, supplying the preceding three JARs as dependencies:
$ spark-shell --jars spark-streaming-twitter_2.10-1.2.0.jar,twitter4j-stream-4.0.2.jar,twitter4j-core-4.0.2.jar
  10. Perform the Twitter-specific imports:
scala> import org.apache.spark.streaming.twitter._
scala> import twitter4j.auth._
scala> import twitter4j.conf._
  11. Perform the streaming-specific imports:
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
  12. Perform the remaining necessary imports:
scala> import org.apache.spark._
scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
  13. Create StreamingContext with a 10-second batch interval:
scala> val ssc = new StreamingContext(sc, Seconds(10))
  14. Build the Twitter OAuth configuration from the four values you noted down earlier:
scala> val cb = new ConfigurationBuilder
scala> cb.setDebugEnabled(true)
.setOAuthConsumerKey("FKNryYEKeCrKzGV7zuZW4EKeN")
.setOAuthConsumerSecret("x6Y0zcTBOwVxpvekSCnGzbi3NYNrM5b8ZMZRIPI1XRC3pDyOs1")
.setOAuthAccessToken("31548859-DHbESdk6YoghCLcfhMF88QEFDvEjxbM6Q90eoZTGl")
.setOAuthAccessTokenSecret("wjcWPvtejZSbp9cgLejUdd6W1MJqFzm5lByUFZl1NYgrV")
scala> val auth = new OAuthAuthorization(cb.build)
These are sample values; substitute your own.
  15. Create Twitter's DStream (createStream expects the authorization wrapped in an Option):
scala> val tweets = TwitterUtils.createStream(ssc, Some(auth))
  16. Keep only the English tweets:
scala> val englishTweets = tweets.filter(_.getLang() == "en")
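  17. Get the text out of the tweets and print the first few elements of each batch (a streaming job needs at least one output operation before it can start):
scala> val status = englishTweets.map(status => status.getText)
scala> status.print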
  18. Set the checkpoint directory:
scala> ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
  19. Start StreamingContext and wait for it to terminate:
scala> ssc.start
scala> ssc.awaitTermination
  20. You can put all these commands together using :paste (press Ctrl + D to leave paste mode and run the code):
scala> :paste
import org.apache.spark.streaming.twitter._
import twitter4j.auth._
import twitter4j.conf._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey("FKNryYEKeCrKzGV7zuZW4EKeN")
.setOAuthConsumerSecret("x6Y0zcTBOwVxpvekSCnGzbi3NYNrM5b8ZMZRIPI1XRC3pDyOs1")
.setOAuthAccessToken("31548859-DHbESdk6YoghCLcfhMF88QEFDvEjxbM6Q90eoZTGl")
.setOAuthAccessTokenSecret("wjcWPvtejZSbp9cgLejUdd6W1MJqFzm5lByUFZl1NYgrV")
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc,Some(auth))
val englishTweets = tweets.filter(_.getLang()=="en")
val status = englishTweets.map(status => status.getText)
status.print
ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
ssc.start
ssc.awaitTermination
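
The recipe above types the four Twitter credentials directly into the shell. As an optional variation, the following minimal sketch reads them from environment variables instead, so that they do not end up in shell history or pasted scripts. The object name TwitterCredentials, the method fromEnv, and the four variable names are assumptions made for this illustration; they are not part of Spark or Twitter4J.

```scala
// Hypothetical helper, not part of Spark or Twitter4J.
object TwitterCredentials {
  // Environment variable names assumed for this sketch.
  val requiredKeys: Seq[String] = Seq(
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Returns Right(values) when all four variables are set and non-blank,
  // or Left(names of the missing variables) otherwise.
  def fromEnv(env: Map[String, String] = sys.env): Either[Seq[String], Map[String, String]] = {
    val missing = requiredKeys.filterNot(k => env.get(k).exists(_.trim.nonEmpty))
    if (missing.nonEmpty) Left(missing)
    else Right(requiredKeys.map(k => k -> env(k).trim).toMap)
  }
}

// Possible use in the Spark shell, after the imports and the
// ConfigurationBuilder (cb) from the recipe:
//
//   TwitterCredentials.fromEnv() match {
//     case Right(c) =>
//       cb.setOAuthConsumerKey(c("TWITTER_CONSUMER_KEY"))
//         .setOAuthConsumerSecret(c("TWITTER_CONSUMER_SECRET"))
//         .setOAuthAccessToken(c("TWITTER_ACCESS_TOKEN"))
//         .setOAuthAccessTokenSecret(c("TWITTER_ACCESS_TOKEN_SECRET"))
//     case Left(missing) =>
//       sys.error("Missing environment variables: " + missing.mkString(", "))
//   }
```

Returning an Either rather than throwing lets the caller report all missing variables at once before the StreamingContext is started.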