Before delving into database persistent storage such as MongoDB, we will look at two useful file storage formats that are widely used: CSV (short for comma-separated values) and JSON (short for JavaScript Object Notation). The enduring popularity of these two formats rests on a few key qualities: they are human readable, simple, relatively lightweight, and easy to use.
The CSV format stores records as delimited text columns, so every row follows the same inherent tabular schema.
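For instance, a few harvested tweets stored in CSV with the header fields we will use later in this section could look as follows (the values are dummy placeholders):

id,created_at,user_id,user_name,tweet_text,url
1,2015-05-30 07:00:00,42,jdoe,Hello Spark,http://www.example.com/spark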
Python offers a robust csv library that can serialize a csv file into a Python dictionary. For the purpose of our program, we have written a Python class that persists data in CSV format and reads from a given CSV file.

Let's run through the code of the IO_csv class. The __init__ section of the class basically instantiates the file path, the filename, and the file suffix (in this case, .csv):
import os
import csv
from collections import namedtuple

class IO_csv(object):

    def __init__(self, filepath, filename, filesuffix='csv'):
        self.filepath = filepath      # /path/to/file without the '/' at the end
        self.filename = filename      # FILE_NAME
        self.filesuffix = filesuffix
The save method of the class uses a Python named tuple and the header fields of the csv file in order to impart a schema while persisting the rows of the CSV. If the csv file already exists, it will be appended to and not overwritten; otherwise, it will be created:
def save(self, data, NTname, fields):
    # NTname = name of the named tuple
    # fields = header of the CSV - list of the field names
    NTuple = namedtuple(NTname, fields)

    if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
        # Append to the existing file
        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'ab') as f:
            writer = csv.writer(f)
            # writer.writerow(fields)  # header already written when the file was created
            # List comprehension mapping NTuple._make() over the data to be saved.
            # Notice writer.writerows and not writer.writerow
            # (that is, a list of multiple rows is sent to the csv file)
            writer.writerows([row for row in map(NTuple._make, data)])
    else:
        # Create a new file
        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'wb') as f:
            writer = csv.writer(f)
            writer.writerow(fields)  # fields = header of the CSV
            writer.writerows([row for row in map(NTuple._make, data)])
The load method of the class also uses a Python named tuple and the header fields of the csv file in order to retrieve the data using a consistent schema. The load method is a memory-efficient generator, to avoid loading a huge file in memory: hence, we use yield in place of return:
def load(self, NTname, fields):
    # NTname = name of the named tuple
    # fields = header of the CSV - list of the field names
    NTuple = namedtuple(NTname, fields)

    with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'rU') as f:
        reader = csv.reader(f)
        # Map NTuple._make() over the rows read from the file
        for row in map(NTuple._make, reader):
            yield row
Here's the named tuple. We are using it to parse the tweets in order to save them to or retrieve them from the csv file:
fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text', 'url']
Tweet01 = namedtuple('Tweet01', fields01)

def parse_tweet(data):
    """
    Parse a ``tweet`` from the given response data.
    """
    return Tweet01(
        id=data.get('id', None),
        created_at=data.get('created_at', None),
        user_id=data.get('user_id', None),
        user_name=data.get('user_name', None),
        tweet_text=data.get('tweet_text', None),
        url=data.get('url')
    )
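Putting the class to work, here is a minimal usage sketch; the file path and the sample record are hypothetical placeholders:

# Hypothetical sample record shaped like the data parsed by parse_tweet
sample = [{'id': '1', 'created_at': '2015-05-30 07:00:00', 'user_id': '42',
           'user_name': 'jdoe', 'tweet_text': 'Hello Spark',
           'url': 'http://www.example.com/spark'}]

csv_io = IO_csv('/home/an/data', 'twtr_sample')   # hypothetical path and filename
csv_io.save([parse_tweet(d) for d in sample], 'Tweet01', fields01)

# load() is a generator, so the rows are read lazily;
# note that the first row yielded is the CSV header itself
for tweet in csv_io.load('Tweet01', fields01):
    print(tweet)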
JSON is one of the most popular data formats for Internet-based applications. All the APIs we are dealing with (Twitter, GitHub, and Meetup) deliver their data in JSON format. The JSON format is relatively lightweight compared to XML, is human readable, and carries its schema embedded within the records themselves. As opposed to the CSV format, where all records follow exactly the same tabular structure, JSON records can vary in their structure: JSON is semi-structured. A JSON record maps naturally onto a Python dictionary of dictionaries.
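As a quick illustration of that mapping, here is a minimal sketch with a made-up JSON record:

import json

record = '{"id": 1, "user": {"id": 42, "name": "jdoe"}, "text": "Hello Spark"}'
tweet = json.loads(record)       # the JSON record becomes a dict of dicts
print(tweet['user']['name'])     # prints: jdoe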
Let's run through the code of the IO_json class. The __init__ section of the class basically instantiates the file path, the filename, and the file suffix (in this case, .json):
import os
import io
import json

class IO_json(object):

    def __init__(self, filepath, filename, filesuffix='json'):
        self.filepath = filepath      # /path/to/file without the '/' at the end
        self.filename = filename      # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))
The save method of the class uses utf-8 encoding in order to ensure read and write compatibility of the data. If the JSON file already exists, it will be appended to and not overwritten; otherwise, it will be created:
def save(self, data):
    if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
        # Append to the existing file
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
            f.write(unicode(json.dumps(data, ensure_ascii=False)))
            # In Python 3, there is no "unicode" function; write instead:
            # f.write(json.dumps(data, ensure_ascii=False))
            # json.dumps creates a \" escape char for " in the saved file
    else:
        # Create a new file
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
            f.write(unicode(json.dumps(data, ensure_ascii=False)))
            # f.write(json.dumps(data, ensure_ascii=False))
The load method of the class just returns the content of the file that has been read. A further json.loads function needs to be applied in order to retrieve the json object out of the content read:
def load(self):
    with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
        return f.read()
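A minimal round trip with the IO_json class could then look like this sketch; the path, filename, and record are hypothetical placeholders:

import json

json_io = IO_json('/home/an/data', 'twtr_sample')   # hypothetical path and filename
json_io.save({'id': 1, 'text': 'Hello Spark'})

raw = json_io.load()       # returns the raw file content as a string
data = json.loads(raw)     # json.loads retrieves the Python object
print(data['text'])        # prints: Hello Spark

# Caveat: after several appends the file holds concatenated JSON documents,
# and a single json.loads over the whole content will then fail.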
It is crucial to store the harvested information. We therefore set up MongoDB as our main document data store. Since all the information collected is in JSON format and MongoDB stores its information in BSON (short for Binary JSON), it is a natural choice.
We will now run through the steps of installing MongoDB, starting and checking the server, and running the mongo client.

In order to install the MongoDB package, perform the following steps:

1. Import the public key used by the package management system (apt). To import the MongoDB public key, we issue the following command:

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10

2. Create a list file for MongoDB. To do so, we issue the following command:

echo "deb http://repo.mongodb.org/apt/ubuntu "$(lsb_release -sc)"/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list

3. Update the local package database as sudo:

sudo apt-get update

4. Install the MongoDB packages:

sudo apt-get install -y mongodb-org
Let's start the MongoDB server:

1. To start the MongoDB server daemon, mongod, we issue the following command:

sudo service mongodb start

2. To check whether mongod has started properly, we issue the command:

an@an-VB:/usr/bin$ ps -ef | grep mongo
mongodb   967     1  4 07:03 ?      00:02:02 /usr/bin/mongod --config /etc/mongod.conf
an       3143  3085  0 07:45 pts/3  00:00:00 grep --color=auto mongo
In this case, we see that mongodb is running in process 967.
On startup, the mongod server sends a message to the effect that it is waiting for connections on port 27017. This is the default port for MongoDB; it can be changed in the configuration file. The server activity is logged in /var/log/mongod/mongod.log, while the database files themselves live under /var/lib/mongodb:

an@an-VB:/var/lib/mongodb$ ls -lru
total 81936
drwxr-xr-x 2 mongodb nogroup     4096 Apr 25 11:19 _tmp
-rw-r--r-- 1 mongodb nogroup       69 Apr 25 11:19 storage.bson
-rwxr-xr-x 1 mongodb nogroup        5 Apr 25 11:19 mongod.lock
-rw------- 1 mongodb nogroup 16777216 Apr 25 11:19 local.ns
-rw------- 1 mongodb nogroup 67108864 Apr 25 11:19 local.0
drwxr-xr-x 2 mongodb nogroup     4096 Apr 25 11:19 journal
In order to stop the mongodb server, just issue the following command:

sudo service mongodb stop
Running the Mongo client in the console is as easy as calling mongo, as highlighted in the following command:
an@an-VB:/usr/bin$ mongo
MongoDB shell version: 3.0.2
connecting to: test
Server has startup warnings:
2015-05-30T07:03:49.387+0200 I CONTROL  [initandlisten]
2015-05-30T07:03:49.388+0200 I CONTROL  [initandlisten]
At the mongo client console prompt, we can see the databases with the following command:
> show dbs
local  0.078GB
test   0.078GB
We select the test database using use test:

> use test
switched to db test
We display the collections within the test database:

> show collections
restaurants
system.indexes
We check a sample record in the restaurants collection listed previously:
> db.restaurants.find()
{
  "_id" : ObjectId("553b70055e82e7b824ae0e6f"),
  "address" : {
    "building" : "1007",
    "coord" : [ -73.856077, 40.848447 ],
    "street" : "Morris Park Ave",
    "zipcode" : "10462"
  },
  "borough" : "Bronx",
  "cuisine" : "Bakery",
  "grades" : [
    { "grade" : "A", "score" : 2, "date" : ISODate("2014-03-03T00:00:00Z") },
    { "date" : ISODate("2013-09-11T00:00:00Z"), "grade" : "A", "score" : 6 },
    { "score" : 10, "date" : ISODate("2013-01-24T00:00:00Z"), "grade" : "A" },
    { "date" : ISODate("2011-11-23T00:00:00Z"), "grade" : "A", "score" : 9 },
    { "date" : ISODate("2011-03-10T00:00:00Z"), "grade" : "B", "score" : 14 }
  ],
  "name" : "Morris Park Bake Shop",
  "restaurant_id" : "30075445"
}
Installing the Python driver with Anaconda is easy. Just run the following command in the terminal:
conda install pymongo
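Assuming the MongoDB server started earlier is still running, we can quickly verify that the driver can talk to it; a minimal sanity-check sketch:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)   # default MongoDB host and port
print(client.server_info()['version'])     # e.g. 3.0.2 when the server is up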
We create an IO_mongo class that will be used in our harvesting and processing programs to store the collected data and retrieve the saved information. In order to create the mongo client, we import the MongoClient class from pymongo. We connect to the mongodb server on localhost at port 27017. The command is as follows:
from pymongo import MongoClient as MCli

class IO_mongo(object):
    # Default connection parameters (note: MongoClient expects 'port', not 'ip')
    conn = {'host': 'localhost', 'port': 27017}
We initialize our class with the client connection, the database (in this case, twtr_db), and the collection (in this case, twtr_coll) to be accessed:
def __init__(self, db='twtr_db', coll='twtr_coll', **conn):
    # Connects to the MongoDB server
    self.client = MCli(**conn)
    self.db = self.client[db]
    self.coll = self.db[coll]
The save method inserts new records in the preinitialized collection and database:
def save(self, data):
    # Insert into the collection in the db
    return self.coll.insert(data)
The load method allows the retrieval of specific records according to criteria and projection. In the case of a large amount of data, it returns a cursor:
def load(self, return_cursor=False, criteria=None, projection=None):
    if criteria is None:
        criteria = {}

    if projection is None:
        cursor = self.coll.find(criteria)
    else:
        cursor = self.coll.find(criteria, projection)

    # Return a cursor for large amounts of data
    if return_cursor:
        return cursor
    else:
        return [item for item in cursor]
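Here is a minimal usage sketch of the IO_mongo class; the record and the criteria are hypothetical placeholders:

mongo_io = IO_mongo()   # defaults to twtr_db / twtr_coll on localhost:27017
mongo_io.save({'id': 1, 'text': 'Hello Spark'})

# Retrieve the matching records, projecting only the text field
for doc in mongo_io.load(criteria={'id': 1}, projection={'text': 1}):
    print(doc)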
Each social network poses its own limitations and challenges. One of the main obstacles to harvesting data is an imposed rate limit. While running repeated or long-running connections between rate limit pauses, we have to be careful to avoid collecting duplicate data.
We have redesigned our connection programs outlined in the previous chapter to take care of the rate limits.
In the TwitterAPI class, which connects to Twitter and collects the tweets according to the search query we specify, we have added the following:

- Logging capability, using the Python logging library
- Persistence capability in MongoDB, using the IO_mongo class exposed previously, as well as in JSON files, using the IO_json class

Let's go through the steps:
import logging
import sys
import time
import urllib.parse   # in Python 2.7, use urlparse instead
import twitter

class TwitterAPI(object):
    """
    TwitterAPI class allows the connection to Twitter
    via OAuth once you have registered with Twitter and
    received the necessary credentials.
    """

    def __init__(self):
        consumer_key = 'get_your_credentials'
        consumer_secret = 'get_your_credentials'
        access_token = 'get_your_credentials'
        access_secret = 'get_your_credentials'
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_secret = access_secret
        self.retries = 3
        self.auth = twitter.oauth.OAuth(access_token, access_secret,
                                        consumer_key, consumer_secret)
        self.api = twitter.Twitter(auth=self.auth)
The logger is initialized in the __init__ method. Messages can be issued at five levels:

- logger.debug(debug message)
- logger.info(info message)
- logger.warn(warn message)
- logger.error(error message)
- logger.critical(critical message)

The log messages are written to a log file through a file handler:

# logger initialisation
appName = 'twt150530'
self.logger = logging.getLogger(appName)
# self.logger.setLevel(logging.DEBUG)
# create a file handler and set the level to debug
logPath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
fileName = appName
fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
self.logger.addHandler(fileHandler)
self.logger.setLevel(logging.DEBUG)
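With this formatter, a call such as self.logger.info('API calls: %i' % count) would append a line of the following shape to twt150530.log (the timestamp and message are illustrative):

2015-05-30 07:45:01,123 - twt150530 - INFO - API calls: 10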
# Save to JSON file initialisation
jsonFpath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
jsonFname = 'twtr15053001'
self.jsonSaver = IO_json(jsonFpath, jsonFname)
# Save to MongoDB initialisation
self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_coll')
The searchTwitter method launches the search according to the query specified:

def searchTwitter(self, q, max_res=10, **kwargs):
    search_results = self.api.search.tweets(q=q, count=10, **kwargs)
    statuses = search_results['statuses']
    max_results = min(1000, max_res)

    for _ in range(10):
        try:
            next_results = search_results['search_metadata']['next_results']
            # self.logger.info('info in searchTwitter - next_results: %s' % next_results[1:])
        except KeyError as e:
            self.logger.error('error in searchTwitter: %s' % e)
            break

        # next_results = urlparse.parse_qsl(next_results[1:])  # Python 2.7
        next_results = urllib.parse.parse_qsl(next_results[1:])
        # self.logger.info('info in searchTwitter - next_results[max_id]: %s' % next_results[0:])
        kwargs = dict(next_results)
        # self.logger.info('info in searchTwitter - next_results[max_id]: %s' % kwargs['max_id'])
        search_results = self.api.search.tweets(**kwargs)
        statuses += search_results['statuses']
        self.saveTweets(search_results['statuses'])

        if len(statuses) > max_results:
            self.logger.info('info in searchTwitter - got %i tweets - max: %i' % (len(statuses), max_results))
            break

    return statuses
The saveTweets method actually saves the collected tweets in JSON and in MongoDB:

def saveTweets(self, statuses):
    # Saving to JSON file
    self.jsonSaver.save(statuses)

    # Saving to MongoDB
    for s in statuses:
        self.mongoSaver.save(s)
The parseTweets method allows us to extract the key tweet information from the vast amount of information provided by the Twitter API:

def parseTweets(self, statuses):
    return [(status['id'],
             status['created_at'],
             status['user']['id'],
             status['user']['name'],
             status['text'],
             url['expanded_url'])
            for status in statuses
            for url in status['entities']['urls']]
The getTweets method calls the searchTwitter method described previously. The getTweets method ensures that API calls are made reliably whilst respecting the imposed rate limit. The code is as follows:

def getTweets(self, q, max_res=10):
    """
    Make a Twitter API call whilst managing rate limit and errors.
    """
    def handleError(e, wait_period=2, sleep_when_rate_limited=True):
        if wait_period > 3600:  # seconds
            self.logger.error('Too many retries in getTweets: %s' % e)
            raise e
        if e.e.code == 401:
            self.logger.error('error 401 * Not Authorised * in getTweets: %s' % e)
            return None
        elif e.e.code == 404:
            self.logger.error('error 404 * Not Found * in getTweets: %s' % e)
            return None
        elif e.e.code == 429:
            self.logger.error('error 429 * API Rate Limit Exceeded * in getTweets: %s' % e)
            if sleep_when_rate_limited:
                self.logger.error('error 429 * Retrying in 15 minutes * in getTweets: %s' % e)
                sys.stderr.flush()
                time.sleep(60 * 15 + 5)
                self.logger.info('error 429 * Retrying now * in getTweets: %s' % e)
                return 2
            else:
                raise e  # Caller must handle the rate limiting issue
        elif e.e.code in (500, 502, 503, 504):
            self.logger.info('Encountered %i Error. Retrying in %i seconds' % (e.e.code, wait_period))
            time.sleep(wait_period)
            wait_period *= 1.5
            return wait_period
        else:
            self.logger.error('Exit - aborting - %s' % e)
            raise e
The main loop of getTweets calls the searchTwitter API with the relevant query based on the parameters specified. If we encounter any error, such as a rate limitation imposed by the provider, it is processed by the handleError method:

    wait_period = 2   # initial wait period, so handleError receives a defined value
    while True:
        try:
            # pass the requested max_res through and return the collected statuses
            return self.searchTwitter(q, max_res=max_res)
        except twitter.api.TwitterHTTPError as e:
            error_count = 0
            wait_period = handleError(e, wait_period)
            if wait_period is None:
                return
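Putting everything together, a hypothetical harvesting session could look like the following sketch; the query string is an arbitrary example, and real credentials must be filled in within TwitterAPI.__init__ first:

twtr = TwitterAPI()
# getTweets wraps searchTwitter with the rate limit handling shown above
statuses = twtr.getTweets('ApacheSpark', max_res=100)
# the tweets have been persisted to JSON and MongoDB by saveTweets() along the way
parsed = twtr.parseTweets(statuses)
print('%i tweets with URLs parsed' % len(parsed))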