Earlier, in the spam example on binary classification, we prepared the data, split it into training and test sets, trained the model, and evaluated it against the test data before finally arriving at the metrics. Spark's Pipeline API lets us express this series of steps more simply.
In this recipe, we'll look at how to use the Pipeline API to solve the same classification problem. Think of the pipeline as a factory assembly line where things happen one after another: we pass our raw, unprocessed data through various processors before finally feeding it into the classifier.
We'll classify the same spam/ham dataset (https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection), first using a plain Pipeline and then using a cross-validator that selects the best model for us from a grid of parameters.
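Let's summarize the steps:
1. Importing the data and splitting it into training and test sets.
2. Constructing the participants (stages) of the pipeline.
3. Preparing a pipeline and training a model.
4. Predicting against the test data.
5. Evaluating the model without cross-validation.
6. Constructing the parameter grid for cross-validation.
7. Constructing a cross-validator and fitting the best model.
8. Evaluating the model with cross-validation.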
The code for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter5-learning/src/main/scala/com/packt/scalada/learning/BinaryClassificationSpamPipeline.scala.
This process is a little different from the previous recipe, because we no longer construct LabeledPoint instances. Instead of an RDD of LabeledPoint, the pipeline requires a DataFrame. So, we convert each line of text into a Document object (holding the label and the content) and then convert the RDD[Document] into a DataFrame by calling the toDF() function on it:
// Define the case class at the top level (outside the method that uses it) so toDF() can infer the schema
case class Document(label: Double, content: String)
// Each line of the UCI file holds the label (ham or spam), a tab, and then the message
val docs = sc.textFile("SMSSpamCollection").map(line => {
  val words = line.split("\t")
  val label = if (words.head.trim() == "spam") 1.0 else 0.0
  Document(label, words.tail.mkString(" "))
})
// Split spam and ham 80/20 separately so that both sets keep the spam/ham ratio
val spamPoints = docs.filter(doc => doc.label == 1.0).randomSplit(Array(0.8, 0.2))
val hamPoints = docs.filter(doc => doc.label == 0.0).randomSplit(Array(0.8, 0.2))
println("Spam count:" + (spamPoints(0).count) + "::" + (spamPoints(1).count))
println("Ham count:" + (hamPoints(0).count) + "::" + (hamPoints(1).count))
val trainingSpamSplit = spamPoints(0)
val testSpamSplit = spamPoints(1)
val trainingHamSplit = hamPoints(0)
val testHamSplit = hamPoints(1)
val trainingSplit = trainingSpamSplit ++ trainingHamSplit
val testSplit = testSpamSplit ++ testHamSplit
// Brings toDF() into scope for RDDs of case classes
import sqlContext.implicits._
val trainingDFrame = trainingSplit.toDF()
val testDFrame = testSplit.toDF()
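Splitting each class separately keeps roughly the same spam/ham ratio in the training and test sets. The plain-Scala sketch below (no Spark; the object and method names are hypothetical) mirrors the parsing and the per-class 80/20 split on a local collection. One deliberate difference: Spark's randomSplit samples each element independently, so its split sizes are only approximately 80/20. This sketch instead shuffles with a fixed seed and cuts each class exactly, which keeps the example deterministic.

```scala
import scala.util.Random

object StratifiedSplitSketch {
  final case class Document(label: Double, content: String)

  // Parses "<ham|spam>\t<message>"; spam becomes 1.0, anything else 0.0
  def parse(line: String): Document = {
    val parts = line.split("\t", 2)
    val label = if (parts(0).trim == "spam") 1.0 else 0.0
    Document(label, if (parts.length > 1) parts(1) else "")
  }

  // Splits each class separately so both sides keep the original spam/ham ratio
  def stratifiedSplit(docs: Seq[Document], trainFraction: Double,
                      seed: Long): (Seq[Document], Seq[Document]) = {
    val rnd = new Random(seed)
    val perClass = docs.groupBy(_.label).values.toList.map { classDocs =>
      val shuffled = rnd.shuffle(classDocs)
      shuffled.splitAt((classDocs.size * trainFraction).toInt)
    }
    (perClass.flatMap(_._1), perClass.flatMap(_._2))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq.tabulate(10)(i => s"spam\tFree prize $i") ++
      Seq.tabulate(40)(i => s"ham\tSee you at $i")
    val (train, test) = stratifiedSplit(lines.map(parse), trainFraction = 0.8, seed = 42L)
    println(s"train: spam=${train.count(_.label == 1.0)} ham=${train.count(_.label == 0.0)}")
    println(s"test:  spam=${test.count(_.label == 1.0)} ham=${test.count(_.label == 0.0)}")
  }
}
```

With 10 spam and 40 ham lines, the training side receives 8 spam and 32 ham messages, and the test side receives 2 and 8.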
In order to arrange the pipeline, we need to construct its participants. This pipeline has three participants (or pipeline stages), and we have to line them up in the right order:
1. Tokenizer, which splits the content of each message into words.
2. HashingTF, which turns those words into a term-frequency (TF) vector.
3. LogisticRegression, the classifier that learns from the TF vectors and the labels.
Let's construct these first:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
// Lowercases "content" and splits it into words, stored in "tokens"
val tokenizer = new Tokenizer().setInputCol("content").setOutputCol("tokens")
// Hashes the tokens into a term-frequency vector, stored in "features"
val hashingTf = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
// Learns from the "features" vectors and the "label" column
val logisticRegression = new LogisticRegression().setFeaturesCol("features").setLabelCol("label").setMaxIter(10)
When the training DataFrame is run against the first pipeline stage, that is, Tokenizer, the "content" field of the Document is taken as the input column, and the tokenizer's output, a sequence of lowercase words, is captured in the "tokens" output column. HashingTF takes the "tokens" and converts them into a TF vector by hashing each word to an index and counting occurrences. Notice that the input column of HashingTF is the same as the output column of the previous stage. Finally, for the LogisticRegression stage, we specify the features column and the label column. However, if the input DataFrame has a column named "label" of type Double and a column named "features" of type Vector, there is no need to specify them explicitly, because these are the default column names. In our case, "label" is an attribute of the Document case class and the output column of HashingTF is named "features", so the following code would work just fine:
val logisticRegression = new LogisticRegression().setMaxIter(10)
Further feature stages can be slotted in between in the same way, as long as each stage's input column matches the previous stage's output column. For example, an IDF stage takes a TF vector and returns a tf-idf vector, and a VectorAssembler merges several columns into a single vector. Neither is part of the three-stage pipeline used in this recipe. If you do add a VectorAssembler, give it feature columns only: merging "label" into the feature vector would leak the answer to the classifier.
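To make the column flow concrete, here is a plain-Scala sketch of roughly what the Tokenizer and HashingTF stages compute for each message. It also adds the weighting that an optional IDF stage would apply, using the formula given in the Spark MLlib documentation: idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t. The object and method names are hypothetical. The hash function is an assumption: this sketch uses String.hashCode, and Spark's own hash function may differ across versions, so bucket indices need not match Spark's.

```scala
object TfIdfSketch {
  // Tokenizer-like: lowercase the text and split it on whitespace
  def tokenize(content: String): Seq[String] =
    content.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  // HashingTF-like: hash each term into one of numFeatures buckets and count hits.
  // Assumption: String.hashCode stands in for Spark's own hash function.
  def hashingTf(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
    tokens
      .groupBy(t => Math.floorMod(t.hashCode, numFeatures))
      .map { case (bucket, hits) => bucket -> hits.size.toDouble }

  // IDF weight per bucket: log((m + 1) / (df + 1)), where m = number of documents
  def idf(tfs: Seq[Map[Int, Double]]): Map[Int, Double] = {
    val m = tfs.size
    val df = tfs.flatMap(_.keys).groupBy(identity).map { case (k, ks) => k -> ks.size }
    df.map { case (k, d) => k -> math.log((m + 1.0) / (d + 1.0)) }
  }

  def main(args: Array[String]): Unit = {
    val messages = Seq("WIN a FREE prize now", "are we meeting now", "free free free")
    val tfs = messages.map(m => hashingTf(tokenize(m), numFeatures = 16))
    val weights = idf(tfs)
    // tf-idf: scale each term count by its bucket's IDF weight
    val tfidf = tfs.map(_.map { case (k, v) => k -> v * weights(k) })
    tfs.zip(tfidf).foreach { case (tf, w) => println(s"tf=$tf tfidf=$w") }
  }
}
```

Because different words can land in the same bucket, a small numFeatures causes more collisions. This is one reason numFeatures is worth tuning.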
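Internally, this implementation of LogisticRegression constructs a LabeledPoint for each row of the data and uses a quasi-Newton optimizer (L-BFGS, or OWL-QN when an L1 penalty is involved) to derive a model from the training data.
At every stage, the transformation is applied to the DataFrame that the stage receives, and the transformed DataFrame is passed along to the next stage until the final one. Transformer stages such as Tokenizer and HashingTF append their output column to the DataFrame, while the LogisticRegression stage is fitted on the DataFrame it receives.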
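The hand-off between stages can be mimicked with a few lines of plain Scala. In this hypothetical sketch (no Spark involved), a row is a map of named columns, each stage reads one column and appends another, and foldLeft passes the output of one stage to the next. Spark checks each stage's input columns before running the pipeline, and the sketch imitates this by failing fast when a stage's input column is missing. That is what happens if a stage asks for a column, such as "features", that no earlier stage produces.

```scala
object MiniPipeline {
  // A row is a set of named columns, loosely mimicking a DataFrame row
  type Row = Map[String, Any]

  // A transformer-like stage: reads inputCol and appends outputCol to every row
  final case class Stage(name: String, inputCol: String, outputCol: String, f: Any => Any) {
    def transform(rows: Seq[Row]): Seq[Row] = {
      // Fail fast if an earlier stage did not produce the column this stage needs
      require(rows.forall(_.contains(inputCol)), s"$name: missing input column '$inputCol'")
      rows.map(r => r.updated(outputCol, f(r(inputCol))))
    }
  }

  // Each stage receives the rows produced by the previous one
  def run(stages: Seq[Stage], rows: Seq[Row]): Seq[Row] =
    stages.foldLeft(rows)((data, stage) => stage.transform(data))

  val tokenizer: Stage = Stage("tokenizer", "content", "tokens",
    c => c.asInstanceOf[String].toLowerCase.split("\\s+").toSeq)
  val wordCount: Stage = Stage("wordCount", "tokens", "numTokens",
    t => t.asInstanceOf[Seq[String]].size)

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(Map("label" -> 1.0, "content" -> "WIN a free prize"))
    run(Seq(tokenizer, wordCount), rows).foreach(println)
  }
}
```

Running wordCount on its own fails, because nothing has produced the "tokens" column yet.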
As the next step, we just need to form a pipeline out of the stages that we constructed in the previous step. We then train a model by calling the pipeline.fit function:
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
pipeline.setStages(Array(tokenizer, hashingTf, logisticRegression))
// Runs the training DataFrame through every stage and returns a PipelineModel
val model = pipeline.fit(trainingDFrame)
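pipeline.fit returns a model rather than transformed data because some stages learn from the data. In Spark's terms, Tokenizer and HashingTF are Transformers, while LogisticRegression is an Estimator whose fit produces a fitted model. The resulting PipelineModel is a chain of Transformers only. The plain-Scala sketch below imitates that behavior. The traits and the toy length-threshold classifier are hypothetical stand-ins for illustration, not Spark classes.

```scala
object PipelineFitSketch {
  type Row = Map[String, Any]

  sealed trait Stage
  // A Transformer maps rows to rows
  trait Transformer extends Stage { def transform(rows: Seq[Row]): Seq[Row] }
  // An Estimator learns from rows and returns a fitted Transformer (a model)
  trait Estimator extends Stage { def fit(rows: Seq[Row]): Transformer }

  // Fitting keeps transformers as-is and replaces each estimator with the model it
  // learns from the data as it looks at that point; the result has no estimators left
  def fitPipeline(stages: Seq[Stage], rows: Seq[Row]): Seq[Transformer] = {
    val (_, fitted) = stages.foldLeft((rows, Vector.empty[Transformer])) {
      case ((data, acc), t: Transformer) => (t.transform(data), acc :+ t)
      case ((data, acc), e: Estimator) =>
        val model = e.fit(data)
        (model.transform(data), acc :+ model)
    }
    fitted
  }

  // Applying the fitted chain to new data, analogous to model.transform(testDFrame)
  def transformAll(model: Seq[Transformer], rows: Seq[Row]): Seq[Row] =
    model.foldLeft(rows)((data, t) => t.transform(data))

  // Hypothetical transformer: counts the words in "content"
  object WordCount extends Transformer {
    def transform(rows: Seq[Row]): Seq[Row] =
      rows.map(r => r.updated("n", r("content").asInstanceOf[String].split("\\s+").length))
  }

  // Hypothetical estimator: predicts spam when a message is at least as long
  // as the average spam message seen during training
  object LengthThreshold extends Estimator {
    def fit(rows: Seq[Row]): Transformer = {
      val spamLengths = rows.filter(_("label") == 1.0).map(_("n").asInstanceOf[Int])
      val threshold = spamLengths.sum.toDouble / spamLengths.size
      new Transformer {
        def transform(rs: Seq[Row]): Seq[Row] =
          rs.map(r => r.updated("prediction", if (r("n").asInstanceOf[Int] >= threshold) 1.0 else 0.0))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val training: Seq[Row] = Seq(
      Map("label" -> 1.0, "content" -> "win a free prize now"),
      Map("label" -> 0.0, "content" -> "see you"))
    val model = fitPipeline(Seq(WordCount, LengthThreshold), training)
    transformAll(model, Seq(Map("content" -> "claim your free prize today"))).foreach(println)
  }
}
```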
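Using the newly constructed model to predict the data is just a matter of calling the model's transform method, which appends a "prediction" column. Then, for each row we extract the predicted value and the actual label to calculate the metrics. The pairs are ordered (prediction, label), the order that MLlib's BinaryClassificationMetrics (score, label) and MulticlassMetrics (prediction, label) expect:
import org.apache.spark.rdd.RDD
// .rdd gives an RDD[Row]; each pair is (predicted value, actual label)
val predictsAndActualsNoCV: RDD[(Double, Double)] = model.transform(testDFrame).rdd
  .map(r => (r.getAs[Double]("prediction"), r.getAs[Double]("label")))
  .cache()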
Cross-validation is a model validation technique that runs over multiple iterations. In k-fold cross-validation, the training data is split into k subsets (folds). In each iteration, the model is trained on k-1 folds and validated on the remaining one, so every fold is used for validation exactly once, and the validation scores are then averaged. For this recipe, we'll run the algorithm first without cross-validation and then with cross-validation.
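The fold mechanics can be sketched in plain Scala as follows. The names are hypothetical, and the fold assignment is a simplification: this sketch shuffles once with a seed and deals items round-robin, so fold sizes are exact. Spark's CrossValidator instead assigns rows to folds by random sampling, so its fold sizes are only approximately equal.

```scala
import scala.util.Random

object KFoldSketch {
  // Returns k (training, validation) pairs; every item is validated exactly once.
  // Simplification: shuffle once, then deal items round-robin into the k folds.
  def kFold[T](data: Seq[T], k: Int, seed: Long): Seq[(Seq[T], Seq[T])] = {
    require(k >= 2 && k <= data.size, "k must be between 2 and the number of items")
    val indexed = new Random(seed).shuffle(data).zipWithIndex
    (0 until k).map { fold =>
      val validation = indexed.collect { case (x, i) if i % k == fold => x }
      val training = indexed.collect { case (x, i) if i % k != fold => x }
      (training, validation)
    }
  }

  // Averages a score over the folds, as a cross-validator does for each parameter combination
  def crossValidate[T](data: Seq[T], k: Int, seed: Long)(score: (Seq[T], Seq[T]) => Double): Double = {
    val scores = kFold(data, k, seed).map { case (training, validation) => score(training, validation) }
    scores.sum / scores.size
  }

  def main(args: Array[String]): Unit =
    kFold(1 to 10, k = 5, seed = 7L).foreach { case (tr, va) => println(s"train=$tr validate=$va") }
}
```

With 10 items and k = 5, each iteration trains on 8 items and validates on the other 2.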
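First, we'll use the same validation metric and method that we used in the previous recipe. We simply calculate the area under the ROC curve, the accuracy, the precision, and the confusion matrix, and then apply the function to the predictions made without cross-validation:
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.rdd.RDD
// predictsAndActuals holds (prediction, actual label) pairs
def calculateMetrics(predictsAndActuals: RDD[(Double, Double)], algorithm: String): Unit = {
  // Fraction of test examples whose prediction matches the label
  val accuracy = 1.0 * predictsAndActuals.filter(predActs => predActs._1 == predActs._2).count() / predictsAndActuals.count()
  // Expects (score, label); here the score is the hard 0/1 prediction
  val binMetrics = new BinaryClassificationMetrics(predictsAndActuals)
  println(s"************** Printing metrics for $algorithm ***************")
  println(s"Area under ROC ${binMetrics.areaUnderROC()}")
  println(s"Accuracy $accuracy")
  // Expects (prediction, label)
  val metrics = new MulticlassMetrics(predictsAndActuals)
  println(s"Precision : ${metrics.precision}")
  println(s"Confusion Matrix ${metrics.confusionMatrix}")
  println(s"************** ending metrics for $algorithm *****************")
}
calculateMetrics(predictsAndActualsNoCV, "Without Cross validation")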
A sample output of this pipeline without cross-validation is as follows:
************** Printing metrics for Without Cross validation ***************
Area under ROC 0.9676924738149228
Accuracy 0.9656357388316151
Confusion Matrix 993.0  36.0
4.0    131.0
************** ending metrics for Without Cross validation *****************
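For reference, the quantities printed above can be computed by hand from (prediction, label) pairs. The plain-Scala sketch below uses hypothetical names and treats spam (1.0) as the positive class. It counts the four cells of the confusion matrix and derives accuracy, precision, and recall from them. It also shows what the area under ROC means when the scores are hard 0/1 predictions: the ROC curve then has a single interior point, so its area reduces to the mean of the true positive rate (recall) and the true negative rate (specificity). When reading MulticlassMetrics.confusionMatrix itself, note that Spark documents its columns as the predicted classes in ascending label order, so its rows are the actual classes.

```scala
object MetricsSketch {
  // The four cells of a binary confusion matrix, with spam (1.0) as the positive class
  final case class Confusion(tp: Long, fp: Long, tn: Long, fn: Long) {
    def accuracy: Double = (tp + tn).toDouble / (tp + fp + tn + fn)
    def precision: Double = tp.toDouble / (tp + fp) // share of predicted spam that is spam
    def recall: Double = tp.toDouble / (tp + fn)    // share of actual spam that was caught
    def specificity: Double = tn.toDouble / (tn + fp)
    // With hard 0/1 scores the ROC curve runs (0,0) -> (1 - specificity, recall) -> (1,1);
    // the area under those two segments is the mean of recall and specificity
    def areaUnderROC: Double = (recall + specificity) / 2
  }

  // Input pairs are (prediction, label), the order MLlib's MulticlassMetrics expects
  def confusion(predsAndLabels: Seq[(Double, Double)]): Confusion = {
    def cell(p: Double, l: Double): Long =
      predsAndLabels.count { case (pred, label) => pred == p && label == l }.toLong
    Confusion(tp = cell(1.0, 1.0), fp = cell(1.0, 0.0), tn = cell(0.0, 0.0), fn = cell(0.0, 1.0))
  }

  def main(args: Array[String]): Unit = {
    val c = confusion(Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0)))
    println(c)
    println(f"accuracy=${c.accuracy}%.3f precision=${c.precision}%.3f recall=${c.recall}%.3f auc=${c.areaUnderROC}%.3f")
  }
}
```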
Before we use the cross-validator to choose the model that best fits the data, we need to give each parameter we want to tune a set of alternative values that the validator can choose from.
We provide these alternative values in the form of a parameter grid:
import org.apache.spark.ml.tuning.ParamGridBuilder
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTf.numFeatures, Array(1000, 5000, 10000))
  .addGrid(logisticRegression.regParam, Array(1.0, 0.1, 0.03, 0.01))
  .build()
So, we say that the size of the term-frequency vector that HashingTF generates (that is, the number of hash buckets) could be 1,000, 5,000, or 10,000, and that the regularization parameter for logistic regression could be 1, 0.1, 0.03, or 0.01. In essence, we are passing a 3 x 4 grid, that is, 12 parameter combinations, as the parameter grid.
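Next, we construct a cross-validator and pass in the estimator to tune (the whole pipeline), the evaluator that scores each candidate model (BinaryClassificationEvaluator, which uses the area under ROC by default), the parameter grid, and the number of folds:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
val crossValidator = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)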
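The grid and fold settings above determine how much work the cross-validator has to do. The plain-Scala sketch below (hypothetical object name) enumerates the same 3 x 4 grid as a Cartesian product and counts the pipeline fits it implies. Each of the 12 combinations is trained once per fold, and Spark's CrossValidator then refits the winning combination on the whole training set.

```scala
object ParamGridSketch {
  val numFeaturesValues = Seq(1000, 5000, 10000)
  val regParamValues = Seq(1.0, 0.1, 0.03, 0.01)

  // Cartesian product: one (numFeatures, regParam) combination per grid cell
  val grid: Seq[(Int, Double)] =
    for (nf <- numFeaturesValues; rp <- regParamValues) yield (nf, rp)

  val numFolds = 10
  // Each combination is fitted once per fold; the winner is then refitted on all training data
  val totalFits: Int = grid.size * numFolds + 1

  def main(args: Array[String]): Unit =
    println(s"${grid.size} combinations, $totalFits pipeline fits")
}
```

Here that comes to 12 x 10 + 1 = 121 fits, which is why a 10-fold search over even a small grid takes noticeably longer than a single pipeline.fit.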
Finally, we let the cross-validator run against the training dataset and derive the best model from it. Contrast the following line with pipeline.fit, where we skipped cross-validation:
val bestModel=crossValidator.fit(trainingDFrame)
Now, let's evaluate the resulting model against our held-out test dataset, rather than against the validation folds that the cross-validator carves out of the training data internally:
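import org.apache.spark.rdd.RDD
// bestModel is the pipeline refitted on all training data with the winning parameters
val predictsAndActualsWithCV: RDD[(Double, Double)] = bestModel.transform(testDFrame).rdd
  .map(r => (r.getAs[Double]("prediction"), r.getAs[Double]("label")))
  .cache()
calculateMetrics(predictsAndActualsWithCV, "Cross validation")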
A sample output of this pipeline with cross-validation is as follows:
************** Printing metrics for Cross validation ***************
Area under ROC 0.9968220338983051
Accuracy 0.994579945799458
Confusion Matrix 938.0  6.0
0.0    163.0
************** ending metrics for Cross validation *****************
As we can see, the area under ROC for this model (about 0.997) is higher than for the pipeline without cross-validation (about 0.968) and for any of our previously generated models.