Working with large data sources

Most of the data that users feed into matplotlib when generating plots comes from NumPy. NumPy is one of the fastest ways of processing numerical and array-based data in Python (if not the fastest), so this makes sense. By default, however, NumPy works with data held in memory. If the dataset that you want to plot is larger than the total RAM available on your system, performance is going to plummet.

In the following section, we're going to take a look at an example that illustrates this limitation. But first, let's get our notebook set up, as follows:

In [1]: import matplotlib
        matplotlib.use('nbagg')
        %matplotlib inline

Here are the modules that we are going to use:

In [2]: import glob, io, math, os
        import psutil
        import numpy as np
        import pandas as pd
        import tables as tb
        from scipy import interpolate
        from scipy.stats import burr, norm
        import matplotlib as mpl
        import matplotlib.pyplot as plt
        from IPython.display import Image

We'll use the custom style sheet that we created earlier, as follows:

In [3]: plt.style.use("../styles/superheroine-2.mplstyle")

An example problem

To keep things manageable for an in-memory example, we're going to limit our generated dataset to 100 million points by using one of SciPy's many statistical distributions, as follows:

In [4]: (c, d) = (10.8, 4.2)
        (mean, var, skew, kurt) = burr.stats(c, d, moments='mvsk')

The Burr distribution, also known as the Singh–Maddala distribution, is commonly used to model household income. Next, we'll use the burr object's method to generate a random population with our desired count, as follows:

In [5]: r = burr.rvs(c, d, size=100000000)

Creating 100 million data points in the last call took about 10 seconds on a moderately recent workstation, with the RAM usage peaking at about 2.25 GB (before the garbage collection kicked in).

Let's make sure that it's the size we expect, as follows:

In [6]: len(r)
Out[6]: 100000000

If we save this to a file, it weighs in at about three-fourths of a gigabyte:

In [7]: r.tofile("../data/points.bin")
In [8]: ls -alh ../data/points.bin
        -rw-r--r-- 1 oubiwann staff 763M Mar 20 11:35 points.bin

This actually does fit in memory on a machine with 8 GB of RAM, but generating much larger files tends to be problematic. We can, however, reuse this one multiple times to reach a size that is larger than what can fit in the system RAM.

Before we do this, let's take a look at what we've got by generating a smooth curve for the probability distribution, as follows:

In [9]:  x = np.linspace(burr.ppf(0.0001, c, d),
                         burr.ppf(0.9999, c, d), 100)
         y = burr.pdf(x, c, d)
In [10]: (figure, axes) = plt.subplots(figsize=(20, 10))
         axes.plot(x, y, linewidth=5, alpha=0.7)
         axes.hist(r, bins=100, normed=True)
         plt.show()

The following plot is the result of the preceding code:

[Image: the Burr probability density curve overlaid on the 100-bin histogram of the sample]

Our plot of the Burr probability distribution function, along with the 100-bin histogram of the 100-million-point sample, took about 7 seconds to render. This is because NumPy handles most of the work and we only display a limited number of visual elements. What would happen if we tried to plot all 100 million points? This can be checked with the following code:

In [11]: (figure, axes) = plt.subplots()
         axes.plot(r)
         plt.show()
formatters.py:239: FormatterWarning:
Exception in image/png formatter: Allocated too many blocks

After about 30 seconds of crunching, the preceding error was thrown—the Agg backend (a shared library) simply couldn't handle the number of artists required to render all the points. We'll examine this sort of situation towards the end of the chapter and discuss ways to work around it.

But for now, this case clarifies the point that we stated a while back—our first plot rendered relatively quickly because we were selective about the data we chose to present, given the large number of points with which we are working.

However, let's say we have data in files that are too large to fit into memory. What do we do about this? Possible ways to address this include the following:

  • Moving the data out of memory and onto the filesystem
  • Moving the data off the filesystem and into databases

We will explore examples of these in the following section.

Big data on the filesystem

The first of the two proposed solutions for large datasets involves not burdening the system memory with data, but rather leaving it on the filesystem. There are several ways to accomplish this, but the following two methods in particular are the most common in the world of NumPy and matplotlib:

  • NumPy's memmap function: This function creates memory-mapped files that are useful if you wish to access small segments of large files on the disk without having to read the whole file into the memory.
  • PyTables: This is a package that is used to manage hierarchical datasets. It is built on the top of the HDF5 and NumPy libraries and is designed to efficiently and easily cope with extremely large amounts of data.

We will examine each in turn.

NumPy's memmap function

Let's restart the IPython kernel by going to the IPython menu at the top of notebook page, selecting Kernel, and then clicking on Restart. When the dialog box pops up, click on Restart. Then, re-execute the first few lines of the notebook by importing the required libraries and getting our style sheet set up.

Once the kernel is restarted, take a look at the RAM utilization on your system for a fresh Python process for the notebook:

In [4]: Image("memory-before.png")
Out[4]:

The following screenshot shows the RAM utilization for a fresh Python process:

[Image: screenshot of the RAM utilization for a fresh Python process]

Now, let's load the array data that we previously saved to disk and recheck the memory utilization, as follows:

In [5]: data = np.fromfile("../data/points.bin")
        data_shape = data.shape
        data_len = len(data)
        data_len
Out[5]: 100000000
In [6]: Image("memory-after.png")
Out[6]:

The following screenshot shows the memory utilization after loading the array data:

[Image: screenshot of the RAM utilization after loading the array data]

This took about five seconds to load, with the memory consumption equivalent to the file size of the data. This means that if we wanted to build some sample data that was too large to fit in the memory, we'd need about 11 of those files concatenated, as follows:

In [7]: 8 * 1024
Out[7]: 8192
In [8]: filesize = 763
        8192 / filesize
Out[8]: 10.73656618610747

However, this is only if the entire memory was available. Let's see how much memory is available right now, as follows:

In  [9]: del data
In [10]: psutil.virtual_memory().available / 1024**2
Out[10]: 2449.1796875

That's about 2.4 GB. So, to overrun our RAM, we'll only need a fraction of the original total. This is done in the following way:

In [11]: 2449 / filesize
Out[11]: 3.2096985583224114

The preceding output means that only four of our original files would be enough to exceed the available memory. However, in the following section, we will still use 11 files to ensure that the data, if it were loaded into memory, would be much larger than the total system RAM.

How do we create this large file for demonstration purposes (knowing that in a real-life situation, the data would already be created and potentially quite large)? We can try to use numpy.tile to create a file of the desired size (larger than memory), but this can make our system unusable for a significant period of time. Instead, let's use numpy.memmap, which will treat a file on the disk as an array, thus letting us work with data that is too large to fit into the memory.

Let's load the data file again, but this time as a memory-mapped array, as follows:

In [12]: data = np.memmap(
            "../data/points.bin", mode="r", shape=data_shape)

Loading the array into a memmap object was very quick (compared to bringing the contents of the file into memory), taking less than a second to complete. Now, let's create a new file to write data to. This file must be larger than our total system memory if its data were held in memory; on disk, it will be smaller:

In [13]: big_data_shape = (data_len * 11,)
         big_data = np.memmap(
             "../data/many-points.bin", dtype="uint8",
             mode="w+", shape=big_data_shape)

The preceding code creates a 1 GB file, which is mapped to an array that has the shape we requested and just contains zeros:

In [14]: ls -alh ../data/many-points.bin
         -rw-r--r-- 1 oubiwann staff 1.0G Apr 2 11:35 many-points.bin
In [15]: big_data.shape
Out[15]: (1100000000,)
In [16]: big_data
Out[16]: memmap([0, 0, 0, ..., 0, 0, 0], dtype=uint8)

Now, let's fill the empty data structure with copies of the data we saved to the 763 MB file, as follows:

In [17]: for x in range(11):
             start = x * data_len
             end = (x * data_len) + data_len
             big_data[start:end] = data
         big_data
Out[17]: memmap([ 90, 71, 15, ..., 33, 244, 63], dtype=uint8)

If you check your system memory before and after, you will only see minimal changes, which confirms that we are not creating an 8 GB data structure in memory. Furthermore, the whole operation only takes a few seconds.
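If you'd like to verify this yourself, a minimal sketch along the following lines (the file path and the 100 MB size are arbitrary examples, not part of our dataset) shows how psutil can bracket a memmap write with before-and-after measurements of available RAM:

import numpy as np
import psutil

def available_mb():
    # System-wide available RAM, in megabytes
    return psutil.virtual_memory().available / 1024 ** 2

before = available_mb()

# Write 100 MB of ones through a memory-mapped file; the pages are backed
# by the file on disk rather than by a regular in-memory array
demo = np.memmap("../data/memmap-demo.bin", dtype="uint8",
                 mode="w+", shape=(100 * 1024 * 1024,))
demo[:] = 1
demo.flush()

print("Change in available RAM: {:.1f} MB".format(before - available_mb()))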

Now, we can do some sanity checks on the resulting data and ensure that we have what we were trying to get, as follows:

In [18]: big_data_len = len(big_data)
         big_data_len
Out[18]: 1100000000
In [19]: data[100000000 - 1]
Out[19]: 63
In [20]: big_data[100000000 - 1]
Out[20]: 63

Attempting to get the next index from our original dataset will throw an error (as shown in the following code), since it didn't have that index:

In [21]: data[100000000]
-----------------------------------------------------------
IndexError                Traceback (most recent call last)
...
IndexError: index 100000000 is out of bounds ...

But our new dataset does have that index, as shown in the following code:

In [22]: big_data[100000000]
Out[22]: 90

And then some:

In [23]: big_data[1100000000 - 1]
Out[23]: 63

We can also plot data from a memory-mapped array without a significant lag time. However, note that in the following code, we will create a histogram from 1.1 billion points of data, so the plotting won't be instantaneous:

In [24]: (figure, axes) = plt.subplots(figsize=(20, 10))
         axes.hist(big_data, bins=100)
         plt.show()

The following plot is the result of the preceding code:

[Image: 100-bin histogram of the 1.1 billion points in the memory-mapped array]

The plot took about 40 seconds to generate.

The odd shape of the histogram is due to the fact that, with our data file-hacking, we have radically changed the nature of our data since we've increased the sample size linearly without regard for the distribution. The purpose of this demonstration wasn't to preserve a sample distribution, but rather to show how one can work with large datasets. What we have seen is not too shabby. Thanks to NumPy, matplotlib can work with data that is too large for memory, even if it is a bit slow iterating over hundreds of millions of data points from the disk.

Can matplotlib do better?

HDF5 and PyTables

A commonly used file format in the scientific computing community is Hierarchical Data Format (HDF). HDF is a set of file formats (namely HDF4 and HDF5) that were originally developed at the National Center for Supercomputing Applications (NCSA), a unit of the University of Illinois at Urbana-Champaign, to store and organize large amounts of numerical data.

Note

The NCSA is a great source of technical innovation in the computing industry—a Telnet client, the first graphical web browser, a web server that evolved into the Apache HTTP server, and HDF, which is of particular interest to us, were all developed here. It is a little known fact that NCSA's web browser code was the ancestor to both the Netscape web browser as well as a prototype of Internet Explorer that was provided to Microsoft by a third party.

HDF is supported by Python, R, Julia, Java, Octave, IDL, and MATLAB, to name a few. HDF5 offers significant improvements and useful simplifications over HDF4. It uses B-trees to index table objects and, as such, works well for write-once/read-many time series data. Common use cases span fields such as meteorological studies, biosciences, finance, and aviation. Multiterabyte HDF5 files are common in these applications. Such a file is typically constructed from the analyses of multiple HDF5 source files, thus providing a single (and often extensive) source of grouped data for a particular application.

The PyTables library is built on the top of the Python HDF5 library and NumPy. As such, it not only provides access to one of the most widely used large data file formats in the scientific computing community, but also links data extracted from these files with the data types and objects provided by the fast Python numerical processing library.

PyTables is also used in other projects. Pandas wraps PyTables, thus extending its convenient in-memory data structures, functions, and objects to large on-disk files. To use HDF data with Pandas, you can create a pandas.HDFStore, read from HDF data sources with pandas.read_hdf, or write to one with the to_hdf method. Files that are too large to fit in memory may be read and written by utilizing chunking techniques. Pandas does support disk-based DataFrame operations, but these are not very efficient, due to the need to reassemble columns of data when reading them back into memory.
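As a rough sketch of what this looks like in practice (the file path, key name, and chunk size below are illustrative placeholders, not part of our weather dataset), you can append DataFrames to an on-disk store and then read them back in chunks:

import numpy as np
import pandas as pd

store_path = "../data/pandas-demo.h5"
df = pd.DataFrame({"temp": np.random.randn(100000)})

with pd.HDFStore(store_path) as store:
    # append() writes in the queryable "table" format; data_columns lets
    # us filter on the temp column directly on disk later
    store.append("weather", df, data_columns=["temp"])

    # Read the table back in chunks rather than all at once
    (total, count) = (0.0, 0)
    for chunk in store.select("weather", chunksize=25000):
        total += chunk["temp"].sum()
        count += len(chunk)

print(total / count)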

One project to keep an eye on under the PyData umbrella is Blaze. It's an open wrapper and utility framework that can be used when you wish to work with large datasets and generalize actions such as creation, access, updates, and migration. Blaze supports not only HDF, but also SQL, CSV, and JSON. The API usage between Pandas and Blaze is very similar, and it offers a nice tool for developers who need to support multiple backends.

In the following example, we will use PyTables directly to create an HDF5 file that is too large to fit in memory (for a machine with 8 GB of RAM). We will take the following steps:

  • Create a series of CSV source data files that take up approximately 14 GB of disk space
  • Create an empty HDF5 file
  • Create a table in the HDF5 file and provide the schema metadata and compression options
  • Load the CSV source data into the HDF5 table
  • Query the new data source once the data has been migrated

Remember the temperature and precipitation data for St. Francis, Kansas, USA, from a previous notebook? For the purposes of the HDF5 example, we are going to generate random data with similar columns. The data will be drawn from a normal distribution and will stand in for the temperature and precipitation data of hundreds of thousands of fictitious towns across the globe for the last century, as follows:

In [25]: head = "country,town,year,month,precip,temp\n"
         row = "{},{},{},{},{},{}\n"
         filename = "../data/{}.csv"
         town_count = 1000
         (start_year, end_year) = (1894, 2014)
         (start_month, end_month) = (1, 13)
         sample_size = (1 + 2
                        * town_count * (end_year - start_year)
                        * (end_month - start_month))
         countries = range(200)
         towns = range(town_count)
         years = range(start_year, end_year)
         months = range(start_month, end_month)
         for country in countries:
             with open(filename.format(country), "w") as csvfile:
                 csvfile.write(head)
                 csvdata = ""
                 weather_data = norm.rvs(size=sample_size)
                 weather_index = 0
                 for town in towns:
                     for year in years:
                         for month in months:
                             csvdata += row.format(
                                 country, town, year, month,
                                 weather_data[weather_index],
                                 weather_data[weather_index + 1])
                             weather_index += 2
                 csvfile.write(csvdata)

Note that we generated a sample data population that was twice as large as the expected size in order to pull both the simulated temperature and precipitation data at the same time (from the same set). This will take about 30 minutes to run. When complete, you will see the following files:

In [26]: ls -rtm ../data/*.csv
         ../data/0.csv, ../data/1.csv, ../data/2.csv, 
         ../data/3.csv, ../data/4.csv, ../data/5.csv, 
         ...
         ../data/194.csv, ../data/195.csv, ../data/196.csv, 
         ../data/197.csv, ../data/198.csv, ../data/199.csv

A quick look at just one of the files reveals the size of each, as follows:

In [27]: ls -lh ../data/0.csv
         -rw-r--r-- 1 oubiwann staff 72M Mar 21 19:02 ../data/0.csv

With each of the 200 files weighing in at roughly 72 MB, we have data that takes up about 14 GB of disk space, which exceeds the RAM of the system in question.

Furthermore, running queries against so much data in the .csv files isn't going to be very efficient. It's going to take a long time. So what are our options? Well, for reading this data, HDF5 is a very good fit. In fact, it is designed for jobs like this. We will use PyTables to convert the .csv files to a single HDF5 file. We'll start by creating an empty table file, as follows:

In [28]: tb_name = "../data/weather.h5t"
         h5 = tb.open_file(tb_name, "w")
         h5
Out[28]: File(filename=../data/weather.h5t, title='', mode='w', 
              root_uep='/', filters=Filters(
                 complevel=0, shuffle=False, fletcher32=False, 
                 least_significant_digit=None))
         / (RootGroup) ''

Next, we'll provide some assistance to PyTables by indicating the data types of each column in our table, as follows:

In [29]: data_types = np.dtype(
             [("country", "<i8"),
              ("town", "<i8"),
              ("year", "<i8"),
              ("month", "<i8"),
              ("precip", "<f8"),
              ("temp", "<f8")])

Also, let's define a compression filter that can be used by PyTables when saving our data, as follows:

In [30]: filters = tb.Filters(complevel=5, complib='blosc')

Now, we can create a table inside our new HDF5 file, as follows:

In [31]: tab = h5.create_table(
             "/", "weather",
             description=data_types,
             filters=filters)

With that done, let's load each CSV file, read it in chunks so that we don't overload the memory, and then append it to our new HDF5 table, as follows:

In [32]: for filename in glob.glob("../data/*.csv"):
             it = pd.read_csv(filename, iterator=True, chunksize=10000)
             for chunk in it:
                 tab.append(chunk.to_records(index=False))
                 tab.flush()

Depending on your machine, the entire process of loading the CSV file, reading it in chunks, and appending to a new HDF5 table can take anywhere from 5 to 10 minutes.

However, what started out as a collection of the .csv files that weigh in at 14 GB is now a single compressed 4.8 GB HDF5 file, as shown in the following code:

In [33]: h5.get_filesize()
Out[33]: 4758762819

Here's the metadata for the PyTables-wrapped HDF5 table after the data insertion:

In [34]: tab
Out[34]: /weather (Table(288000000,), shuffle, blosc(5)) ''
  description := {
  "country": Int64Col(shape=(), dflt=0, pos=0),
  "town": Int64Col(shape=(), dflt=0, pos=1),
  "year": Int64Col(shape=(), dflt=0, pos=2),
  "month": Int64Col(shape=(), dflt=0, pos=3),
  "precip": Float64Col(shape=(), dflt=0.0, pos=4),
  "temp": Float64Col(shape=(), dflt=0.0, pos=5)}
  byteorder := 'little'
  chunkshape := (1365,)

Now that we've created our file, let's use it. Let's excerpt a few lines with an array slice, as follows:

In [35]: tab[100000:100010]
Out[35]: array([(0, 69, 1947, 5, -0.2328834718674, 0.06810312195695),
         (0, 69, 1947, 6, 0.4724989007889, 1.9529216219569),
         (0, 69, 1947, 7, -1.0757216683235, 1.0415374480545),
         (0, 69, 1947, 8, -1.3700249968748, 3.0971874991576),
         (0, 69, 1947, 9, 0.27279758311253, 0.8263207523831),
         (0, 69, 1947, 10, -0.0475253104621, 1.4530808932953),
         (0, 69, 1947, 11, -0.7555493935762, -1.2665440609117),
         (0, 69, 1947, 12, 1.540049376928, 1.2338186532516),
         (0, 69, 1948, 1, 0.829743501445, -0.1562732708511),
         (0, 69, 1948, 2, 0.06924900463163, 1.187193711598)],
         dtype=[('country', '<i8'), ('town', '<i8'),
               ('year', '<i8'), ('month', '<i8'),
               ('precip', '<f8'), ('temp', '<f8')])
In [36]: tab[100000:100010]["precip"]
Out[36]: array([-0.23288347,  0.4724989 , -1.07572167,
                -1.370025  ,  0.27279758, -0.04752531,
                -0.75554939,  1.54004938,  0.8297435 ,
                0.069249  ])

When we're done with the file, we do the same thing that we would do with any other file-like object:

In [37]: h5.close()

If we want to work with it again, simply load it, as follows:

In [38]: h5 = tb.open_file(tb_name, "r")
         tab = h5.root.weather

Let's try plotting the data from our HDF5 file:

In [39]: (figure, axes) = plt.subplots(figsize=(20, 10))
         axes.hist(tab[:1000000]["temp"], bins=100)
         plt.show()

Here's a plot for the first million data points:

[Image: 100-bin histogram of the temp column for the first million rows of the HDF5 table]

This histogram was rendered quickly, with a much better response time than what we've seen before. Hence, the process of accessing the HDF5 data is very fast. The next question might be "What about executing calculations against this data?" Unfortunately, running the following will consume an enormous amount of RAM:

tab[:]["temp"].mean()

We've just asked for all of the data—all 288 million rows. We would end up loading everything into RAM, grinding the average workstation to a halt. Ideally though, when you iterate through the source data and create the HDF5 file, you also crunch the numbers that you will need, adding supplemental columns or groups to the HDF5 file that can be used later by you and your peers.
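As a sketch of that approach (assuming the weather.h5t file built above and a session in which it is not already open; the group and array names are just examples), one could stream through the table in chunks, accumulate a sum and a count, and store the resulting mean alongside the raw rows:

import tables as tb

chunk_size = 1000000
with tb.open_file("../data/weather.h5t", "a") as h5:
    tab = h5.root.weather
    (total, count) = (0.0, 0)
    # Stream through the temperature column one chunk at a time
    for start in range(0, tab.nrows, chunk_size):
        temps = tab.read(start=start, stop=start + chunk_size, field="temp")
        total += temps.sum()
        count += len(temps)
    # Store the precomputed aggregate next to the raw data for later reuse
    summary = h5.create_group("/", "summary", "Precomputed aggregates")
    h5.create_array(summary, "temp_mean", obj=[total / count])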

If we have data that we will mostly be selecting (extracting portions) and which has already been crunched and grouped as needed, HDF5 is a very good fit. This is why one of the most common use cases that you see for HDF5 is the sharing and distribution of the processed data.

However, if we have data that we need to process repeatedly, then we will either need to use another method besides the one that will cause all the data to be loaded into the memory, or find a better match for our data processing needs.

In the following section, we will look at the last point in more detail. However before that, let's give HDF5 another chance.

We saw in the previous section that the selection of data is very fast in HDF5. What about calculating the mean for a small section of data? If we've got a total of 288 million rows, let's pick a divisor that gives us several hundred thousand rows at a time—281,250 rows, to be precise. Let's get the mean for the first slice, as follows:

In [40]: tab[0:281250]["temp"].mean()
Out[40]: 0.0030696632864265312

This took about 1 second to calculate. What about iterating through the records in a similar fashion? Let's break up the 288 million records into chunks of the same size; this will result in 1024 chunks. We'll start by getting the ranges needed for an increment of 281,250 and then, we'll examine the first and the last row as a sanity check, as follows:

In [41]: limit = 281250
         ranges = [(x * limit, x * limit + limit)
             for x in range(2 ** 10)]
         (ranges[0], ranges[-1])
Out[41]: ((0, 281250), (287718750, 288000000))

Now, we can use these ranges to generate the mean for each chunk of 281,250 rows of temperature data and print the total number of means that we generated to make sure that we're getting our counts right, as follows:

In [42]: means = [tab[x * limit:x * limit + limit]["temp"].mean()
             for x in range(2 ** 10)]
         len(means)
Out[42]: 1024

Depending on your machine, that should take between 30 and 60 seconds. With this work done, it's now easy to calculate the mean for all of the 288 million points of temperature data:

In [43]: sum(means) / len(means)
Out[43]: -5.3051780413782918e-05

Through HDF5's efficient file format and implementation, combined with the splitting of our operations into tasks that would not copy the HDF5 data into memory, we were able to perform calculations across a significant fraction of a billion records in less than a minute. HDF5 even supports parallelization. So, this can be improved upon with a little more time and effort.
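As a sketch of one such direction (not something we have benchmarked here), the chunked means could be farmed out to worker processes with Python's standard multiprocessing module, with each worker opening the HDF5 file read-only on its own and returning a partial sum and count:

from multiprocessing import Pool

import tables as tb

TB_NAME = "../data/weather.h5t"
LIMIT = 281250

def chunk_stats(bounds):
    # Each worker opens the file itself; sharing a single PyTables handle
    # across processes is not safe
    (start, stop) = bounds
    with tb.open_file(TB_NAME, "r") as h5:
        temps = h5.root.weather.read(start=start, stop=stop, field="temp")
    return (temps.sum(), len(temps))

if __name__ == "__main__":
    ranges = [(x * LIMIT, (x + 1) * LIMIT) for x in range(2 ** 10)]
    with Pool(4) as pool:
        results = pool.map(chunk_stats, ranges)
    total = sum(s for (s, _) in results)
    count = sum(c for (_, c) in results)
    print(total / count)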

However, there are many cases where HDF5 is not a practical choice. You may have some free-form data, and preprocessing it will be too expensive. Alternatively, the datasets may be actually too large to fit on a single machine. This is when you may consider using matplotlib with distributed data.

Distributed data

We've looked at the following two ways to handle data that is too large for the memory:

  • NumPy's memmap function
  • The general HDF5 format wrapped by PyTables

However, there is another situation that may come into play for projects that need to use matplotlib to visualize all or parts of large data sets—data that is too large to fit on a hard drive. This can be anything from large datasets, such as the ones created by super-colliders and radio telescopes, to the high-volume streaming data used in systems analysis (and social media) and financial markets data. All of these are either too large to fit on a machine, or too ephemeral to store and need to be processed in real time.

The latter of these is the realm of projects such as Apache Spark (developed at the UC Berkeley AMPLab), Apache Storm (originally developed at BackType and then acquired by Twitter), Apache Kafka (created at LinkedIn), and Amazon's Kinesis. We will not discuss these in this notebook. Instead, we will focus on the former—processing large datasets in a distributed environment; in particular, we will concentrate on MapReduce. Understanding how to use matplotlib and NumPy with a MapReduce framework will provide the foundation necessary for the reader to extend this to the scenarios that involve streaming data.

Even though we have chosen to demonstrate a solution with MapReduce, there are many other options to address problems like these—distributed RDMSes and NoSQL solutions such as Riak, Redis, and Cassandra, to name a few.

MapReduce

So what is MapReduce, and why are we looking at it in the context of running computations against large sets of data? Wikipedia gives the following definition:

MapReduce is a programming model for processing and generating large data sets with a parallel, distributed algorithm on a cluster. A MapReduce program is composed of a Map procedure that performs filtering and sorting, and a Reduce procedure that performs a summary operation. The "MapReduce System" orchestrates the processing by marshalling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance.

A little context as to why it is potentially very useful to visualize large datasets with matplotlib will make the definition more clear.

Between 1999 and 2004, Google engineering had created hundreds of proprietary, special-purpose functions, scripts, and programs to process the huge amounts of data that were generated by web crawling, HTTP access logs, and so on. The many kinds of processing tasks that were developed were in large part used to create the page-ranked search results of Google search. At the time, this was a vast improvement over the other search engine results. Though each individual computation was pretty straightforward and nothing about it was new, the manner in which these were combined was unique and provided a vastly improved experience for users.

However, in a span of five years, the computation tasks needed to be split across hundreds, and later thousands, of machines in order to finish in a timely manner. This introduced the difficulties of parallelizing code—not only the decomposition of tasks into parallelizable parts, but also the parallelization of data and the handling of failures. As a result of these difficulties, as well as some related issues, engineers were no longer able to easily add new features to the system.

After five years, inspiration struck and Google adopted a new paradigm for distributed, parallelized workloads.

Interestingly enough, the muse behind this new approach to Google's problem came from the second oldest programming language that is still in use—Lisp (Fortran being the oldest). The authors of the Google MapReduce paper were reminded of the fact that many of their processing jobs consisted of a simple action against a dataset. Here's a simple illustration of this pattern using Lisp Flavored Erlang (LFE), a modern Lisp:

> (set data "some words to examine")
"some words to examine"
> (lists:map #'length/1 (string:tokens data " "))
(4 5 2 7)

The preceding code represents a common pattern in functional programming languages where, in this example, the tokenized data is mapped over a length function.

What's more is that Google didn't just stop here. Their engineers were then performing additional operations on the processed data. Here's an example that takes the preceding processed list and creates a new result by "folding" the results into a secondary analytical result:

> (lists:foldl #'+/2 0 '(4 5 2 7))
18

The preceding function is called folding due to the fact that there is a recursive operation in place, with each item in the list being folded into an accumulator after being applied to a function. In this case, the folding function is the function that performs the operation of addition (with a parity of 2, thus the +/2). The initial value for the first fold that was provided is 0. Note that if the folding function created items in a list rather than adding two integers for a sum, the initial value would have been a list (empty or otherwise).

The map and fold operations can be combined in the fashion that is typical of higher-order functions, as follows:

> (lists:foldl
    #'+/2
    0
    (lists:map
      #'length/1
      (string:tokens data " ")))
18

As you may have guessed by now (or known already), there is another term by which folding is known. It is named not for the process employed, but by the nature of the results it creates—reduce. In this case, a list of integers is reduced to a single value by the means of the addition function that we provided.

To summarize, given an initial dataset, we executed a length function (with a parity of one) against every element of the data that has been split on the "space" character. The results were integers representing the length of each word in the dataset. Then, we folded the list of length values with the + function, one element at a time, into an accumulator with an initial value of 0. The end result represented the sum of all the word lengths. If we wanted a running average instead of a running sum, we would have supplied a different function. It still would take two arguments and add them. It would just divide that result by two, as follows:

> (defun ave (number accum)
    (/ (+ number accum) 2))
ave
> (lists:foldl
    #'ave/2
    0
    (lists:map
      #'length/1
      (string:tokens data " ")))
4.875

The average word length in our data is 4.875 ASCII characters.

The last example makes the latent power clear in solutions like these—for completely new results, we only need to change one function.

Various Lisps and functional programming languages have fold or reduce functionality, but this is not just the domain of functional programming. Python 3 has a library dedicated to functional programming idioms—functools. Here's how the preceding examples could be implemented in Python:

>>> import functools, operator
>>> data = "some words to examine"
>>> [x for x in map(len, data.split(" "))]
[4, 5, 2, 7]
>>> functools.reduce(operator.add, [4, 5, 2, 7], 0)
18

Similarly, these may be composed in Python, as follows:

>>> functools.reduce(operator.add,
...                  map(len, data.split(" ")),
...                  0)
18

To calculate the running average, we will use the following code:

>>> def ave(number, accum):
...     return (number + accum) / 2
...
>>> functools.reduce(ave,
...                  map(len, data.split(" ")),
...                  0)
4.875

The really important part to realize here—given the context of Google's needs in 2004 and the later flourishing of MapReduce via the Hadoop project—is that each map call of len is independent of all the others. Therefore, these can be called on the same machine in the same process, in different processes, on different cores, or on another machine altogether (given the appropriate framework, of course). In a similar fashion, the data provided to the reduce function can come from any number of local or remote sources. In fact, the reduce step itself can be split across multiple computational resources; it just needs a final reduce step to aggregate the partial results.
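We can see this in miniature with the word length example itself; the partial sums in the following sketch could just as easily have been computed by separate processes or separate machines before a final reduce step combines them:

import functools
import operator

data = "some words to examine"
lengths = list(map(len, data.split(" ")))  # each len() call is independent

# Reduce each partition separately, as different workers would...
partitions = [lengths[:2], lengths[2:]]
partial_sums = [functools.reduce(operator.add, part, 0)
                for part in partitions]

# ...and then aggregate the partial results with a final reduce step
total = functools.reduce(operator.add, partial_sums, 0)
print(total)  # 18, the same as reducing the whole list at once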

This insight led to an innovation in the development process at Google in support of the tasks that had steadily grown in complexity and had been encumbered by reduced maintainability. They created such infrastructure so that engineers only needed to create their own mapper and reducer functions; these could then be run against the desired data sources on the appropriate MapReduce clusters. This approach to the problem space allowed for the automated parallelization of tasks as well as the distribution of workload across a number of servers running in Google's large computation clusters in their data centers.

What started by applying old techniques from an old language resulted in a whole new industry dominated by the Java MapReduce implementation, Hadoop, which is also used extensively by Python programmers via Hadoop streaming. In the next section, we will take a look at Hadoop and other open source big data frameworks. Let's first close this section with a return to the practical world—what is MapReduce good for, other than the counting of letters and words?

The main benefit that MapReduce offers is the ability to split a specific class of data processing problems across a potentially large number of machines without the need for specialized hardware. Once a MapReduce framework is in place, developers only need to focus on creating new tasks and need not worry, for the most part, about how all the pieces of infrastructure fit together or whether those pieces need to be adjusted for a different job. A caveat here is that authors of mappers and reducers do need to be aware of the tradeoffs between the computation and communication costs in their distributed environment when running the particular tasks that they have designed. We will examine this topic in more detail in the next chapter when we tackle parallelization and clustering.

MapReduce is useful in a wide range of applications, which includes the following:

  • Distributed pattern-based searching
  • Distributed sorting
  • Web link-graph reversal
  • Singular Value Decomposition (SVD)
  • Web access log statistics
  • Inverted index construction
  • Document clustering
  • Machine learning
  • Statistical machine translation

Moreover, the MapReduce model has been adapted to several computing environments, which include the following:

  • Multi-core systems
  • Desktop grids
  • Volunteer computing environments
  • Dynamic cloud environments
  • Mobile environments

As great an impact as MapReduce has had on the industry, nothing is permanent, and evolution continues to push products and services towards new uses and improved user experiences. In 2014, Google announced that it had stopped relying on MapReduce for its petabyte-scale operations. It has since moved on to technologies such as Percolator, Flume, and MillWheel, which offer streaming operations and incremental updates instead of batch processing. This has allowed Google to integrate live search results without rebuilding the complete search index.

Furthermore, these technologies are not limited to the concept of map and reduce workflows. Rather, they are open to the more general concept of data pipeline workflows.

Despite this news, MapReduce certainly isn't dead; the frameworks that support it aren't going away. We have been seeing an evolution in the industry since Google popularized the concept of distributed workloads across commodity hardware with MapReduce, and both proprietary and open source solutions are offering their users the fruits of these innovations, such as the previously mentioned Apache Spark project. We will likely see MapReduce platforms such as Hadoop offer more generalized workflows, with MapReduce being just one of many workflows available to the users.

Open source options

We've mentioned Hadoop several times now, and most readers who may have just a passing familiarity with big data must have heard of Hadoop. A member project of the Apache Software Foundation, Hadoop is an open source distributed storage and processing framework designed to work with very large datasets on commodity hardware computing clusters. The distributed storage part of the project is called HDFS and the processing part is named MapReduce. When a user uploads data to the Hadoop filesystem, the files are split into pieces and then distributed across the cluster nodes. When a user creates some code to run on Hadoop MapReduce, the custom mappers and reducers—similar in concept to what we saw in the previous section—are copied to the MapReduce nodes in the cluster, which are then executed against the data stored at each node.

Hadoop's predecessor was created at the Internet Archive in 2002 in an attempt to build a better web page crawler and indexer. When the papers on the Google File System and Google's MapReduce were published in 2003 and 2004, respectively, the creators of Hadoop re-envisioned their project and created a framework upon which it could run more efficiently. This led to the creation of Hadoop. A few years later, Yahoo! invested heavily in the project, open sourced it, and provided its researchers access to a testing cluster. That work sowed the seeds for Hadoop's very strong role in the field of machine learning.

Though Hadoop is the primary driver for the big data market—projected to generate 23 billion USD by 2016—it is not the only big data framework available in the open source community. A notable, if quiet, contender is the Disco project.

In 2008, the Nokia Research Center needed a tool that would allow them to process enormous amounts of data in real time. They wanted their researchers—many of them proficient in Python—to be able to easily and quickly create MapReduce jobs against their large datasets.

They also needed their system to be fault-tolerant and scalable. They built the server on top of the Erlang distributed programming language and created a protocol and a Python client that could talk to it, thus allowing their users to continue using the language they knew so well.

Since then, Disco has continued to evolve. It provides a generalized workflow on top of its distributed file system—the Disco pipelines. The pipeline workflow enables data scientists to create distributed processing tasks, which go beyond the original vision of MapReduce.

The functionality of MapReduce is no longer confined to the MapReduce frameworks themselves. NoSQL databases that have extended their functionality to distributed data now offer MapReduce features in their products. For instance, Redis clusters make it easy for you to implement MapReduce functionality. Riak is a distributed NoSQL key-value data store based on the Amazon Dynamo paper (not to be confused with the DynamoDB product from Amazon); it offers built-in MapReduce capabilities and an API to execute MapReduce jobs against the nodes in a cluster, and it is supported by the Python Riak client library. MongoDB is another NoSQL database that offers built-in support for MapReduce.

In our case though, we will focus on the Hadoop implementation of MapReduce, utilizing its support for Python via its streaming protocol. In particular, we will take advantage of a service provider that allows us to quickly and easily set up the Hadoop clusters. We will use Amazon's Elastic MapReduce (EMR) service.

An example – working with data on EMR

We are now going to return to the data that we generated to build the large .csv files for the HDF5 example. In this case though, we're not going to take advantage of the speed offered by HDF5. Instead, we're going to use the data to simulate the experience of working with extremely large datasets. This will be a simulation for practical reasons: generating a truly large dataset for demonstration purposes is prohibitively expensive with regard to both time and computing resources. That being said, this simulation will offer all the insight necessary to adapt a small dataset to much larger ones.

In this section, we will use Hadoop on Amazon EMR service and perform the following tasks:

  • Create a cluster
  • Push a dataset to the cluster
  • Write the mapper and reducer functions in Python
  • Test the mapper and reducer functions against small local data
  • Add nodes to the EMR cluster to prepare it for our job
  • Execute the MapReduce job against the EMR cluster that we created
  • Examine the results

In the previous chapter, the aws command-line tool was installed. We will use aws extensively for the rest of this section. To ensure that you have access to it, you will need to activate the Python virtual environment in your terminal. From the directory for this chapter's notebook git repository (see the beginning of this chapter), execute the following code:

$ . ../.venv-mmpl/bin/activate

The preceding code will provide the visual result of adding (.venv-mmpl) to your system prompt. You should now have access to the aws tool. You can confirm your access using the usual technique, as follows:

$ which aws
/Users/oubiwann/lab/mastering-matplotlib/.venv-mmpl/bin/aws

This section assumes that you have created a key pair on AWS for use in ssh'ing to EC2 instances and, as we will do shortly, to the EMR master node.

We are now ready to create a Hadoop cluster on EMR:

$ aws emr create-cluster --name "Weather" --ami-version 3.6.0 \
    --applications Name=Hue Name=Hive Name=Pig Name=HBase \
    --use-default-roles --ec2-attributes KeyName=YourKeyName \
    --instance-type c1.xlarge --instance-count 3
j-63JNVV2BYHC

We have enabled the standard Hadoop tools—Hue, Hive, Pig, and HBase. However, we will not use them in the following section. These tools are provided in case you would like to use them in further explorations. We've given our cluster a name, configured it to use the EMR system image version (3.6.0), which supports the 2.4.0 version of Hadoop, and supplied the SSH key name that we will use to log into the master server.

Tip

The key name that you provide should only be a name and should not contain the .pem file extension.

The create-cluster command returned a single value—the ID for the cluster that we just created. We're going to need this cluster ID. So, let's export it as a shell variable. We're also going to use the full path to the .pem file. Hence, we'll set one for that too, as follows:

$ export CLUSTER_ID=j-63JNVV2BYHC
$ export AWS_PEM=/path/to/YourKeyName.pem

You can check the state of the cluster with the help of the following code:

$ aws emr describe-cluster --cluster-id $CLUSTER_ID |grep STATUS
STATUS  RUNNING
STATUS  RUNNING
STATUS  WAITING

The first STATUS output is for the master node. Once it returns the state of the node as RUNNING, we can start copying files to it. The following command will copy just a few files to the cluster:

$ for FILE in data/{0,1,2}.csv
  do
    aws emr put \
      --src $FILE \
      --cluster-id $CLUSTER_ID \
      --key-pair-file $AWS_PEM
  done

We'll start with just a few files (remember, they're about 72 MB each) to make sure that everything's working. However, since we'd like to demonstrate a fuller MapReduce experience, we'll then copy all of the files; if you'd prefer to avoid incurring the data-transfer costs, you can still run the demo with just a few. To make sure that we don't run out of space on the master node, we'll send the files to the /mnt1 volume, which has plenty of room, as follows:

$ for FILE in data/*.csv
  do
    aws emr put \
      --src $FILE \
      --dest /mnt1 \
      --cluster-id $CLUSTER_ID \
      --key-pair-file $AWS_PEM
  done

Now that the .csv files have been copied to the Hadoop master node, we can log in to the server and copy the data to HDFS, as follows:

$ aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file $AWS_PEM

Now, you are on the master node where you uploaded your data. Let's copy this data into the Hadoop cluster's filesystem, as follows:

[hadoop@ip-10-255-7-47 ~]$ hdfs dfs -mkdir /weather
[hadoop@ip-10-255-7-47 ~]$ hdfs dfs -put /mnt1/*.csv /weather

With the two preceding commands, we created an HDFS directory for our data and then started the process by which the 14 gigabytes of .csv files will be pushed out to the worker nodes. This process may take some time, possibly as much as 30 minutes, depending on the cluster's performance.

Once the last command has been executed, we can check whether the files exist on HDFS with the following command:

[hadoop@ip-10-255-7-47 ~]$ hdfs dfs -ls /weather | head -10
Found 200 items
-rw-r--r--   1 hadoop g   75460820 2015-03-29 18:46 /weather/0.csv
-rw-r--r--   1 hadoop g   75456830 2015-03-29 18:47 /weather/1.csv
-rw-r--r--   1 hadoop g   76896036 2015-03-30 00:16 /weather/10.csv
-rw-r--r--   1 hadoop g   78337868 2015-03-30 00:16 /weather/100.csv
-rw-r--r--   1 hadoop g   78341694 2015-03-30 00:16 /weather/101.csv
-rw-r--r--   1 hadoop g   78341015 2015-03-30 00:16 /weather/102.csv
-rw-r--r--   1 hadoop g   78337662 2015-03-30 00:16 /weather/103.csv
-rw-r--r--   1 hadoop g   78336193 2015-03-30 00:16 /weather/104.csv
-rw-r--r--   1 hadoop g   78336537 2015-03-30 00:16 /weather/105.csv

With our data in place, we're now ready to write some Python code for the MapReduce task. Before we do so, let's remind ourselves what the data looks like by using the following code:

[hadoop@ip-10-255-7-47 ~]$ head /mnt1/0.csv
country,town,year,month,precip,temp
0,0,1894,1,0.8449506929198441,0.7897647433139449
0,0,1894,2,0.4746140099538822,0.42335801512344756
0,0,1894,3,-0.7088399152900952,0.776535509023379
0,0,1894,4,-1.1731692311337918,0.8168558530942849
0,0,1894,5,1.9332497442673315,-0.6066233105016293
0,0,1894,6,0.003582147937914687,0.2720125869889254
0,0,1894,7,-0.5612131527063922,2.9628153460517272
0,0,1894,8,0.3733525007455101,-1.3297078910961062
0,0,1894,9,1.9148724762388318,0.6364284082486487

The Python code that we will write will consist of two files, one for each part of the MapReduce job. Just like what we saw when reviewing the history of MapReduce, we will need a mapper function and a reducer function.

In our case, we want to perform the same task that we performed when dealing with the HDF5 file using PyTables—the calculation of the mean value of all the simulated temperatures across all the simulated countries and towns over a period of 120 years. The mapper function will extract the temperature value from each line of every .csv file in HDFS. The reducer function will add these up and then calculate the mean.

Keeping in mind that you are still ssh'ed into the Hadoop master node on EMR, save the following code in a file named mapper.py:

#!/usr/bin/env python
import sys

def parse_line(line):
    return line.strip().split(",")

def is_header(line):
    return line.startswith("country")

def main():
    for line in sys.stdin:
        if not is_header(line):
            print(parse_line(line)[-1])

if __name__ == "__main__":
    main()

The MapReduce code interacts with the Hadoop nodes via stdin and stdout. The code will receive input via stdin and send results to Hadoop via stdout, one line at a time. We will check to make sure that the line that we receive is not the CSV header, and then, we will use a simple line parser to extract the last value, which is the temperature field.

Save the following code in the file reducer.py:

#!/usr/bin/env python
import sys

def to_float(data):
    try:
        return float(data.strip())
    except:
        return None

def main():
    accum = 0
    count = 0
    for line in sys.stdin:
        temp = to_float(line)
        if temp == None:
            continue
        accum += temp
        count += 1
    print(accum / count)

if __name__ == "__main__":
    main()

Now, we will make them executable:

[hadoop@ip-10-255-7-47 ~]$ chmod 755 *.py

Before we execute the preceding code against the cluster, let's perform a quick check on one of the .csv files that we uploaded, as follows:

[hadoop@ip-10-255-7-47 ~]$ head /mnt1/0.csv | ./mapper.py
0.7897647433139449
0.42335801512344756
0.776535509023379
0.8168558530942849
-0.6066233105016293
0.2720125869889254
2.9628153460517272
-1.3297078910961062
0.6364284082486487

Looks good! Now, let's add the reducer to the mix, as follows:

[hadoop@ip-10-255-7-47 ~]$ head /mnt1/0.csv | ./mapper.py | ./reducer.py
0.526826584472

A quick manual check confirms that the generated average is correct for the values parsed by the mapper. This combination of tasks via the command-line pipes highlights the flexible and inherently composable nature of the MapReduce flows.
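That manual check is easy to reproduce in Python using the nine temperature values echoed by the mapper above:

values = [0.7897647433139449, 0.42335801512344756, 0.776535509023379,
          0.8168558530942849, -0.6066233105016293, 0.2720125869889254,
          2.9628153460517272, -1.3297078910961062, 0.6364284082486487]
print(sum(values) / len(values))  # approximately 0.5268, matching the reducer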

With our Python code tested and working, we're almost ready to run it on Hadoop. First, we're going to switch to a local terminal session and create some more nodes in our cluster. This is more of an exercise to gain familiarity with the process than anything else since, in our case, the extra nodes won't have too much of an impact:

$ aws emr add-instance-groups \
    --cluster-id $CLUSTER_ID \
    --instance-groups \
      InstanceCount=6,InstanceGroupType=task,InstanceType=m1.large \
      InstanceCount=10,InstanceGroupType=task,InstanceType=m3.xlarge
j-63JNVV2BYHC
INSTANCEGROUPIDS   ig-ZCJCUQU6RU21
INSTANCEGROUPIDS   ig-3RXZ98RUGS7OI

We obtained not only the cluster ID like we did before, but also the IDs for the two new instance groups that we asked for. We can check the creation and setup progress of the nodes by querying the cluster, as follows:

$ aws emr describe-cluster --cluster-id $CLUSTER_ID
CLUSTER	False	j-63JNVV2BYHC	ec2-54-70-11-85.us-west-2.compute.amazonaws.com	Weather	189	3.6.0	3.6.0	EMR_DefaultRole	False	True
APPLICATIONS	hadoop	2.4.0
APPLICATIONS	Hue
BOOTSTRAPACTIONS	Install Hue	s3://us-west-2.elasticmapreduce/libs/hue/install-hue
BOOTSTRAPACTIONS	Install HBase	s3://us-west-2.elasticmapreduce/bootstrap-actions/setup-hbase
EC2INSTANCEATTRIBUTES	us-west-2b	OubiwannAWSKeyPair	sg-fea0e9cd	sg-fca0e9cf	EMR_EC2_DefaultRole
INSTANCEGROUPS	ig-3M0BXLF58BAO1	MASTER	c1.xlarge	ON_DEMAND	MASTER	1	1
STATUS	RUNNING
STATECHANGEREASON
TIMELINE	1427653325.578	1427653634.541
INSTANCEGROUPS	ig-1YYKNHQQ27GRM	CORE	c1.xlarge	ON_DEMAND	CORE	2	2
STATUS	RUNNING
STATECHANGEREASON
TIMELINE	1427653325.579	1427653692.548
INSTANCEGROUPS	ig-3RXZ98RUGS7OI	TASK	m3.xlarge	ON_DEMAND	task	10	0
STATUS	RESIZING
STATECHANGEREASON	Expanding instance group
TIMELINE	1427676271.495
INSTANCEGROUPS	ig-ZCJCUQU6RU21	TASK	m1.large	ON_DEMAND	task	6	0
STATUS	RESIZING
STATECHANGEREASON	Expanding instance group
TIMELINE	1427676243.42
STATUS	WAITING
STATECHANGEREASON	Waiting after step completed
TIMELINE	1427653325.578	1427653692.516

That's a lot of information to parse, but if you scan the bottom of the output, you will see that the two groups we just added have a status of RESIZING. Keep an eye on these until they've finished. Once they're done, we can move back to the terminal window where we've SSH'ed into the Hadoop master.

Getting back to the Hadoop cluster, let's execute the MapReduce task against the data that we uploaded to the cluster and saved to HDFS, as follows:

[hadoop@ip-10-255-7-47 ~]$ hadoop \
  jar contrib/streaming/hadoop-*streaming*.jar \
  -D mapred.reduce.tasks=1 \
  -files mapper.py,reducer.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -combiner reducer.py \
  -input /weather/*.csv \
  -output /weather/total-mean-temp

That will only take about a minute and a half to run. To see the results, we just need to take a look at the file that was dumped to the output directory that we indicated previously. This can be accomplished with the help of the following code:

[hadoop@ip-10-255-7-47 ~]$ hdfs dfs -ls /weather/total-mean-temp
Found 2 items
-rw-r--r-- 1 hadoop g  0 2015-03-29 20:20 /weather/total-mean-temp/_SUCCESS
-rw-r--r-- 1 hadoop g 18 2015-03-29 20:20 /weather/total-mean-temp/part-00000
[hadoop@ip-10-255-7-47 ~]$ hdfs dfs \
    -cat /weather/total-mean-temp/part-00000
-5.30517804131e-05

The output agrees very closely (to roughly ten significant digits) with the result that was obtained by manually slicing the HDF5 file:

In [44]: sum(means)/len(means)
Out[44]: -5.3051780413782918e-05

Without an in-depth analysis, one might venture to guess that the tiny difference between these two values comes down to floating-point arithmetic: the values were accumulated in a different order, on different platforms, with different versions of Python (the Python version on the Hadoop cluster was 2.6; we're using 3.4.2). At any rate, the calculated mean meets our expectations, that is, close to zero for a normal distribution that is centered around zero.
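To see how easily the last digits of a floating-point accumulation can drift, here is a small illustration of the fact that summing the same values in different ways can yield slightly different results:

>>> import math
>>> values = [0.1] * 10
>>> sum(values)        # naive left-to-right accumulation
0.9999999999999999
>>> math.fsum(values)  # correctly rounded summation of the same values
1.0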

So, we've come out on the other side of Hadoop with our result, but what does this mean to us as matplotlib users? The standard use case for matplotlib is on a workstation, often at an interactive Python or IPython prompt. In such scenarios, we are used to crunching our data by calculating means, standard deviations, and so on, and then plotting the results. All of this is accomplished with just a few commands, and execution completes in a matter of seconds.

In the world of big data, that experience changes drastically. What was once an implicit understanding that one's data is in-process and easy to copy and analyze is now an involved process comprising cluster deployments, configuration management, distributed data, communication latencies, and the like. The only thing that remains the same is that it's our data and we need to plot it.

When the data was too large for the memory but we were still able to fit the same in a single hard drive, HDF5 and PyTables gave us the means by which we could use our old approaches with very little change in our analytical workflows.

Once our data becomes too large for a hard drive or a file server, the workflows have to change. We can't even pretend it's the same data world that we lived in previously. We have to think in terms of the partitioned data and our tasks running against the partitions.

We still get to use NumPy, but the work is no longer being done on our machine in the IPython shell; it's being done remotely on a cluster of distributed nodes. Our work in interactive shells becomes a testbed, where we operate on a small sample set to prepare for the task of pushing a job out to the cluster. Additionally, every new big data project has the potential to be legitimately different from any other big data project. For every organization that needs to work with big data, and for each set of data, the particulars of the day-to-day analytics workflows are likely to change.

In the end though, our jobs will run, and from the octillions of data points we will have distilled the few tens of millions that are needed in the final analysis; it is this data that will be handed to matplotlib for plotting. Though big data requires that the preparation of data for plotting move outside the familiarity of an interactive Python prompt, the essence remains the same: we need to know what we have, how to distill it, and how to visualize it.
