Chapter 11. Taking clustering to production

This chapter covers

  • Running a clustering job on a Hadoop cluster
  • Tuning a clustering job for performance
  • Batch clustering versus online clustering

You’ve seen how different clustering algorithms in Mahout group the documents in the Reuters news data set. Along the way, you learned about the vector representation of data, distance measures, and various other ways to improve the quality of clusters. One of Mahout’s strengths is its ability to scale. The Reuters data set wasn’t much of a challenge, so in this chapter we set a bigger challenge for Mahout: clustering one of the largest freely available data sets in the world, Wikipedia, the free encyclopedia. Mahout can handle such scale because its algorithms are implemented as MapReduce jobs that can execute on a Hadoop cluster spanning hundreds or even thousands of computers.[1]

1 Ajay Anand describes running a 4,000 node Hadoop installation at Yahoo! in 2008 in his blog entry, “Scaling Hadoop to 4000 nodes at Yahoo!,” http://developer.yahoo.net/blogs/hadoop/2008/09/scaling_hadoop_to_4000_nodes_a.html

Unfortunately, not everyone has access to such a cluster. For demonstration purposes in this chapter, we use a subset of documents extracted from Wikipedia and experiment on a small cluster to show how greater speed is achieved by adding more computers. We start by running clustering on a single-machine Hadoop setup, known as a pseudo-distributed setup (which you saw earlier in chapter 6). We also look at the Mahout launcher program, which can easily launch clustering jobs locally or on any Hadoop cluster by providing the configuration file. Then we discuss how a clustering job can be tuned for performance. Finally, we discuss how to use the existing Mahout clustering algorithms and design a system that can incrementally cluster in an online fashion.

11.1. Quick-start tutorial for running clustering on Hadoop

Let’s first look at an aspect of Hadoop’s architecture.

A Hadoop cluster has a master server called the NameNode, which manages the Hadoop Distributed File System (HDFS) namespace and coordinates the storage nodes in the cluster, which are called DataNodes. Another server, called the JobTracker, manages all the MapReduce jobs and schedules work onto the nodes where the Mapper and Reducer classes execute. On each of these nodes, another process called the TaskTracker carries out the Mapper and Reducer execution requests from the JobTracker.

Hadoop does all this seamlessly and without any user intervention. In single-node clusters, the NameNode, JobTracker, DataNode, and TaskTracker all run on the same system as separate processes talking with each other.

The version of Hadoop that Mahout is designed to work with at the moment is 0.21 (though it should be compatible with other recent versions).

11.1.1. Running clustering on a local Hadoop cluster

You’ll find a short tutorial on setting up a local Hadoop cluster in pseudo-distributed mode on this web page: http://hadoop.apache.org/common/docs/r0.20.2/quickstart.html. You’ll need to set up this local cluster for this chapter’s examples.

The Hadoop executable resides in the bin/ folder of the Hadoop home directory. To view the contents of the HDFS, execute this command:

bin/hadoop dfs -ls /

This will list all files in the root of the filesystem.

When the cluster is started for the first time, your home directory might not exist in HDFS, so you should create a /user/<your UNIX user name> folder:

bin/hadoop dfs -mkdir /user/<your UNIX username>

The home directory can also be listed as follows:

bin/hadoop dfs -ls

To begin clustering the Reuters data on your pseudo-distributed cluster, prepare the Reuters SequenceFile containing the text data locally, as in chapter 8, and copy it to the HDFS:

bin/hadoop dfs -put <path-to>/reuters-seqfiles reuters-seqfiles

Using this SequenceFile as the input, you’ll run the dictionary-based vectorizer on the cluster and then run k-means clustering on it.

Any Hadoop MapReduce job is executed using the hadoop jar command. Mahout packages all example class files and their dependencies into a single job JAR under examples/target (mahout-examples-0.5-job.jar in the following command). To run the dictionary vectorizer on the local Hadoop cluster using the Reuters SequenceFiles as input, simply execute the hadoop jar command with the Mahout job file as follows:

bin/hadoop jar mahout-examples-0.5-job.jar 
  org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles 
  -ow -i reuters-seqfiles -o reuters-vectors

That’s it. The clustering executes on the local Hadoop cluster. If the default Hadoop configuration is used, and if the system has at least two processor cores, two mappers will be executed in parallel, with each running on one core. This can be tracked on the JobTracker dashboard at http://localhost:50030, as shown in figure 11.1.

Figure 11.1. A screenshot showing a typical Hadoop MapReduce JobTracker page. You can view this page for your local Hadoop cluster at http://localhost:50030.

11.1.2. Customizing Hadoop configurations

If you have two cores, Hadoop, out of the box, executes clustering about twice as fast as the local runs we invoked earlier in chapters 8–10. If more than two cores are available on the computer, the Hadoop configuration can be modified to set the number of mapper and reducer tasks to a higher value, increasing the overall speed of computation. The mapred.map.tasks and mapred.reduce.tasks configuration properties in the Hadoop installation’s mapred-site.xml file need to be set to values appropriate for the number of CPU cores.

With each additional core, parallelism increases and processing time decreases. Once the number of parallel tasks saturates a single node, increasing it further degrades performance sharply. The only way to scale beyond that is to add more nodes with the same configuration: a fully distributed Hadoop cluster.
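
If you prefer not to touch the cluster-wide defaults, the same two properties can also be set per job through Hadoop’s Configuration API. The following is a minimal sketch (the class name is our own, not part of Mahout); note that mapred.map.tasks is only a hint to Hadoop, because the actual number of map tasks is driven by the input splits, whereas mapred.reduce.tasks is honored as given.

import org.apache.hadoop.conf.Configuration;

public class TunedConfiguration {
  public static Configuration create() {
    // Start from the cluster configuration found on the classpath
    // (core-site.xml, hdfs-site.xml, mapred-site.xml).
    Configuration conf = new Configuration();
    int cores = Runtime.getRuntime().availableProcessors();
    // Hint at one map task per core; Hadoop may still choose a different
    // number based on the input splits.
    conf.setInt("mapred.map.tasks", cores);
    // Reduce tasks are honored as given; leave some cores for the mappers.
    conf.setInt("mapred.reduce.tasks", Math.max(1, cores / 2));
    return conf;
  }
}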

Tip

You’ll find a tutorial with step-by-step instructions for setting up a distributed Hadoop cluster on the Hadoop website: http://hadoop.apache.org/common/docs/r0.20.2/cluster_setup.html. In the 0.20.2 and later versions of Hadoop, the configuration is split across three files in the conf folder: core-site.xml, hdfs-site.xml, and mapred-site.xml. The default configuration values are set in core-default.xml, hdfs-default.xml, and mapred-default.xml. A parameter given in a default file can be overridden in the corresponding *-site.xml file. Tuning the MapReduce parameters usually involves editing mapred-site.xml.

Executing clustering code on a distributed Hadoop cluster is no different from executing it on a single-node cluster. The bin/hadoop script can launch the job on the cluster from the NameNode, slave nodes, or any computer that has access to the NameNode. The only requirement is that the script, when run, needs to access the correct configuration files of the cluster through the HADOOP_CONF_DIR environment variable.

Executing Jobs on a Hadoop Cluster Using the Mahout Launcher

Mahout also provides a bin/mahout script (much like the Hadoop launcher script) to launch clustering jobs. We used it extensively in previous chapters to launch clustering as a single-process job. By setting the HADOOP_HOME and HADOOP_CONF_DIR environment variables, the same script can be used to launch any of the Mahout algorithms on a Hadoop cluster. The script will automatically read the Hadoop cluster configuration files and launch the Mahout job on the cluster.

Set the environment variables and run the launcher; a typical session looks like this:

export HADOOP_HOME=~/hadoop/
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
bin/mahout kmeans -h

running on hadoop, using HADOOP_HOME=/Users/username/hadoop and
     HADOOP_CONF_DIR=/Users/username/hadoop/conf
...

The Mahout launcher script internally calls the Hadoop launcher script using the correct cluster configuration files, as is outlined in figure 11.2. In Mahout, the launcher script is the easiest way to launch any algorithm locally or on a distributed Hadoop cluster.

Figure 11.2. A logic diagram showing the two modes of executing jobs with the Mahout launcher script.

Congratulations, you have successfully run a clustering job on a Hadoop setup. If you’re interested, you can spend some time experimenting with various parameters to tune clustering quality. Next, we’ll look at how you can tune this setup and achieve higher throughput and performance in clustering.

11.2. Tuning clustering performance

Clustering algorithms in Mahout are designed to run in parallel. Though the algorithms differ, they’re similar in one respect: they all read from the SequenceFile of vectors in parallel in each Mapper.

Many operations in the clustering algorithms are CPU-bound: vector serialization and deserialization, distance computations, and so on keep the CPU at full usage. Other operations are I/O-bound, such as transmitting the centroids to each Reducer over the network. To improve clustering performance, you need to know how to address both types of bottleneck. You’ll take a look at how various parameters in Mahout create CPU, disk, or network bottlenecks, depending on the type of input data.

For reference, the k-means clustering algorithm is illustrated in figure 11.3. Other clustering algorithms like fuzzy k-means, Dirichlet, and LDA all have an architecture similar to k-means, and the optimizations for k-means will hold true for all of them.

Figure 11.3. Schematic diagram of a k-means clustering iteration running as a MapReduce job

Clustering performance is a function of the input data. To tune performance, you need to analyze various distributions of input and determine the performance pitfalls you might encounter while using certain clustering parameters. Let’s begin by looking at how CPU bottlenecks can be reduced, and in some cases completely avoided.

11.2.1. Avoiding performance pitfalls in CPU-bound operations

A CPU-bound job slows down when its most heavily used functions slow down. In clustering, the distance computation is the hot spot, so making that computation faster makes the whole job faster. Keep the following things in mind when tackling CPU-related performance problems in clustering.

Using an Appropriate Vector Representation

DenseVector is generally the fastest of the three Vector implementations in Mahout. Any element can be accessed quickly, and it can be iterated over efficiently in sequence. But using DenseVector for sparse data can cause significant performance problems. When used to represent a vector that has many zero elements, a DenseVector wastefully stores every zero that a sparse representation would simply omit. This means more data is needlessly serialized, and operations that only care about non-zero elements must still iterate over all the zeros.

For example, say you’re clustering very sparse data (the number of non-zero elements is about 1 percent of the cardinality), like text document vectors. Clustering using a SparseVector is about twice as fast as using a DenseVector on the local system and about 10 times faster on distributed systems. The extra gain in distributed clustering comes from not having to transfer the serialized zero entries of a DenseVector over the network.

As a result, it’s usually best to represent data as a SparseVector, because even at moderate sparsity it offers large savings in storage, deserialization, and network transfer.
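
To see the difference concretely, the following sketch builds the same roughly 1-percent-sparse vector as a DenseVector and as a SequentialAccessSparseVector and compares the number of bytes each produces when serialized through VectorWritable, the wrapper Mahout uses when writing vectors to SequenceFiles. The cardinality and fill ratio are arbitrary choices for illustration.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.Random;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class VectorSizeComparison {
  public static void main(String[] args) throws Exception {
    int cardinality = 50000;   // dimensions in the dictionary
    int nonZeros = 500;        // about 1 percent non-zero, like text
    Vector dense = new DenseVector(cardinality);
    Vector sparse = new SequentialAccessSparseVector(cardinality);
    Random random = new Random(42);
    for (int i = 0; i < nonZeros; i++) {
      int index = random.nextInt(cardinality);
      double weight = random.nextDouble();
      dense.set(index, weight);
      sparse.set(index, weight);
    }
    System.out.println("Dense bytes:  " + serializedSize(dense));
    System.out.println("Sparse bytes: " + serializedSize(sparse));
  }

  // Serialize through VectorWritable and count the bytes produced.
  private static int serializedSize(Vector v) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new VectorWritable(v).write(new DataOutputStream(bytes));
    return bytes.size();
  }
}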

Using a Fast Distance Measure

Clustering algorithms calculate distances often, so using a fast distance-measure implementation is crucial. Improvements in the distance measure’s speed go straight to the bottom line of the algorithm’s overall performance.

If you implement your own distance measure, follow these best practices; a short sketch applying them appears after the list:

  • Avoid cloning or instantiating a new Vector. Vectors are heavy Java objects, and cloning them degrades performance heavily.
  • Avoid iterating on all elements if the distance measure requires only non-zero elements. Use the Vector.iterateNonZero() iterator instead of Vector.iterator().
  • Use Vector.assign() or Vector.aggregate() methods to efficiently traverse and modify the vector. You can learn more about them in appendix B.
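
Here is a minimal sketch that applies these practices to the squared Euclidean distance. It isn’t Mahout’s own implementation, and it skips the DistanceMeasure interface plumbing; it only illustrates the pattern: no cloning, built-in vector operations where possible, and iteration restricted to non-zero elements.

import java.util.Iterator;
import org.apache.mahout.math.Vector;

public final class DistanceSketches {

  // d(a, b)^2 = |a|^2 + |b|^2 - 2 a.b, computed without cloning either
  // vector; dot() on sparse vectors only touches non-zero elements.
  public static double squaredEuclidean(Vector a, Vector b) {
    return a.getLengthSquared() + b.getLengthSquared() - 2.0 * a.dot(b);
  }

  // The same dot product written by hand: iterate only the non-zero
  // elements of one vector instead of every index.
  public static double sparseDot(Vector a, Vector b) {
    double sum = 0.0;
    Iterator<Vector.Element> nonZeros = a.iterateNonZero();
    while (nonZeros.hasNext()) {
      Vector.Element e = nonZeros.next();
      sum += e.get() * b.get(e.index());
    }
    return sum;
  }

  private DistanceSketches() {}
}
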
Using a SparseVector Type Based on the Distance-Measure Computation

There are two sparse vector implementations, and it pays to use the implementation appropriate to the distance-measure computation. RandomAccessSparseVector is tuned for random lookups, and SequentialAccessSparseVector is tuned for fast sequential access.

For example, computing a cosine distance measure entails many vector dot product operations, which requires iterating over two vectors in order, element by element. The code needs to multiply values from matching indices from both vectors. Naturally, SequentialAccessSparseVector is optimal for such a sequential access pattern in the distance measure, and it’s much faster than RandomAccessSparseVector. Keeping all the document vectors in sequential format gives a huge boost to the speed of distance-measure computation and hence improves the overall clustering performance.
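
If you want to convince yourself for your own data, the following informal micro-benchmark sketch (not the VectorBenchmarks tool described in the tip below) fills a RandomAccessSparseVector and a SequentialAccessSparseVector with the same non-zero values and times repeated dot products. The sizes and loop count are arbitrary; at text-like sparsity the sequential variant should win for this access pattern, in line with the claim above.

import java.util.Random;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;

public class DotProductTiming {
  public static void main(String[] args) {
    int cardinality = 100000;
    int nonZeros = 1000;
    Random random = new Random(1);
    Vector randomA = new RandomAccessSparseVector(cardinality);
    Vector randomB = new RandomAccessSparseVector(cardinality);
    Vector seqA = new SequentialAccessSparseVector(cardinality);
    Vector seqB = new SequentialAccessSparseVector(cardinality);
    for (int i = 0; i < nonZeros; i++) {
      int indexA = random.nextInt(cardinality);
      int indexB = random.nextInt(cardinality);
      double value = random.nextDouble();
      randomA.set(indexA, value);
      seqA.set(indexA, value);
      randomB.set(indexB, value);
      seqB.set(indexB, value);
    }
    System.out.println("random-access dot: " + time(randomA, randomB) + " ms");
    System.out.println("sequential dot:    " + time(seqA, seqB) + " ms");
  }

  // Time 10,000 dot products between the two vectors.
  private static long time(Vector a, Vector b) {
    long start = System.currentTimeMillis();
    double ignored = 0;
    for (int i = 0; i < 10000; i++) {
      ignored += a.dot(b);
    }
    return System.currentTimeMillis() - start;
  }
}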

Tip

The Mahout utils module has a utility class called VectorBenchmarks. It runs different vector operations across combinations of dense, random-access, and sequential-access vectors and compares their speeds. If your custom distance measure relies heavily on certain vector operations, you can use this utility to find the vector type that performs those operations best at a given sparsity level. The tool is found in the mahout-utils module, in the org.apache.mahout.benchmark package.

11.2.2. Avoiding performance pitfalls in I/O-bound operations

I/O performance is usually affected most by the amount of data the program reads and writes. In the case of Hadoop jobs, these reads and writes are mostly sequential.

Decreasing the number of bytes written greatly improves overall clustering performance. Keep the following things in mind to reduce the I/O bottleneck.

Using an Appropriate Vector Representation

This is quite obvious. As explained earlier, you should never store sparse vectors as DenseVector objects. It bloats the size on disk and can create both disk and network I/O bottlenecks for the clustering application.

Using HDFS Replication

Any file stored on HDFS is by default replicated three times across the nodes of the cluster. This guards against data loss if a copy on one node is lost, and it also improves performance. If data existed on only one node, all Mappers and Reducers that need that data would have to read it from that one node, which can become a bottleneck. Replication lets HDFS store the chunks on separate servers, giving Hadoop a choice of nodes on which to start the computation, thereby reducing this network I/O bottleneck. Replication can be increased to reduce the bottleneck further, but that means using more storage on HDFS. Consider this only if there is evidence that under-replication is creating a bottleneck.
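
If you do decide the input vectors are under-replicated, the replication factor of existing files can be raised from the HDFS API as well as from the command line. Here is a minimal sketch; the input path and the target factor of 5 are illustrative assumptions, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RaiseReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Raise replication of every part file of the clustering input from the
    // default of 3 to 5, so more nodes hold a local copy of the vector data.
    Path vectorDir = new Path("reuters-vectors/tfidf-vectors");
    for (FileStatus status : fs.listStatus(vectorDir)) {
      if (!status.isDir()) {
        fs.setReplication(status.getPath(), (short) 5);
      }
    }
  }
}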

Reducing the Number of Clusters

Clusters are usually represented as large, dense vectors, each of which consumes considerable storage. If the clustering job is trying to find a large number of clusters (k), these vectors are sent across the network from the Mappers to the Reducers. Having a small value of k decreases this network I/O. It also decreases the overhead of the distance-measure computation in k-means and fuzzy k-means, where every additional cluster centroid adds one distance computation per input point in each iteration.

When it’s necessary to cluster data into large numbers of clusters, you can throw it all at Hadoop, add more computers, and let it scale for itself. But you can smartly reduce the distance-measure computation and clustering time by an order of magnitude by using a two-tiered approach for batch clustering. This can help speed up clustering for very large data sets, like Wikipedia.

Alternatively, we can explore more incremental clustering methods to reduce the amount of data that needs to be examined during clustering. This can help online aggregators, like news websites, automatically add articles to a cluster as they arrive. These two approaches are explained in the next section.

11.3. Batch and online clustering

Let’s return to the AllMyNews.com online news portal case study from chapter 9. We found that a related-articles feature could be implemented as a simple clustering problem. But naive clustering won’t give us the desired final output. Assume the portal has about a million news articles. We’d have to produce clusters containing no more than, say, 100 articles each, or else the list of related articles would become unusably large. But that implies producing 10,000 clusters.

Directly computing 10,000 clusters isn’t a good way to solve this problem. Although Hadoop enables us to scale up to do it, it would consume a wasteful amount of CPU and disk, which cost money. A better approach is to first split the articles into 100 giant clusters and then, for each cluster of about 10,000 articles, cluster again to get 100 smaller clusters, as illustrated in figure 11.4.


Figure 11.4. Applying k-means in two levels. A single large MapReduce job with 10,000 clusters performs N × 10^6 × 10^4 distance computations, where N is the number of iterations. The hierarchical breakdown of the problem requires only N1 × 10^6 × 10^2 + N2 × 10^4 × 10^2 × 10^2 distance computations (N1 and N2 being the iteration counts at each level), almost 100 times fewer.

As in past examples, we’re clustering all articles in one big effort. This is known as batch clustering, or offline clustering. But as new articles come into the news portal, they need to be clustered too. It would be quite expensive to keep running the batch clustering jobs again and again just to cluster a few recent articles. Even if it were run hourly, we’d be unable to identify articles related to a new article for up to an hour. This is clearly not desirable behavior.

Enter online clustering techniques, which aim to efficiently cluster new items right away, given a clustering of existing items.

11.3.1. Case study: online news clustering

Online clustering here isn’t truly online or truly immediate. There are algorithms that directly cluster streams of data, but they have trouble scaling. Instead, consider the following technique:

  • Cluster one million articles, as discussed previously, and save the cluster centroids for all clusters.
  • Periodically, for each new article, use canopy clustering to assign it to the cluster whose centroid is closest, based on a very small distance threshold (a sketch of this assignment step follows the list). This ensures that articles on topics that occurred previously are associated with that topic’s cluster and are shown instantly on the website. These documents are removed from the new document list.
  • The leftover documents that aren’t associated with any old clusters form new canopies. These canopies represent new topics that have appeared in the news and that have little or no match with any articles from the past.
  • Using the new canopy centroids as starting points, cluster the articles that aren’t associated with any of the old clusters, and add the resulting temporary cluster centroids to the centroid list.
  • Less frequently, execute the full batch clustering to recluster the entire set of documents. While doing so, use all previous cluster centroids as input to the algorithm so that clustering achieves faster convergence.
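
The heart of the incremental step is assigning a new article to the nearest existing centroid, but only if it falls within the tight distance threshold. The following minimal sketch shows that step in isolation; the class name, the threshold handling, and returning -1 to mean "start a new canopy" are illustrative choices of ours, not Mahout API.

import java.util.List;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;

public class IncrementalAssigner {
  private final List<Vector> centroids;   // centroids from the last batch run
  private final DistanceMeasure measure;  // for example, CosineDistanceMeasure
  private final double threshold;         // very small distance threshold

  public IncrementalAssigner(List<Vector> centroids,
                             DistanceMeasure measure,
                             double threshold) {
    this.centroids = centroids;
    this.measure = measure;
    this.threshold = threshold;
  }

  // Returns the index of the closest known cluster, or -1 if the article is
  // farther than the threshold from every centroid (a new topic).
  public int assign(Vector article) {
    int best = -1;
    double bestDistance = Double.MAX_VALUE;
    for (int i = 0; i < centroids.size(); i++) {
      double d = measure.distance(centroids.get(i), article);
      if (d < bestDistance) {
        bestDistance = d;
        best = i;
      }
    }
    return bestDistance <= threshold ? best : -1;
  }
}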

The key here is to ensure that the incremental canopy clustering is done as quickly as possible, so that the user gets related clusters within minutes of a story being published. One dedicated node can easily take care of this. Alternatively, you could delay publishing the story on the website by a couple of minutes until the related-article clusters are generated.

This example shows that just because a solution can use up lots of resources easily and can scale doesn’t mean it should use lots of resources. We could have run full clustering on a dedicated cluster with thousands of nodes, but this would have just wasted money. A cheaper and faster solution is available with a little ingenuity.

11.3.2. Case study: clustering Wikipedia articles

Wikipedia’s articles are exported every night into XML files and made available here: http://dumps.wikimedia.org/enwiki/. This is an amazing data set. Because it’s edited and organized by humans, many articles remain insufficiently or incorrectly grouped within Wikipedia. You’ll run clustering on these articles to find related articles, which could potentially help editors group them better on the Wikipedia website.

Before attempting to cluster the articles, you need to extract the documents and vectors from the XML format. Fortunately, Mahout has a Wikipedia data set creator class that can read the XML format and output each individual article in SequenceFile format. The extractor can also select documents matching a certain category. For the purposes of this experiment, you’ll extract all documents in the science category.

First, download the latest pages-articles.xml.bz2 file from the previously mentioned website. Extract it to a local directory, and upload it to the HDFS. Assuming the Hadoop environment variables are set, run the Wikipedia extractor as follows:

echo "science" > categories.txt
bin/mahout seqwiki -c categories.txt -i articles.xml 
  -o wikipedia-seqfiles -e

This will select all documents from the article XML files that have a Wikipedia category exactly matching the word science and write those articles to a SequenceFile. You can run the SparseVectorsFromSequenceFiles job over this output to create the Wikipedia TF-IDF vectors. Then you can run any clustering algorithm on top of it. This whole process will take quite a while.
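
Before launching a long clustering run over these vectors, it’s worth peeking at a few of them to confirm the vectorization looks sane. The following is a minimal sketch; it assumes the vectorizer wrote its TF-IDF output under a wikipedia-vectors/tfidf-vectors directory, so adjust the part-file path to whatever your run actually produced.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.math.VectorWritable;

public class PeekAtVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Path is an assumption; point this at a part file from your own run.
    Path part = new Path("wikipedia-vectors/tfidf-vectors/part-r-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    Text key = new Text();
    VectorWritable value = new VectorWritable();
    int shown = 0;
    // Print the document id and non-zero term count of the first few vectors.
    while (reader.next(key, value) && shown < 5) {
      System.out.println(key + " has "
          + value.get().getNumNondefaultElements() + " non-zero terms");
      shown++;
    }
    reader.close();
  }
}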

Once you have successfully run the clustering, you may wish to experiment with the clustering algorithm in order to observe the effect of, say, the choice of vector representation. Recreate the vectors as SequentialAccessSparseVector by passing the -seq flag to the dictionary vectorizer. Clustering these will run faster than with RandomAccessSparseVector, which is created by default.

Once you’re confident enough in clustering the Wikipedia sample set, extract the full data set and vectorize it as follows:

bin/mahout seqwiki -all -i articles.xml -o wikipedia-seqfiles

Unless you have access to a powerful Hadoop cluster, this may take up to a whole day on one machine. Another option is to run it in the cloud, as you’ll see next.

Running Wikipedia Clustering on Amazon Elastic MapReduce

One cheap way to explore Mahout is to purchase Hadoop cluster time from providers in the cloud like Amazon’s Elastic MapReduce. Section 6.6 of this book showed you a quick way to access this resource.

If you’re comfortable with the Amazon Elastic MapReduce (EMR) service, Mahout algorithms can be run as follows:

1.  Upload the Mahout job file (mahout-examples-0.5-job.jar) to an Amazon S3 bucket.

2.  Provide the path to the *-job.jar file as the JAR file.

3.  Specify the full class name for the driver class of the algorithm that needs to be run—perhaps org.apache.mahout.clustering.kmeans.KMeansDriver—and the parameters.

4.  Specify the input path of the Wikipedia Vectors file stored on an S3 bucket and the output path on the S3 bucket where clusters need to be written.

5.  Specify the other required parameters for the MapReduce, like the number of machines. Remember, each compute unit is charged by the hour, so a large cluster quickly gets expensive.

6.  Run the job.

You can follow the same procedure to generate vectors directly using the EMR service by uploading the SequenceFile and running the SparseVectorsFromSequenceFiles class.

At the time of writing, it took an eight-node cluster about an hour to convert Wikipedia SequenceFiles into vectors. The clustering time depends heavily on the initial centroids, the distance measure, and the number of clusters—and, of course, the number of machines available to work on the computation. Always remember, the computation isn’t free.

11.4. Summary

In this chapter, we took a brief look at a Hadoop cluster and how a clustering job is executed on it. We also looked at the Mahout launcher script and its two modes of operation.

We discussed the various issues in clustering performance and analyzed various techniques to bring down the CPU and I/O clustering bottlenecks. Correct vector representation and optimized distance measurement are the keys to high performance clustering applications.

We also applied this knowledge to complete the pseudo-online clustering engine for our fictitious news aggregation portal, AllMyNews.com. With a bit of ingenuity, the large clustering problem was broken down into simpler and faster ones using a mix of techniques, thus achieving near-real-time clustering of related articles in the system. To highlight Mahout’s ability to scale clustering, we tried to cluster one of the largest public data sets available today, the English Wikipedia. Because the process was slow, we took a quick look at how multi-node clustering can be performed in the cloud using the Amazon Elastic MapReduce service.

With this, you’ve learned to run Mahout on a Hadoop cluster and make it scale. The journey in this part of the book wasn’t simple. We started in chapter 7 with a basic clustering of points in a plane, from which you learned about vectors and distance measures. We toured various algorithms in Mahout and tried to apply them to a real-world case study, which gradually evolved from small input and non-distributed computation to large-scale distributed computation. You also learned techniques for tuning the algorithms for speed and quality. Next, in chapter 12, you’ll use all you’ve learned and apply it to real-world case studies.
