Twitter offers two APIs. The Search API essentially allows us to retrieve past tweets based on search terms; this is how we collected our data from Twitter in the previous chapters of the book. For our current purpose, Twitter also offers a live Streaming API, which allows us to ingest tweets as they are emitted in real time.
The following program connects to the Twitter firehose, excludes deleted or invalid tweets, and parses the remaining ones on the fly to extract only the relevant fields: the screen name, the tweet text, the retweet count, and the geo-location
information. The processed tweets are gathered into an RDD queue by Spark Streaming and then displayed on the console at one-second intervals:

"""
Twitter Streaming API Spark Streaming into an RDD-Queue to process tweets live
Create a queue of RDDs that will be mapped/reduced one at a time in
1 second intervals.

To run this example use
'$ bin/spark-submit examples/AN_Spark/AN_Spark_Code/s07_twitterstreaming.py'
"""
#
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import twitter
import dateutil.parser
import json

# Connecting Streaming Twitter with Streaming Spark via Queue
class Tweet(dict):
    def __init__(self, tweet_in):
        super(Tweet, self).__init__(self)
        if tweet_in and 'delete' not in tweet_in:
            self['timestamp'] = dateutil.parser.parse(tweet_in[u'created_at']
                                ).replace(tzinfo=None).isoformat()
            self['text'] = tweet_in['text'].encode('utf-8')
            #self['text'] = tweet_in['text']
            self['hashtags'] = [x['text'].encode('utf-8') for x in
                                tweet_in['entities']['hashtags']]
            #self['hashtags'] = [x['text'] for x in
            #                    tweet_in['entities']['hashtags']]
            self['geo'] = tweet_in['geo']['coordinates'] if tweet_in['geo'] \
                          else None
            self['id'] = tweet_in['id']
            self['screen_name'] = tweet_in['user']['screen_name'].encode('utf-8')
            #self['screen_name'] = tweet_in['user']['screen_name']
            self['user_id'] = tweet_in['user']['id']

def connect_twitter():
    twitter_stream = twitter.TwitterStream(auth=twitter.OAuth(
        token="get_your_own_credentials",
        token_secret="get_your_own_credentials",
        consumer_key="get_your_own_credentials",
        consumer_secret="get_your_own_credentials"))
    return twitter_stream

def get_next_tweet(twitter_stream):
    stream = twitter_stream.statuses.sample(block=True)
    tweet_in = None
    while not tweet_in or 'delete' in tweet_in:
        tweet_in = stream.next()
        tweet_parsed = Tweet(tweet_in)
    return json.dumps(tweet_parsed)

def process_rdd_queue(twitter_stream):
    # Create the queue through which RDDs can be pushed to
    # a QueueInputDStream
    rddQueue = []
    for i in range(3):
        rddQueue += [ssc.sparkContext.parallelize(
            [get_next_tweet(twitter_stream)], 5)]
    lines = ssc.queueStream(rddQueue)
    lines.pprint()

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc, 1)

    # Instantiate the twitter_stream
    twitter_stream = connect_twitter()
    # Get RDD queue of the streams json or parsed
    process_rdd_queue(twitter_stream)

    ssc.start()
    time.sleep(2)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
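The field-extraction logic of the Tweet class can be exercised in isolation, without Spark or a live Twitter connection. The following is a plain Python 3 sketch of that logic, using the standard library's datetime.strptime in place of dateutil so it has no external dependencies; the raw payload is a hypothetical, reduced reconstruction containing only the fields the parser reads, not a complete Streaming API record:

```python
from datetime import datetime

def parse_tweet(tweet_in):
    """Extract the same fields the Tweet class keeps; skip deletion notices."""
    if not tweet_in or 'delete' in tweet_in:
        return None
    return {
        'timestamp': datetime.strptime(tweet_in['created_at'],
                                       '%a %b %d %H:%M:%S %z %Y')
                             .replace(tzinfo=None).isoformat(),
        'text': tweet_in['text'],
        'hashtags': [h['text'] for h in tweet_in['entities']['hashtags']],
        'geo': tweet_in['geo']['coordinates'] if tweet_in['geo'] else None,
        'id': tweet_in['id'],
        'screen_name': tweet_in['user']['screen_name'],
        'user_id': tweet_in['user']['id'],
    }

# Hypothetical raw payload, reduced to the fields the parser reads
sample = {
    'created_at': 'Tue Nov 03 20:53:05 +0000 2015',
    'text': '#eminem https://t.co/GlEjPJnwxy',
    'entities': {'hashtags': [{'text': 'eminem'}]},
    'geo': None,
    'id': 661647307847409668,
    'user': {'screen_name': 'melly_boo_orig', 'id': 352673159},
}

parsed = parse_tweet(sample)
print(parsed)

# Deletion notices are filtered out, as in the Spark version
assert parse_tweet({'delete': {'status': {}}}) is None
```

This mirrors the transformation each tweet undergoes before json.dumps serializes it into the RDD queue.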
When we run this program, it delivers the following output:
an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6$ bin/spark-submit examples/AN_Spark/AN_Spark_Code/s07_twitterstreaming.py
-------------------------------------------
Time: 2015-11-03 21:53:14
-------------------------------------------
{"user_id": 3242732207, "screen_name": "cypuqygoducu", "timestamp": "2015-11-03T20:53:04", "hashtags": [], "text": "RT @VIralBuzzNewss: Our Distinctive Edition Holiday break Challenge Is In this article! Hooray!... - https://t.co/9d8wumrd5v https://t.co/u2026", "geo": null, "id": 661647303678259200}

-------------------------------------------
Time: 2015-11-03 21:53:15
-------------------------------------------
{"user_id": 352673159, "screen_name": "melly_boo_orig", "timestamp": "2015-11-03T20:53:05", "hashtags": ["eminem"], "text": "#eminem https://t.co/GlEjPJnwxy", "geo": null, "id": 661647307847409668}

-------------------------------------------
Time: 2015-11-03 21:53:16
-------------------------------------------
{"user_id": 500620889, "screen_name": "NBAtheist", "timestamp": "2015-11-03T20:53:06", "hashtags": ["tehInterwebbies", "Nutters"], "text": "See? That didn't take long or any actual effort. This is #tehInterwebbies ... #Nutters Abound! https://t.co/QS8gLStYFO", "geo": null, "id": 661647312062709761}
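Each record printed above is the JSON string produced by json.dumps in get_next_tweet, so any downstream processing starts by decoding it back into a dictionary. The snippet below shows that round trip on one of the sample records, using plain json.loads (no Spark is needed for the decoding step itself):

```python
import json

# One of the sample records printed above, as emitted by json.dumps
record = ('{"user_id": 352673159, "screen_name": "melly_boo_orig", '
          '"timestamp": "2015-11-03T20:53:05", "hashtags": ["eminem"], '
          '"text": "#eminem https://t.co/GlEjPJnwxy", "geo": null, '
          '"id": 661647307847409668}')

tweet = json.loads(record)      # decode the JSON string back into a dict
hashtags = tweet['hashtags']    # fields are now addressable again
print(tweet['screen_name'], hashtags)
```

In the Spark program itself, the same decoding would be expressed as a transformation on the DStream, for example lines.map(json.loads), before any per-field filtering or aggregation.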
We now have a working example of streaming tweets with Spark and processing them on the fly.