Harvesting and storing data

Before delving into persistent database storage such as MongoDB, we will look at two widely used file storage formats: CSV (short for comma-separated values) and JSON (short for JavaScript Object Notation). The enduring popularity of these two formats lies in a few key reasons: they are human readable, simple, relatively lightweight, and easy to use.

Persisting data in CSV

The CSV format is lightweight, human readable, and easy to use. It has delimited text columns with an inherent tabular schema.

Python offers a robust csv library that can serialize a CSV file into a Python dictionary. For the purposes of our program, we have written a Python class that persists data in CSV format and reads from a given CSV file.
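As a quick illustration of the standard library at work, the following minimal sketch reads a hypothetical tweets.csv file into dictionaries keyed by the header row; the filename and the column names are assumptions made purely for the example:

import csv

# Minimal sketch: read a hypothetical tweets.csv whose first row is a header
# such as: id,created_at,user_id,user_name,tweet_text,url
with open('tweets.csv') as f:
    reader = csv.DictReader(f)   # each row becomes a dictionary keyed by the header
    for record in reader:
        print(record['tweet_text'])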

Let's run through the code of the IO_csv class. The __init__ section of the class instantiates the file path, the filename, and the file suffix (in this case, .csv); the modules the class relies on are imported first:

import os
import csv
from collections import namedtuple

class IO_csv(object):

    def __init__(self, filepath, filename, filesuffix='csv'):
        self.filepath = filepath       # /path/to/file without the '/' at the end
        self.filename = filename       # FILE_NAME
        self.filesuffix = filesuffix

The save method of the class uses a Python named tuple and the header fields of the CSV file in order to impart a schema while persisting the rows of the CSV. If the CSV file already exists, it will be appended to and not overwritten; otherwise, it will be created:

    def save(self, data, NTname, fields):
        # NTname = name of the namedtuple
        # fields = header of the CSV - list of the field names
        NTuple = namedtuple(NTname, fields)

        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append to the existing file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'ab') as f:
                writer = csv.writer(f)
                # writer.writerow(fields) # header already present, do not rewrite it
                writer.writerows([row for row in map(NTuple._make, data)])
                # List comprehension using map on the NamedTuple._make() iterable and the data to be saved
                # Notice writer.writerows and not writer.writerow (a list of multiple rows is sent to the CSV file)
        else:
            # Create a new file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'wb') as f:
                writer = csv.writer(f)
                writer.writerow(fields) # fields = header of the CSV - list of the field names
                writer.writerows([row for row in map(NTuple._make, data)])
                # List comprehension using map on the NamedTuple._make() iterable and the data to be saved
                # Notice writer.writerows and not writer.writerow (a list of multiple rows is sent to the CSV file)

The load method of the class also uses a Python named tuple and the header fields of the CSV file in order to retrieve the data with a consistent schema. The load method is implemented as a memory-efficient generator, which avoids loading a huge file into memory; hence, we use yield in place of return:

    def load(self, NTname, fields):
        # NTname = Name of the NamedTuple
        # fields = header of CSV - list of the fields name
        NTuple = namedtuple(NTname, fields)
        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix),'rU') as f:
            reader = csv.reader(f)
            for row in map(NTuple._make, reader):
                # Using map on the NamedTuple._make() iterable and the reader file to be loaded
                yield row 

Here's the named tuple. We are using it to parse the tweets in order to save them to, or retrieve them from, the CSV file:

fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text', 'url']
Tweet01 = namedtuple('Tweet01',fields01)

def parse_tweet(data):
    """
    Parse a ``tweet`` from the given response data.
    """
    return Tweet01(
        id=data.get('id', None),
        created_at=data.get('created_at', None),
        user_id=data.get('user_id', None),
        user_name=data.get('user_name', None),
        tweet_text=data.get('tweet_text', None),
        url=data.get('url')
    )
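Putting the pieces together, here is a brief usage sketch of IO_csv with parse_tweet under the Python 2 environment used in this chapter. The file path, the filename, and the sample record are assumptions made for illustration:

# Hypothetical raw record shaped like the dictionaries parse_tweet() expects
raw = [{'id': 1, 'created_at': '2015-05-30', 'user_id': 42, 'user_name': 'an',
        'tweet_text': 'Spark is fast', 'url': 'http://spark.apache.org'}]
tweets = [parse_tweet(d) for d in raw]

csv_io = IO_csv('/home/an/data', 'twtr_sample')   # assumed path and filename
csv_io.save(tweets, 'Tweet01', fields01)          # persist the rows with the schema above
for tweet in csv_io.load('Tweet01', fields01):    # generator yielding Tweet01 named tuples
    print(tweet.tweet_text)                       # note: the first row yielded is the CSV header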

Persisting data in JSON

JSON is one of the most popular data formats for Internet-based applications. All the APIs we are dealing with, Twitter, GitHub, and Meetup, deliver their data in JSON format. The JSON format is relatively lightweight compared to XML, it is human readable, and its schema is embedded in the data itself. As opposed to the CSV format, where all records follow exactly the same tabular structure, JSON records can vary in their structure: JSON is semi-structured. A JSON record can be mapped into a Python dictionary of dictionaries.
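To make this mapping concrete, the following minimal sketch parses a small, hypothetical tweet-like JSON record into a Python dictionary of dictionaries (the record is illustrative, not actual API output):

import json

# Hypothetical, simplified tweet-like record (not actual Twitter API output)
record = '{"id": 1, "user": {"id": 42, "name": "an"}, "text": "Spark is fast"}'
parsed = json.loads(record)      # a Python dictionary of dictionaries
print(parsed['user']['name'])    # nested access prints: an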

Let's run through the code of the IO_json class. The __init__ section of the class instantiates the file path, the filename, and the file suffix (in this case, .json); the modules the class relies on are imported first:

import os
import io
import json

class IO_json(object):
    def __init__(self, filepath, filename, filesuffix='json'):
        self.filepath = filepath        # /path/to/file without the '/' at the end
        self.filename = filename        # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

The save method of the class uses utf-8 encoding in order to ensure read and write compatibility of the data. If the JSON file already exists, it will be appended to and not overwritten; otherwise, it will be created:

    def save(self, data):
        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append to the existing file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii=False))) # In Python 3, there is no "unicode" function
                # f.write(json.dumps(data, ensure_ascii=False)) # creates a \" escape character for " in the saved file
        else:
            # Create a new file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii=False)))
                # f.write(json.dumps(data, ensure_ascii=False))

The load method of the class simply returns the content of the file that has been read. The json.loads function then needs to be applied to the returned string in order to retrieve the JSON:

    def load(self):
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
            return f.read()
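A brief usage sketch of IO_json follows; the path, the filename, and the sample record are assumptions made for illustration:

import json

json_io = IO_json('/home/an/data', 'twtr_sample')   # assumed path and filename
json_io.save({'id': 1, 'text': 'Spark is fast'})    # hypothetical record
data = json.loads(json_io.load())                   # parse the raw string back into a dict
print(data['text'])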

Setting up MongoDB

It is crucial to store the information harvested. Thus, we set up MongoDB as our main document data store. As all the information collected is in JSON format and MongoDB stores information in BSON (short for Binary JSON), it is a natural choice.

We will run through the following steps now:

  • Installing the MongoDB server and client
  • Running the MongoDB server
  • Running the Mongo client
  • Installing the PyMongo driver
  • Creating the Python Mongo client

Installing the MongoDB server and client

In order to install the MongoDB package, perform the following steps:

  1. Import the public key used by the package management system (in our case, Ubuntu's apt). To import the MongoDB public key, we issue the following command:
    sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
    
  2. Create a list file for MongoDB. To create the list file, we use the following command:
    echo "deb http://repo.mongodb.org/apt/ubuntu "$(lsb_release -sc)"/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
    
  3. Update the local package database as sudo:
    sudo apt-get update
    
  4. Install the MongoDB packages. We install the latest stable version of MongoDB with the following command:
    sudo apt-get install -y mongodb-org
    

Running the MongoDB server

Let's start the MongoDB server:

  1. To start the MongoDB server, we issue the following command to start mongod:
    sudo service mongodb start
    
  2. To check whether mongod has started properly, we issue the command:
    an@an-VB:/usr/bin$ ps -ef | grep mongo
    mongodb    967     1  4 07:03 ?        00:02:02 /usr/bin/mongod --config /etc/mongod.conf
    an        3143  3085  0 07:45 pts/3    00:00:00 grep --color=auto mongo
    

    In this case, we see that mongod is running as process 967.

  3. The mongod server sends a message to the effect that it is waiting for connections on port 27017, which is the default port for MongoDB. It can be changed in the configuration file.
  4. We can check the contents of the log file at /var/log/mongodb/mongod.log, and list the data files under /var/lib/mongodb:
    an@an-VB:/var/lib/mongodb$ ls -lru
    total 81936
    drwxr-xr-x 2 mongodb nogroup     4096 Apr 25 11:19 _tmp
    -rw-r--r-- 1 mongodb nogroup       69 Apr 25 11:19 storage.bson
    -rwxr-xr-x 1 mongodb nogroup        5 Apr 25 11:19 mongod.lock
    -rw------- 1 mongodb nogroup 16777216 Apr 25 11:19 local.ns
    -rw------- 1 mongodb nogroup 67108864 Apr 25 11:19 local.0
    drwxr-xr-x 2 mongodb nogroup     4096 Apr 25 11:19 journal
    
  5. In order to stop the mongodb server, just issue the following command:
    sudo service mongodb stop
    

Running the Mongo client

Running the Mongo client in the console is as easy as calling mongo, as highlighted in the following command:

an@an-VB:/usr/bin$ mongo
MongoDB shell version: 3.0.2
connecting to: test
Server has startup warnings: 
2015-05-30T07:03:49.387+0200 I CONTROL  [initandlisten] 
2015-05-30T07:03:49.388+0200 I CONTROL  [initandlisten] 

At the mongo client console prompt, we can see the databases with the following command:

> show dbs
local  0.078GB
test   0.078GB

We select the test database using use test:

> use test
switched to db test

We display the collections within the test database:

> show collections
restaurants
system.indexes

We check a sample record in the restaurants collection listed previously:

> db.restaurants.find()
{ "_id" : ObjectId("553b70055e82e7b824ae0e6f"), "address" : { "building" : "1007", "coord" : [ -73.856077, 40.848447 ], "street" : "Morris Park Ave", "zipcode" : "10462" }, "borough" : "Bronx", "cuisine" : "Bakery", "grades" : [ { "grade" : "A", "score" : 2, "date" : ISODate("2014-03-03T00:00:00Z") }, { "date" : ISODate("2013-09-11T00:00:00Z"), "grade" : "A", "score" : 6 }, { "score" : 10, "date" : ISODate("2013-01-24T00:00:00Z"), "grade" : "A" }, { "date" : ISODate("2011-11-23T00:00:00Z"), "grade" : "A", "score" : 9 }, { "date" : ISODate("2011-03-10T00:00:00Z"), "grade" : "B", "score" : 14 } ], "name" : "Morris Park Bake Shop", "restaurant_id" : "30075445" }

Installing the PyMongo driver

Installing the Python driver with anaconda is easy. Just run the following command at the terminal:

conda install pymongo
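As a quick sanity check that the driver can talk to the server we started earlier, the following minimal sketch pings the local mongod instance (it assumes the server is running on the default localhost:27017):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)    # default host and port
print(client.admin.command('ping'))         # returns {u'ok': 1.0} when the server is reachable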

Creating the Python client for MongoDB

We are creating an IO_mongo class that will be used in our harvesting and processing programs to store the collected data and retrieve saved information. In order to create the mongo client, we will import the MongoClient module from pymongo. We connect to the mongodb server on localhost at port 27017. The command is as follows:

from pymongo import MongoClient as MCli

class IO_mongo(object):
    conn = {'host': 'localhost', 'port': 27017}

We initialize our class with the client connection, the database (in this case, twtr_db), and the collection (in this case, twtr_coll) to be accessed:

    def __init__(self, db='twtr_db', coll='twtr_coll', **conn ):
        # Connects to the MongoDB server 
        self.client = MCli(**conn)
        self.db = self.client[db]
        self.coll = self.db[coll]

The save method inserts new records in the preinitialized collection and database:

    def save(self, data):
        # Insert to collection in db  
        return self.coll.insert(data)

The load method allows the retrieval of specific records according to the criteria and projection. In the case of a large amount of data, it returns a cursor:

    def load(self, return_cursor=False, criteria=None, projection=None):
        if criteria is None:
            criteria = {}

        if projection is None:
            cursor = self.coll.find(criteria)
        else:
            cursor = self.coll.find(criteria, projection)

        # Return a cursor for large amounts of data
        if return_cursor:
            return cursor
        else:
            return [item for item in cursor]
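A short usage sketch of IO_mongo follows; the database name, collection name, and record are illustrative assumptions:

mongo_io = IO_mongo(db='twtr01_db', coll='twtr01_coll')   # assumed database and collection names
mongo_io.save({'id': 1, 'text': 'Spark is fast'})         # insert a hypothetical record
for doc in mongo_io.load(criteria={'id': 1}):             # retrieve it as a list of dictionaries
    print(doc['text'])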

Harvesting data from Twitter

Each social network poses its own limitations and challenges. One of the main obstacles to harvesting data is the imposed rate limit. When running repeated or long-running connections between rate limit pauses, we have to be careful to avoid collecting duplicate data.

We have redesigned our connection programs outlined in the previous chapter to take care of the rate limits.

In the TwitterAPI class, which connects to Twitter and collects the tweets according to the search query we specify, we have added the following:

  • Logging capability using the Python logging library, with the aim of collecting any errors or warnings in the case of program failure
  • Persistence capability using MongoDB, with the IO_mongo class presented previously, as well as JSON files, using the IO_json class
  • API rate limit and error management capability, so we can ensure more resilient calls to Twitter without getting barred for tapping into the firehose

Let's go through the steps:

  1. We initialize by instantiating the Twitter API with our credentials. The modules used by the class are imported first:
    import twitter
    import urllib.parse   # use the urlparse module instead in Python 2.7
    import logging
    import time
    import sys
    # The IO_json and IO_mongo classes defined earlier in this chapter are also required

    class TwitterAPI(object):
        """
        TwitterAPI class allows the connection to Twitter via OAuth
        once you have registered with Twitter and received the
        necessary credentials
        """

        def __init__(self):
            consumer_key = 'get_your_credentials'
            consumer_secret = 'get_your_credentials'
            access_token = 'get_your_credentials'
            access_secret = 'get_your_credentials'
            self.consumer_key = consumer_key
            self.consumer_secret = consumer_secret
            self.access_token = access_token
            self.access_secret = access_secret
            self.retries = 3
            self.auth = twitter.oauth.OAuth(access_token, access_secret, consumer_key, consumer_secret)
            self.api = twitter.Twitter(auth=self.auth)
  2. We initialize the logger by providing the log level:
    • logger.debug('debug message')
    • logger.info('info message')
    • logger.warn('warn message')
    • logger.error('error message')
    • logger.critical('critical message')
  3. We set the log path and the message format:
            # logger initialisation
            appName = 'twt150530'
            self.logger = logging.getLogger(appName)
            #self.logger.setLevel(logging.DEBUG)
            # create console handler and set level to debug
            logPath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
            fileName = appName
            fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            fileHandler.setFormatter(formatter)
            self.logger.addHandler(fileHandler) 
            self.logger.setLevel(logging.DEBUG)
  4. We initialize the JSON file persistence instance:
            # Save to JSON file initialisation
            jsonFpath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
            jsonFname = 'twtr15053001'
            self.jsonSaver = IO_json(jsonFpath, jsonFname)
  5. We initialize the MongoDB database and collection for persistence:
            # Save to MongoDB initialisation
            self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_coll')
  6. The searchTwitter method launches the search according to the query specified:
        def searchTwitter(self, q, max_res=10, **kwargs):
            search_results = self.api.search.tweets(q=q, count=10, **kwargs)
            statuses = search_results['statuses']
            max_results = min(1000, max_res)

            for _ in range(10):
                try:
                    next_results = search_results['search_metadata']['next_results']
                    # self.logger.info('info in searchTwitter - next_results: %s' % next_results[1:])
                except KeyError as e:
                    self.logger.error('error in searchTwitter: %s', e)
                    break

                # next_results = urlparse.parse_qsl(next_results[1:]) # Python 2.7
                next_results = urllib.parse.parse_qsl(next_results[1:])
                # self.logger.info('info in searchTwitter - next_results: %s' % next_results[0:])
                kwargs = dict(next_results)
                # self.logger.info('info in searchTwitter - next_results[max_id]: %s' % kwargs['max_id'])
                search_results = self.api.search.tweets(**kwargs)
                statuses += search_results['statuses']
                self.saveTweets(search_results['statuses'])

                if len(statuses) > max_results:
                    self.logger.info('info in searchTwitter - got %i tweets - max: %i' % (len(statuses), max_results))
                    break
            return statuses
  7. The saveTweets method actually saves the collected tweets in JSON and in MongoDB:
        def saveTweets(self, statuses):
            # Saving to JSON File
            self.jsonSaver.save(statuses)
            
            # Saving to MongoDB
            for s in statuses:
                self.mongoSaver.save(s)
  8. The parseTweets method allows us to extract the key tweet information from the vast amount of information provided by the Twitter API:
        def parseTweets(self, statuses):
            return [ (status['id'],
                      status['created_at'],
                      status['user']['id'],
                      status['user']['name'],
                      status['text'],
                      url['expanded_url'])
                            for status in statuses
                                for url in status['entities']['urls'] ]
  9. The getTweets method calls the searchTwitter method described previously. The getTweets method ensures that API calls are made reliably whilst respecting the imposed rate limit. The code is as follows:
        def getTweets(self, q, max_res=10):
            """
            Make a Twitter API call whilst managing rate limit and errors.
            """
            def handleError(e, wait_period=2, sleep_when_rate_limited=True):
                if wait_period > 3600: # Seconds
                    self.logger.error('Too many retries in getTweets: %s', e)
                    raise e
                if e.e.code == 401:
                    self.logger.error('error 401 * Not Authorised * in getTweets: %s', e)
                    return None
                elif e.e.code == 404:
                    self.logger.error('error 404 * Not Found * in getTweets: %s', e)
                    return None
                elif e.e.code == 429:
                    self.logger.error('error 429 * API Rate Limit Exceeded * in getTweets: %s', e)
                    if sleep_when_rate_limited:
                        self.logger.error('error 429 * Retrying in 15 minutes * in getTweets: %s', e)
                        sys.stderr.flush()
                        time.sleep(60*15 + 5)
                        self.logger.info('error 429 * Retrying now * in getTweets: %s', e)
                        return 2
                    else:
                        raise e # Caller must handle the rate limiting issue
                elif e.e.code in (500, 502, 503, 504):
                    self.logger.info('Encountered %i Error. Retrying in %i seconds' % (e.e.code, wait_period))
                    time.sleep(wait_period)
                    wait_period *= 1.5
                    return wait_period
                else:
                    self.logger.error('Exit - aborting - %s', e)
                    raise e
  10. Here, we are calling the searchTwitter API with the relevant query based on the parameters specified. If we encounter any error, such as a rate limitation from the provider, it will be processed by the handleError method:
            wait_period = 2   # initial back-off period in seconds
            while True:
                try:
                    self.searchTwitter( q, max_res=10)
                except twitter.api.TwitterHTTPError as e:
                    error_count = 0
                    wait_period = handleError(e, wait_period)
                    if wait_period is None:
                        return