It's always good to have some practical examples with real data sets, and what could be better than real-time social media data? In this section, we will write code that fetches tweets from Twitter in real time, based on the search keywords provided, and indexes them into Elasticsearch. The code in this section has three dependencies:

- tweepy: a Python client for Twitter.
- elasticsearch: the Python client for Elasticsearch, which we have already installed.
- Twitter API credentials: a consumer key, consumer secret, access token, and access token secret generated for your Twitter application.

After generating the auth tokens and keys, store them inside config.py with the variable names consumer_key, consumer_secret, access_token, and access_token_secret. The next step is to install tweepy using this command:
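pip install "tweepy<4"
The listing below relies on tweepy's StreamListener class, which was removed in tweepy 4.0, so a 3.x release is required.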
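For reference, a config.py using the four variable names listed earlier could look like the following. The values are placeholders (an assumption for illustration only); replace them with the keys and tokens generated for your own Twitter application.

```python
# config.py: credentials imported by the streaming script.
# PLACEHOLDER values only: substitute the keys and tokens generated
# for your own Twitter application, and keep this file out of version control.
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
```

Because the script does `import config`, this file must sit in the same directory as the script (or somewhere on the Python path).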
Creating the mappings yourself is good hands-on practice. To do this, you first need to understand the structure of Twitter's JSON data. You can view a sample at the following URL and create mappings with the appropriate field types and analyzers accordingly: https://gist.github.com/bharvidixit/0d35b7ac907127860e58.
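One way to get an overview of a tweet's structure before writing mappings is to list every field path together with the type of its value. The sketch below does this with a hypothetical helper, field_paths, run on an abbreviated, made-up tweet (real tweets carry many more fields, as the linked sample shows). It only reports Python types; choosing the matching Elasticsearch field types and analyzers is still up to you.

```python
import json


def field_paths(doc, prefix=""):
    """Yield (dotted_path, python_type_name) for every field of a parsed JSON object.

    Nested objects are walked recursively. For a list whose first element is
    an object, that element's fields are listed under the list's path, since
    Elasticsearch has no separate array type and maps an array by the type
    of its elements.
    """
    for key, value in doc.items():
        path = f"{prefix}{key}"
        yield path, type(value).__name__
        if isinstance(value, dict):
            yield from field_paths(value, prefix=path + ".")
        elif isinstance(value, list) and value and isinstance(value[0], dict):
            yield from field_paths(value[0], prefix=path + ".")


# Hypothetical, heavily abbreviated tweet used only for illustration.
SAMPLE = json.loads("""
{
  "id_str": "123",
  "created_at": "Wed Oct 10 20:19:24 +0000 2018",
  "text": "Example tweet text",
  "retweet_count": 0,
  "user": {"screen_name": "example_user", "followers_count": 42},
  "entities": {"hashtags": [{"text": "example", "indices": [0, 8]}]}
}
""")

if __name__ == "__main__":
    for path, type_name in field_paths(SAMPLE):
        print(f"{path}: {type_name}")
```

A listing like this makes it easy to spot, for example, that created_at arrives as a string in Twitter's own date format, so its mapping needs a date type with a matching format rather than the default.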
Once the mapping is created, write the code that starts fetching tweets and indexes them into an index named twitter with the type tweet:
import json

from elasticsearch import Elasticsearch
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener

import config  # holds consumer_key, consumer_secret, access_token, access_token_secret

es = Elasticsearch('localhost:9200')


class StdOutListener(StreamListener):
    """A listener that handles tweets received from the stream.
    This listener prints each tweet and indexes it into Elasticsearch.
    """
    counter = 0
    total_docs_to_be_indexed = 10000

    def on_data(self, data):
        print(data)
        tweet = json.loads(data)
        # Skip stream control messages (e.g. delete or limit notices) that are not tweets
        if 'id_str' not in tweet:
            return True
        self.index_tweet(tweet)
        self.counter += 1
        # Returning False disconnects the stream once enough tweets are indexed
        return self.counter < self.total_docs_to_be_indexed

    def index_tweet(self, tweet):
        # Use the tweet's string ID as the document ID, so a repeated tweet overwrites itself
        es.index(index='twitter', doc_type='tweet', id=tweet['id_str'], body=tweet)

    def on_error(self, status):
        print(status)
        # 420 means the app is being rate limited; stop instead of reconnecting
        if status == 420:
            return False


# code execution starts here
if __name__ == '__main__':
    listener = StdOutListener()
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_token_secret)
    stream = Stream(auth, listener)
    # set the terms for tracking and fetching tweets from Twitter
    stream.filter(track=['crime', 'blast', 'earthquake', 'riot', 'politics'])
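The stream calls on_data once for every incoming message, so the limit on the number of indexed tweets belongs in the method's return value rather than in a loop inside it: returning False is what a tweepy 3.x StreamListener uses to ask the stream to disconnect. The sketch below isolates that control flow so it can be tried without Twitter or Elasticsearch. CappedTweetHandler and its index_fn parameter are hypothetical names; index_fn stands in for the es.index call.

```python
import json


class CappedTweetHandler:
    """Offline sketch of the listener's on_data logic (no tweepy, no Elasticsearch).

    index_fn is a hypothetical stand-in for es.index; it receives each parsed tweet.
    """

    def __init__(self, index_fn, total_docs_to_be_indexed=10000):
        self.index_fn = index_fn
        self.total_docs_to_be_indexed = total_docs_to_be_indexed
        self.counter = 0

    def on_data(self, data):
        message = json.loads(data)
        # Control messages such as {"delete": ...} or {"limit": ...} carry no tweet id
        if "id_str" not in message:
            return True
        self.index_fn(message)
        self.counter += 1
        # True keeps the stream open; False signals that the limit has been reached
        return self.counter < self.total_docs_to_be_indexed


if __name__ == "__main__":
    indexed = []
    handler = CappedTweetHandler(indexed.append, total_docs_to_be_indexed=2)
    messages = [
        '{"id_str": "1", "text": "first"}',
        '{"delete": {"status": {"id_str": "0"}}}',
        '{"id_str": "2", "text": "second"}',
    ]
    for raw in messages:
        keep_going = handler.on_data(raw)
        print(raw, "->", keep_going)
        if not keep_going:
            break
    print(len(indexed), "tweets indexed")
```

With a limit of 2, the delete notice is skipped without being counted, and the second tweet makes on_data return False, which is the point at which a real stream would disconnect.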