Up to this point, we have mainly demonstrated examples of ad hoc exploratory analysis. To build analytical applications, we need to start putting these pieces into a more robust framework. As an example, we will demonstrate a streaming clustering pipeline using PySpark. This application can potentially scale to very large datasets, and we will compose the pieces of the analysis so that the job is robust to malformed data.
As we will be using similar examples with PySpark in the following chapters, let's review the key ingredients we need in such an application, some of which we already saw in Chapter 2, Exploratory Data Analysis and Visualization in Python. Most PySpark jobs we will create in this book consist of the following steps:
Let's look at some of these components using the Python notebook. Assuming we have Spark installed on our system, we'll start by importing the required dependencies:
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
We can then test starting the SparkContext:
>>> sc = SparkContext('local', 'streaming-kmeans')
Recall that the first argument gives the URL of our Spark master, the machine that coordinates execution of Spark jobs and distributes tasks to the worker machines in a cluster. In this case we will run locally, so we give this argument as local, but otherwise it could be the URL of a remote machine in our cluster. The second argument is simply the name we give to our application. With a context running, we can also generate the streaming context, which contains information about our streaming application, using the following:
>>> ssc = StreamingContext(sc, 10)
The first argument is simply the SparkContext used as the parent of the StreamingContext; the second is the frequency in seconds at which we will check our streaming data source for new data. If we expect data to arrive regularly we could make this interval lower, or make it higher if we expect new data to be available less frequently.
Now that we have a StreamingContext, we can add data sources. Let's assume for now that we will have two file-based sources: one for training data (which could be historical) and one for test data. We do not want the job to die if it receives a single malformed line, so we use a Parser class that gives us this flexibility:
>>> from pyspark.mllib.linalg import Vectors
>>> from pyspark.mllib.regression import LabeledPoint
>>> class Parser():
        def __init__(self, type='train', delimiter=',', num_elements=5, job_uuid=''):
            self.type = type
            self.delimiter = delimiter
            self.num_elements = num_elements
            self.job_uuid = job_uuid

        def parse(self, l):
            try:
                line = l.split(self.delimiter)
                if self.type == 'train':
                    category = float(line[0])
                    feature_vector = Vectors.dense(line[1:])
                    return LabeledPoint(category, feature_vector)
                elif self.type == 'test':
                    category = -1
                    feature_vector = Vectors.dense(line)
                    return LabeledPoint(category, feature_vector)
                else:
                    # log unknown parser types
                    with open('/error_events/{0}.txt'.format(self.job_uuid), 'a') as f:
                        f.write('Unknown type: {0}\n'.format(self.type))
            except Exception:
                # log lines that fail to parse
                with open('/error_events/{0}.txt'.format(self.job_uuid), 'a') as f:
                    f.write('Error parsing line: {0}\n'.format(l))
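To make the behavior of this pattern concrete, here is a simplified, Spark-free sketch of the same idea: parse each delimited line into a label and a feature list, and collect malformed lines instead of letting them kill the job. The function name and sample lines are illustrative only; the real class above returns LabeledPoint objects and logs errors to a file.

```python
def parse_line(l, type='train', delimiter=','):
    """Parse one CSV line into (label, features); raises on bad input."""
    fields = l.split(delimiter)
    if type == 'train':
        # first column is the label, the rest are features
        return float(fields[0]), [float(x) for x in fields[1:]]
    # test lines carry no label, so we use -1 as a placeholder
    return -1, [float(x) for x in fields]

errors = []
for line in ['1.0,0.5,0.3', '2.0,0.1,0.9', 'not,numeric,data']:
    try:
        print(parse_line(line))
    except ValueError:
        errors.append(line)  # a malformed line is recorded, not fatal

print(errors)  # only the bad line ends up here
```

The key design choice is that parsing failures are caught and recorded rather than propagated, so one bad record in a stream does not bring down the whole pipeline.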
We log error lines to a file named after our job ID, which will allow us to locate them later if we need to. We can then use this parser to train and evaluate the model. To train the model, we move files with three columns (a label plus the two coordinate features to be clustered) into the training directory. We can also add files with only two columns (the coordinate features) to the test data directory:
>>> from pyspark.mllib.clustering import StreamingKMeans
>>> num_features = 2
>>> num_clusters = 3
>>> job_uuid = 'streaming-kmeans'  # any unique identifier, used to name the error logs
>>> training_parser = Parser('train', ',', num_features + 1, job_uuid)
>>> test_parser = Parser('test', ',', num_features, job_uuid)
>>> trainingData = ssc.textFileStream("/training_data").\
        map(lambda x: training_parser.parse(x)).\
        map(lambda x: x.features)
>>> testData = ssc.textFileStream("/test_data").\
        map(lambda x: test_parser.parse(x)).\
        map(lambda x: x.features)
>>> streaming_clustering = StreamingKMeans(k=num_clusters, decayFactor=1.0).\
        setRandomCenters(num_features, 0, 0)
>>> streaming_clustering.trainOn(trainingData)
>>> streaming_clustering.predictOn(testData).pprint()
>>> ssc.start()
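Since textFileStream only picks up complete new files that appear in the monitored directory, a common convention is to write each file elsewhere and then move it in. The following hypothetical helper sketches this, generating labeled Gaussian points in the three-column training format described above; the directory names and cluster centers are illustrative assumptions, not part of the pipeline itself.

```python
import os
import random
import tempfile

def write_training_file(directory, centers, points_per_center=20):
    """Write one CSV file of labeled Gaussian points into `directory`.

    Each line uses the three-column training format: label,feature1,feature2.
    The file is written to a temporary location first and then moved in,
    since textFileStream only notices files that appear atomically.
    """
    fd, tmp_path = tempfile.mkstemp(suffix='.txt')
    with os.fdopen(fd, 'w') as f:
        for label, (cx, cy) in enumerate(centers):
            for _ in range(points_per_center):
                x = random.gauss(cx, 0.1)
                y = random.gauss(cy, 0.1)
                f.write('{0},{1},{2}\n'.format(float(label), x, y))
    final_path = os.path.join(directory, os.path.basename(tmp_path))
    os.rename(tmp_path, final_path)
    return final_path

# e.g. write_training_file('/training_data', [(0, 0), (1, 1), (5, 5)])
```

Note that os.rename only works within a single filesystem; across filesystems, a copy-then-move into the monitored directory achieves the same atomic appearance.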
The decay factor in the parameters gives the recipe for combining the current cluster centers with new data. At a decay factor of 1.0, all data from the beginning of the stream is weighted equally, while at the other extreme, 0, only the most recent data is used. If we stop the model at any point, we can inspect it using the latestModel() function:
>>> streaming_clustering.latestModel().clusterCenters
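To see what the decay factor does numerically, here is a one-dimensional sketch of the weighted-average center update behind streaming k-means: the old center, weighted by its decayed point count, is averaged with the mean of the points newly assigned to it. This is a simplified illustration of the idea, not Spark's exact internal bookkeeping.

```python
def update_center(c_old, n_old, batch_mean, m, a):
    """Combine an old center with a new batch of points.

    c_old: previous center; n_old: effective number of points behind it;
    batch_mean: mean of the new batch; m: batch size; a: decay factor.
    """
    n_new = n_old * a + m                          # decayed history + new points
    c_new = (c_old * n_old * a + batch_mean * m) / n_new
    return c_new, n_new

# With decayFactor a=1.0, history keeps full weight:
print(update_center(0.0, 100, 1.0, 100, 1.0))   # (0.5, 200.0): center moves halfway
# With a=0.0, history is forgotten and the batch mean wins:
print(update_center(0.0, 100, 1.0, 100, 0.0))   # (1.0, 100.0)
```

Intermediate decay values interpolate between these extremes, letting the model track gradual drift in the cluster centers.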
We could also predict using the predict() function on an appropriately sized vector:
>>> streaming_clustering.latestModel().predict([ … ])