Unlike linear regression, where we predicted continuous values for the outcome (the y variable), logistic regression and the Support Vector Machine (SVM) are used to predict one out of n possible values for the outcome. If the outcome is one of two possibilities, the classification is called binary classification.
Logistic regression, when used for binary classification, looks at each data point and estimates the probability of that data point falling under the positive case. If the probability is less than a threshold, then the outcome is negative (or 0); otherwise, the outcome is positive (or 1).
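The thresholding step can be sketched in a few lines of plain Scala. This is only an illustration under stated assumptions: the helper names (score, predict), the weights, and the default threshold of 0.5 are hypothetical and are not taken from Spark.

```scala
object LogisticThresholdSketch {
  // Standard logistic (sigmoid) function: maps any real score into (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Hypothetical helper: weighted sum of the features plus an intercept.
  def score(weights: Array[Double], intercept: Double, features: Array[Double]): Double =
    weights.zip(features).map { case (w, x) => w * x }.sum + intercept

  // Predict 1.0 (positive) when the probability reaches the threshold, else 0.0 (negative).
  def predict(weights: Array[Double], intercept: Double, features: Array[Double],
              threshold: Double = 0.5): Double =
    if (sigmoid(score(weights, intercept, features)) >= threshold) 1.0 else 0.0
}
```

Raising the threshold makes the classifier more reluctant to call a data point positive, which is one way to trade false positives against false negatives.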
As with any other supervised learning technique, we provide training examples for logistic regression. We then add a bit of code for feature extraction and let the algorithm create a model that captures how strongly each feature points towards one of the two outcomes.
What SVM tries to do is map all of the training data as points in the feature space. The algorithm comes up with a hyperplane that separates the positive and negative training examples in such a way that the distance (margin band) between them is maximum. This is better illustrated with a diagram:
When a new, unseen data point comes up for prediction, the algorithm checks which side of the hyperplane that point falls on. The label associated with that side is predicted as the label for the input point.
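For a linear SVM, prediction comes down to evaluating the sign of w·x + b for the learned weight vector w and intercept b. A minimal sketch in plain Scala follows; the object and method names are hypothetical, and the weights in the usage note are made up for illustration.

```scala
object LinearSvmDecisionSketch {
  // Signed score of a point relative to the hyperplane w.x + b = 0.
  def margin(w: Array[Double], b: Double, x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum + b

  // Points with a positive score are labelled 1.0; everything else is labelled 0.0.
  def predict(w: Array[Double], b: Double, x: Array[Double]): Double =
    if (margin(w, b, x) > 0.0) 1.0 else 0.0
}
```

With w = (1, 1) and b = -3, the point (2, 2) has a score of 1 and is labelled positive, while (1, 1) has a score of -1 and is labelled negative.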
Both the implementations of LogisticRegression and SVM in Spark use L2 regularization by default, but we are free to switch to L1 by setting the updater explicitly.
In this recipe, we'll classify a spam/ham dataset (https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) using three variants of classification algorithms: logistic regression with SGD, logistic regression with BFGS, and SVM with SGD.
The BFGS optimization algorithm typically converges to the minimum in fewer iterations than SGD. Also, with BFGS, we don't have to hunt for an optimal learning rate.
Let's summarize the steps:
The code for this recipe can be found at https://github.com/arunma/ScalaDataAnalysisCookbook/blob/master/chapter5-learning/src/main/scala/com/packt/scalada/learning/BinaryClassificationSpam.scala.
As usual, our input data is in the form of a text file, SMSSpamCollection. The data file looks like this:
As we can see, the label and the data are separated by a tab. So, while reading each line, we split the label and the content, and then populate a simple case class named Document. This Document class is just a temporary placeholder. In the next step, we'll convert these documents into LabeledPoints:
//Frankly, we could make this a tuple but this looks neat
case class Document(label: String, content: String)

val docs = sc.textFile("SMSSpamCollection").map(line => {
  //The label and the message are separated by a tab
  val words = line.split("\t")
  Document(words.head.trim(), words.tail.mkString(" "))
})
For the tokenization, instead of relying on the tokenizer provided inside Spark, we'll see how to plug in two external NLP libraries: Stanford CoreNLP and Scala NLP's Epic. These are two popular NLP libraries, one from the Java world and the other from Scala. However, one thing that we ought to watch out for while using external libraries is that the instantiation of these APIs, and therefore the creation of the heavyweight objects they require (such as a tokenizer), should be done at the partition level. If we do it inside a closure that runs per record, such as a map over an RDD, we'll end up creating a new instance of the API object for every single record.
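The difference between per-record and per-partition instantiation can be illustrated without Spark. In the following sketch, partitions are simulated as nested sequences and ExpensiveTokenizer is a hypothetical stand-in for a heavyweight NLP object; it only counts how many times it gets constructed.

```scala
object PartitionLevelSetupSketch {
  // Counts how many times the (hypothetical) heavyweight object is constructed.
  var instancesCreated = 0

  class ExpensiveTokenizer {
    instancesCreated += 1
    def tokenize(text: String): Seq[String] = text.split("\\s+").toSeq
  }

  // One tokenizer per record: what happens when we construct it inside a plain map.
  def perRecord(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap(_.map(doc => new ExpensiveTokenizer().tokenize(doc)))

  // One tokenizer per partition: the shape that mapPartitions makes possible.
  def perPartition(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap { part =>
      val tokenizer = new ExpensiveTokenizer()
      part.map(doc => tokenizer.tokenize(doc))
    }

  // Resets the counter, evaluates the given expression, and reports the constructions.
  def countInstances(run: => Any): Int = {
    instancesCreated = 0
    run
    instancesCreated
  }
}
```

For two simulated partitions holding four documents in total, perRecord builds four tokenizers while perPartition builds only two, and both return the same tokens.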
In the case of Epic, we just split the documents into sentences and then tokenize them into words. We also add two more restrictions. Only those tokens that contain letters or digits will be considered, and the tokens should be of at least two characters:
import epic.preprocess.TreebankTokenizer
import epic.preprocess.MLSentenceSegmenter
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

//Use Scala NLP - Epic
val labeledPointsUsingEpicRdd: RDD[LabeledPoint] = docs.mapPartitions { docIter =>
  //Heavyweight objects are created once per partition, not once per document
  val segmenter = MLSentenceSegmenter.bundled().get
  val tokenizer = new TreebankTokenizer()
  val hashingTf = new HashingTF(5000)

  docIter.map { doc =>
    val sentences = segmenter(doc.content)
    val tokens = sentences.flatMap(sentence => tokenizer(sentence))
    //consider only features that are letters or digits and cut off all words that are less than 2 characters
    val filteredTokens = tokens.toList.filter(token => token.forall(_.isLetterOrDigit)).filter(_.length() > 1)
    new LabeledPoint(if (doc.label == "ham") 0 else 1, hashingTf.transform(filteredTokens))
  }
}.cache()
MLSentenceSegmenter splits the paragraph into sentences. The sentences are then split into terms (or words) using the tokenizer. HashingTF creates a map of terms with their frequency of occurrence. Finally, to construct a LabeledPoint for each document, we convert these terms into a term frequency vector for that document using the transform function of HashingTF. Also, we cap the number of features at 5,000 by setting numFeatures in HashingTF.
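To get a feel for what a hashing-based term frequency does, here is a minimal sketch in plain Scala. It is not Spark's implementation: using String.hashCode as the hash function and returning a Map instead of a sparse vector are assumptions made for illustration.

```scala
object HashingTfSketch {
  // Map a term to a bucket in [0, numFeatures) using its hash code.
  // Assumption: String.hashCode stands in for whatever hash the real library uses.
  def bucket(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Count how many tokens fall into each bucket; colliding terms simply add up.
  def termFrequencies(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
    tokens.groupBy(token => bucket(token, numFeatures))
      .map { case (index, group) => index -> group.size.toDouble }
}
```

A smaller numFeatures saves memory but increases the chance that two different terms share a bucket, in which case their counts are merged.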
With Stanford CoreNLP, the process is a little more involved, in the sense that we reduce the tokens to lemmas (https://en.wikipedia.org/wiki/Lemmatisation). In order to do this, we create an NLP pipeline that splits sentences, tokenizes, and finally reduces the tokens to lemmas:
import java.util.Properties
import scala.collection.JavaConverters._
import scala.io.Source
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

def corePipeline(): StanfordCoreNLP = {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}

def lemmatize(nlp: StanfordCoreNLP, content: String): List[String] = {
  //We are required to prepare the text as 'annotatable' before we annotate :-)
  val document = new Annotation(content)
  //Annotate
  nlp.annotate(document)
  //Extract all sentences
  val sentences = document.get(classOf[SentencesAnnotation]).asScala
  //Extract lemmas from sentences
  val lemmas = sentences.flatMap { sentence =>
    val tokens = sentence.get(classOf[TokensAnnotation]).asScala
    tokens.map(token => token.getString(classOf[LemmaAnnotation]))
  }
  //Only lemmas with letters or digits will be considered. Also consider only those words which have a length of at least 2
  lemmas.toList.filter(lemma => lemma.forall(_.isLetterOrDigit)).filter(_.length() > 1)
}

val labeledPointsUsingStanfordNLPRdd: RDD[LabeledPoint] = docs.mapPartitions { docIter =>
  val corenlp = corePipeline()
  //Materialize the stopwords as a Set; a bare Iterator would be exhausted after the first lookup
  val stopwords = Source.fromFile("stopwords.txt").getLines().toSet
  val hashingTf = new HashingTF(5000)

  docIter.map { doc =>
    val lemmas = lemmatize(corenlp, doc.content)
    //remove all the stopwords from the lemma list
    val filteredLemmas = lemmas.filterNot(lemma => stopwords.contains(lemma))
    //Generates a term frequency vector from the features
    val features = hashingTf.transform(filteredLemmas)
    //example : List(until, jurong, point, crazy, available, only, in, bugi, great, world, la, buffet, Cine, there, get, amore, wat)
    new LabeledPoint(if (doc.label.equals("ham")) 0 else 1, features)
  }
}.cache()
With HashingTF, we have a map of terms along with their frequency of occurrence in the documents. The problem with using this metric alone is that common words such as "the" and "a" get higher rankings compared to rare words. The inverse document frequency (IDF) looks at how many documents a word occurs in and gives a higher weight to a term that is uncommon. We'll now factor in the inverse document frequency so that we have the TF-IDF score (https://en.wikipedia.org/wiki/Tf–idf) for each term. This is easily achievable in Spark with org.apache.spark.mllib.feature.IDFModel. We extract all term frequencies from the LabeledPoints and pass them to the transform function of IDFModel to generate the TF-IDF:
val labeledPointsUsingStanfordNLPRdd = getLabeledPoints(docs, "STANFORD")
val lpTfIdf = withIdf(labeledPointsUsingStanfordNLPRdd).cache()

def withIdf(lPoints: RDD[LabeledPoint]): RDD[LabeledPoint] = {
  val hashedFeatures = lPoints.map(lp => lp.features)
  val idf: IDF = new IDF()
  val idfModel: IDFModel = idf.fit(hashedFeatures)
  val tfIdf: RDD[Vector] = idfModel.transform(hashedFeatures)
  //Pair each original label with its TF-IDF vector
  lPoints.zip(tfIdf).map {
    case (originalLPoint, tfIdfVector) => new LabeledPoint(originalLPoint.label, tfIdfVector)
  }
}
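The TF-IDF weighting itself can be sketched in plain Scala, with documents represented as maps from term to count. The smoothed form log((m + 1) / (df + 1)) used here is the one described in the MLlib documentation, to the best of our knowledge; the object and method names are hypothetical.

```scala
object TfIdfSketch {
  // Smoothed inverse document frequency: log((m + 1) / (df + 1)),
  // where m is the number of documents and df the number of documents containing the term.
  def idf(numDocs: Long, docFreq: Long): Double =
    math.log((numDocs + 1.0) / (docFreq + 1.0))

  // Weight each term count by the IDF of that term across the corpus.
  def tfIdf(docs: Seq[Map[String, Double]]): Seq[Map[String, Double]] = {
    // Each document contributes a term at most once to the document frequency.
    val docFreq: Map[String, Long] =
      docs.flatMap(_.keys).groupBy(identity).map { case (term, occ) => term -> occ.size.toLong }
    docs.map(_.map { case (term, tf) => term -> tf * idf(docs.size, docFreq(term)) })
  }
}
```

A term that appears in every document, such as "the", ends up with a weight of zero, while a term that appears in only a few documents keeps a positive weight.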
Our dataset has a skewed distribution of spam and ham messages. To make sure that the training and test sets both reflect this distribution, we first split the data into two subsets by label (spam and ham) and then split each subset in an 80:20 ratio. At the end of this, the training and test data are in a ratio of roughly 4:1 for spam as well as for ham, so both sets keep about the same proportion of spam to ham as the full dataset.
The spam and ham counts in our dataset are 747 and 4827, respectively:
//Split dataset
val spamPoints = lpTfIdf.filter(point => point.label == 1).randomSplit(Array(0.8, 0.2))
val hamPoints = lpTfIdf.filter(point => point.label == 0).randomSplit(Array(0.8, 0.2))

println("Spam count:" + (spamPoints(0).count) + "::" + (spamPoints(1).count))
println("Ham count:" + (hamPoints(0).count) + "::" + (hamPoints(1).count))

val trainingSpamSplit = spamPoints(0)
val testSpamSplit = spamPoints(1)

val trainingHamSplit = hamPoints(0)
val testHamSplit = hamPoints(1)

val trainingSplit = trainingSpamSplit ++ trainingHamSplit
val testSplit = testSpamSplit ++ testHamSplit
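The idea of splitting each class separately can be sketched without Spark. In this illustration, a deterministic cut at the given fraction replaces the random sampling that randomSplit performs, and the object and method names are hypothetical.

```scala
object StratifiedSplitSketch {
  // Split each class separately so that both parts keep the original class proportions.
  // Assumption: a deterministic cut at trainFraction replaces Spark's random sampling.
  def split[A](data: Seq[(Double, A)], trainFraction: Double): (Seq[(Double, A)], Seq[(Double, A)]) = {
    val perClass = data.groupBy(_._1).values.toList.map { points =>
      points.splitAt(math.round(points.size * trainFraction).toInt)
    }
    (perClass.flatMap(_._1), perClass.flatMap(_._2))
  }
}
```

With 10 spam and 40 ham examples and a fraction of 0.8, the training part holds 8 spam and 32 ham examples and the test part holds 2 spam and 8 ham examples, so the spam-to-ham proportion is the same in both.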
Now that we have our training and test sets, the next obvious step is to train a model out of these examples. Let's create instances of the three variants of the algorithms that we would like to experiment with:
val logisticWithSGD = getAlgorithm("logsgd", 100, 1, 0.001)
val logisticWithBfgs = getAlgorithm("logbfgs", 100, Double.NaN, 0.001)
val svmWithSGD = getAlgorithm("svm", 100, 1, 0.001)

def getAlgorithm(algo: String, iterations: Int, stepSize: Double, regParam: Double) = algo match {
  case "logsgd" => {
    val algo = new LogisticRegressionWithSGD()
    algo.setIntercept(true).optimizer.setNumIterations(iterations).setStepSize(stepSize).setRegParam(regParam)
    algo
  }
  case "logbfgs" => {
    val algo = new LogisticRegressionWithLBFGS()
    //No step size here: it is not a parameter of the BFGS optimizer
    algo.setIntercept(true).optimizer.setNumIterations(iterations).setRegParam(regParam)
    algo
  }
  case "svm" => {
    val algo = new SVMWithSGD()
    algo.setIntercept(true).optimizer.setNumIterations(iterations).setStepSize(stepSize).setRegParam(regParam)
    algo
  }
}
Notice that the stepSize parameter isn't set for logistic regression with BFGS; the value passed in for it is simply ignored.
As with linear regression, training a model and predicting the labels for the test set is just a matter of calling two methods: run on the classification algorithm and predict on the resulting model.
Soon after the prediction is done, the next logical step is to evaluate the model. In order to generate metrics for this, we extract the predicted and the actual labels. Our runClassification function trains the model using the training data and makes predictions against the test data. It then zips the predicted and the actual outcomes into a value called predictsAndActuals. This value is returned from the function.
The runClassification function accepts a GeneralizedLinearAlgorithm as the parameter, which is the parent of LinearRegressionWithSGD, LogisticRegressionWithSGD, LogisticRegressionWithLBFGS, and SVMWithSGD:
val logisticWithSGDPredictsActuals = runClassification(logisticWithSGD, trainingSplit, testSplit)
val logisticWithBfgsPredictsActuals = runClassification(logisticWithBfgs, trainingSplit, testSplit)
val svmWithSGDPredictsActuals = runClassification(svmWithSGD, trainingSplit, testSplit)

def runClassification(algorithm: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], trainingData: RDD[LabeledPoint], testData: RDD[LabeledPoint]): RDD[(Double, Double)] = {
  val model = algorithm.run(trainingData)
  val predicted = model.predict(testData.map(point => point.features))
  val actuals = testData.map(point => point.label)
  val predictsAndActuals: RDD[(Double, Double)] = predicted.zip(actuals)
  predictsAndActuals
}
For generating the metrics, Spark has some inbuilt APIs. The two most common metrics used to evaluate a classification model are the area under the ROC curve and the confusion matrix. org.apache.spark.mllib.evaluation.BinaryClassificationMetrics gives us the area under the curve, and org.apache.spark.mllib.evaluation.MulticlassMetrics gives us the confusion matrix. We also calculate the simple accuracy measure manually using the predicted and actual values. The accuracy is simply the result of dividing the correctly classified count of the test dataset by the total count of the test dataset. Refer to https://en.wikipedia.org/wiki/Accuracy_and_precision#In_binary_classification for more details:
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}

def calculateMetrics(predictsAndActuals: RDD[(Double, Double)], algorithm: String): Unit = {
  //Fraction of test points whose predicted label matches the actual label
  val accuracy = 1.0 * predictsAndActuals.filter(predActs => predActs._1 == predActs._2).count() / predictsAndActuals.count()
  val binMetrics = new BinaryClassificationMetrics(predictsAndActuals)
  println(s"************** Printing metrics for $algorithm ***************")
  println(s"Area under ROC ${binMetrics.areaUnderROC}")
  //println(s"Accuracy $accuracy")

  val metrics = new MulticlassMetrics(predictsAndActuals)
  val f1 = metrics.fMeasure
  println(s"F1 $f1")
  println(s"Precision : ${metrics.precision}")
  println(s"Confusion Matrix ${metrics.confusionMatrix}")
  println(s"************** ending metrics for $algorithm *****************")
}
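To make these metrics concrete, the following plain-Scala sketch derives the confusion-matrix counts, accuracy, precision, and recall from a list of (predicted, actual) pairs. It treats 1.0 (spam) as the positive class; the Counts class and the counts function are hypothetical names and are not part of Spark.

```scala
object BinaryMetricsSketch {
  // Counts for (predicted, actual) pairs, where 1.0 is the positive (spam) class.
  case class Counts(tp: Long, fp: Long, tn: Long, fn: Long) {
    def accuracy: Double = (tp + tn).toDouble / (tp + fp + tn + fn)
    def precision: Double = tp.toDouble / (tp + fp)
    def recall: Double = tp.toDouble / (tp + fn)
  }

  def counts(predictsAndActuals: Seq[(Double, Double)]): Counts =
    predictsAndActuals.foldLeft(Counts(0, 0, 0, 0)) {
      case (c, (1.0, 1.0)) => c.copy(tp = c.tp + 1) // spam predicted as spam
      case (c, (1.0, 0.0)) => c.copy(fp = c.fp + 1) // ham predicted as spam
      case (c, (0.0, 0.0)) => c.copy(tn = c.tn + 1) // ham predicted as ham
      case (c, (0.0, 1.0)) => c.copy(fn = c.fn + 1) // spam predicted as ham
      case (c, _)          => c                     // ignore any other label values
    }
}
```

On a skewed dataset such as this one, accuracy alone can look good even when many spam messages are missed, which is why precision and recall for the spam class are worth checking as well.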
As we can see from the output, LogisticRegressionWithSGD and SVMWithSGD have a slightly larger area under the curve than LogisticRegressionWithLBFGS, which suggests that these two models perform slightly better on this run.
This is a sample output (your output could vary):
************** Printing metrics for Logistic Regression with SGD ***************
Area under ROC 0.9208860759493671
Accuracy 0.9769585253456221
Confusion Matrix 927.0  0.0
25.0   133.0
************** ending metrics for Logistic Regression with SGD *****************
************** Printing metrics for SVM with SGD ***************
Area under ROC 0.9318656156156157
Precision : 0.9784845650140318
Confusion Matrix 921.0  4.0
19.0   125.0
************** ending metrics for SVM with SGD *****************
************** Printing metrics for Logistic Regression with BFGS ***************
Area under ROC 0.8790559620074445
Accuracy 0.9596136962247586
Confusion Matrix 971.0  9.0
37.0   122.0
************** ending metrics for Logistic Regression with BFGS **********************************
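As a quick sanity check, the accuracy reported for logistic regression with SGD can be recovered from its confusion matrix. Reading the four values as a 2x2 matrix whose diagonal (927 and 133) holds the correctly classified counts, which is an assumption about the layout of the printed matrix, we get:

```latex
\text{accuracy}
  = \frac{927 + 133}{927 + 0 + 25 + 133}
  = \frac{1060}{1085}
  \approx 0.97696
```

This agrees with the printed accuracy of 0.9769585253456221.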