Case study: fitting classifier models in PySpark

Now that we have examined several algorithms for fitting classifier models in the scikit-learn library, let us look at how we might implement a similar model in PySpark. We can use the same census dataset from earlier in this chapter, starting by loading the data as an RDD of text lines once the SparkContext is available:

>>> censusRdd = sc.textFile('census.data')
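
If you are working inside the interactive PySpark shell, the sc variable is created for you automatically; otherwise, a minimal sketch for starting the context yourself might look like the following (the application name is arbitrary):

>>> from pyspark import SparkContext
>>> sc = SparkContext(appName='censusClassifier')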

Next, we need to split each record into individual fields and strip the surrounding whitespace:

>>> censusRddSplit = censusRdd.map(lambda x: [e.strip() for e in x.split(',')])

Now, as before, we need to determine which of our features are categorical and therefore need to be re-encoded using one-hot encoding. We do this by taking a single row and asking whether the string in each position represents a digit (if it does, the feature is numeric rather than categorical):

>>> categoricalFeatures = [e for e,i in enumerate(censusRddSplit.take(1)[0]) if not i.isdigit()]
>>> allFeatures = [e for e,i in enumerate(censusRddSplit.take(1)[0])]

Now, as before, we need to collect a dictionary for each categorical feature that maps each of its string values to a position in the one-hot-encoding vector:

>>> categoricalMaps = []
>>> for c in categoricalFeatures:
…        catDict = (censusRddSplit.
…            map(lambda x: x[c] if len(x) > c else None).
…            filter(lambda x: x is not None).
…            distinct().
…            zipWithIndex().
…            collectAsMap())
…        categoricalMaps.append(catDict)
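
Although not part of the original walkthrough, a quick sanity check at this point is to see how many distinct values each categorical column contributes to the encoding:

>>> [len(m) for m in categoricalMaps]

The sum of these counts, plus the number of purely numeric columns, determines the length of the one-hot-encoded vector, which we compute next.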

Next, we calculate the total length the one-hot encoding vector needs to be to represent all the features. We subtract two from this value because the last categorical feature is income, which has two values and which we use as the label for the data:

>>> expandedFeatures = 0
>>> for c in categoricalMaps:
…    expandedFeatures += len(c)
>>> expandedFeatures += len(allFeatures) - len(categoricalFeatures) - 2

Now, we use a map function to turn all of our data into LabeledPoint objects for use in logistic regression. To do so, we extract the label for each row from the last element in the vector, then instantiate an empty vector using the length of the one-hot-encoded feature set we calculated previously. We use two indices: one to track which categorical variable we are accessing (so that we index the right dictionary to perform our mapping), and a second to record our current position in the feature vector (since each categorical variable advances that position by k places, where k is the number of categories in that variable).

>>> from pyspark.mllib.regression import LabeledPoint
>>> def formatPoint(p):
…        # the last field, income, becomes the binary label
…        if p[-1] == '<=50K':
…            label = 0
…        else:
…            label = 1
…        vector = [0.0]*expandedFeatures
…        categoricalIndex = 0     # current position in the encoded vector
…        categoricalVariable = 0  # which categorical dictionary to use
…        for e,c in enumerate(p[:-1]):
…            if e in categoricalFeatures:
…                vector[categoricalIndex + categoricalMaps[categoricalVariable][c]] = 1.0
…                categoricalIndex += len(categoricalMaps[categoricalVariable])
…                categoricalVariable += 1
…            else:
…                vector[categoricalIndex] = float(c)
…                categoricalIndex += 1
…        return LabeledPoint(label, vector)

We apply this function to all data points:

>>> censusRddLabeled = censusRddSplit.map(formatPoint)
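
As a quick guard against off-by-one errors in the encoding (a check we add here, not part of the original recipe), we can confirm that an encoded vector has the length we calculated earlier:

>>> firstPoint = censusRddLabeled.first()
>>> len(firstPoint.features) == expandedFeatures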

Now that our data is in the right format, we can run logistic regression:

>>> from pyspark.mllib.classification import LogisticRegressionWithLBFGS
>>> censusLogistic = LogisticRegressionWithLBFGS.train(censusRddLabeled)
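
The train() method also accepts optional arguments controlling the optimization and regularization. As an illustrative, untuned example, an L2-regularized fit could be requested as follows:

>>> censusLogistic = LogisticRegressionWithLBFGS.train(censusRddLabeled,
…        iterations=100, regParam=0.01, regType='l2')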

To access the weights from the resulting model, we can inspect its weights attribute:

>>> censusLogistic.weights

If we want to apply the trained model to a new dataset, we can call the predict() method of censusLogistic on a new feature vector. The steps described here are similar to the data processing we used for the scikit-learn example, but can ultimately scale to much larger datasets.
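
As a minimal sketch of what this looks like in practice (the standard MLlib evaluation pattern, here applied to the training data in place of a genuinely new dataset), we can pair each label with the model's prediction and compute the error rate:

>>> labelsAndPreds = censusRddLabeled.map(lambda p:
…        (p.label, censusLogistic.predict(p.features)))
>>> trainErr = labelsAndPreds.filter(lambda lp:
…        lp[0] != lp[1]).count() / float(censusRddLabeled.count())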
