Binary classification using LogisticRegression and SVM

Unlike linear regression, wherein we predicted continuous values for the outcome (the y variable), logistic regression and the Support Vector Machine (SVM) are used to predict just one out of the n possibilities for the outcome (the y variable). If the outcome is one of two possibilities, then the classification is called a binary classification.

Logistic regression, when used for binary classification, looks at each data point and estimates the probability of that data point falling under the positive case. If the probability is less than a threshold, then the outcome is negative (or 0); otherwise, the outcome is positive (or 1).
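The thresholding described above can be sketched in a few lines of plain Scala. This is only an illustration, not Spark's implementation: the `predict` helper, its weights, and the default threshold of 0.5 are assumptions made for the example.

```scala
object LogisticThresholdSketch {
  // Logistic (sigmoid) function: squashes any real-valued score into (0, 1)
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Hypothetical helper: weights, intercept and threshold are illustrative values
  def predict(weights: Array[Double], intercept: Double,
              features: Array[Double], threshold: Double = 0.5): Int = {
    // Weighted sum of the features plus the intercept
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    // Probability below the threshold means negative (0), otherwise positive (1)
    if (sigmoid(margin) < threshold) 0 else 1
  }
}
```

Raising the threshold makes the classifier more reluctant to predict the positive case, which is one way to trade recall for precision.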

As with any other supervised learning technique, we provide training examples for logistic regression. We then add a bit of code for feature extraction and let the algorithm create a model that learns a weight for each feature, indicating how strongly that feature pushes the prediction towards one of the binary outcomes.

What SVM tries to do is map all of the training data as points in the feature space. The algorithm comes up with a hyperplane that separates the positive and negative training examples in such a way that the distance (margin band) between them is maximum. This is better illustrated with a diagram:

[Figure: Binary classification using LogisticRegression and SVM]

When a new and unseen data point comes up for prediction, the algorithm checks which side of the hyperplane that point falls on. The label associated with that side (positive or negative) is predicted as the label for the input point.
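For a linear SVM, prediction boils down to the sign of the score w·x + b, that is, which side of the separating hyperplane the point lies on. The following is a minimal sketch under that assumption; the weight vector `w` and intercept `b` are made-up values, not the output of a trained model.

```scala
object LinearSvmDecisionSketch {
  // Signed score of a point relative to the hyperplane w.x + b = 0
  def score(w: Array[Double], b: Double, x: Array[Double]): Double =
    w.zip(x).map { case (wi, xi) => wi * xi }.sum + b

  // Points with a positive score are labelled 1, all others 0
  def predict(w: Array[Double], b: Double, x: Array[Double]): Int =
    if (score(w, b, x) > 0.0) 1 else 0
}
```

With `w = (1, 1)` and `b = -3`, the point `(2, 2)` has a score of 1 and lands on the positive side, while `(1, 1)` has a score of -1 and lands on the negative side.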

How to do it...

Both the implementations of LogisticRegression and SVM in Spark use L2 regularization by default, but we are free to switch to L1 by setting the updater explicitly.
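Switching the regularization could look roughly like the following sketch, which assumes Spark MLlib's RDD-based API is on the classpath. The `svmWithL1` helper and its parameters are hypothetical names introduced for this example; `L1Updater` is the updater class from `org.apache.spark.mllib.optimization`.

```scala
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater

object L1RegularizationSketch {
  // Hypothetical helper: returns an SVMWithSGD that uses L1 instead of the default L2 updater
  def svmWithL1(iterations: Int, regParam: Double): SVMWithSGD = {
    val svm = new SVMWithSGD()
    svm.optimizer
      .setNumIterations(iterations)
      .setRegParam(regParam)
      .setUpdater(new L1Updater) // replaces the default squared-L2 updater
    svm
  }
}
```

The same `setUpdater` call should apply to the optimizer of LogisticRegressionWithSGD, since both use gradient descent as their optimizer.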

In this recipe, we'll classify a spam/ham dataset (https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection) against three variants of classification algorithms:

  • Logistic regression with SGD as the optimization algorithm
  • Logistic regression with BFGS as the optimization algorithm
  • Support vector machine with SGD as the optimization algorithm

The BFGS optimization algorithm (Spark implements the limited-memory variant, L-BFGS) typically converges to the minimum in fewer iterations than SGD. Also, for BFGS, we need not break our heads coming up with an optimal learning rate.
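To see why the learning rate (step size) matters for gradient-based methods such as SGD, consider this toy sketch. It runs plain gradient descent on f(x) = x², which is an assumption chosen for simplicity and has nothing to do with Spark's optimizers.

```scala
object StepSizeSketch {
  // Plain gradient descent on f(x) = x * x, whose gradient is 2 * x
  def descend(start: Double, stepSize: Double, iterations: Int): Double =
    (1 to iterations).foldLeft(start) { (x, _) => x - stepSize * 2.0 * x }
}
```

With a step size of 0.1, each iteration multiplies x by 0.8 and the value shrinks towards the minimum at 0. With a step size of 1.1, each iteration multiplies x by -1.2 and the value oscillates away from the minimum.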

Let's summarize the steps:

  1. Importing the data.
  2. Tokenizing the data and converting it into LabeledPoints.
  3. Factoring the Inverse Document Frequency (IDF).
  4. Preparing the training and test data.
  5. Constructing the algorithm.
  6. Training the model and predicting the test data.
  7. Evaluating the model.

Importing the data

As usual, our input data is in the form of a text file—SMSSpamCollection. The data file looks like this:

[Figure: Importing the data (sample lines from the SMSSpamCollection file)]

As we can see, the label and the data are separated by a tab. So, while reading each line, we split the label and the content, and then populate a simple case class named Document. This Document class is just a temporary placeholder. In the next step, we'll convert these documents into LabeledPoints:

//Frankly, we could make this a tuple but this looks neat
  case class Document(label: String, content: String)

  //Each line holds the label and the message text, separated by a tab
  val docs = sc.textFile("SMSSpamCollection").map(line => {
    val words = line.split("\t")
    Document(words.head.trim(), words.tail.mkString(" "))
  })

Tokenizing the data and converting it into LabeledPoints

For the tokenization, instead of relying on the tokenizer provided inside Spark, we'll see how to plug in two external NLP libraries: Stanford CoreNLP and Scala NLP's Epic. These are two popular NLP libraries, one from the Java world and the other from Scala. However, one thing that we ought to watch out for while using external libraries is that the instantiation of these APIs, and therefore the creation of heavyweight objects required for the use of these APIs (such as a tokenizer), should be done at the partition level. If we do it inside a per-record closure, such as a map over an RDD, we'll end up creating a new instance of the API object for every single record of the data.
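The difference between per-record and per-partition instantiation can be simulated without Spark. In this sketch, nested sequences stand in for the partitions of an RDD and `ExpensiveTokenizer` is a hypothetical stand-in for a heavyweight NLP object; both are assumptions made for illustration.

```scala
object PartitionLevelInitSketch {
  // Counts how many times the "heavyweight" object gets constructed
  var instancesCreated = 0

  // Hypothetical stand-in for an expensive NLP object such as a tokenizer
  class ExpensiveTokenizer {
    instancesCreated += 1
    def apply(text: String): Seq[String] = text.split("\\s+").toSeq
  }

  // One tokenizer per record: what happens when we instantiate inside a plain map
  def perRecord(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap(_.map(doc => new ExpensiveTokenizer().apply(doc)))

  // One tokenizer per partition: the pattern that mapPartitions makes possible
  def perPartition(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.flatMap { part =>
      val tokenizer = new ExpensiveTokenizer()
      part.map(doc => tokenizer(doc))
    }
}
```

For two partitions holding three documents in total, `perRecord` constructs three tokenizers while `perPartition` constructs only two, and both produce the same tokens.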

In the case of Epic, we just split the documents into sentences and then tokenize them into words. We also add two more restrictions. Only those tokens that contain letters or digits will be considered, and the tokens should be of at least two characters:

import epic.preprocess.TreebankTokenizer
import epic.preprocess.MLSentenceSegmenter
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

//Use Scala NLP - Epic
val labeledPointsUsingEpicRdd: RDD[LabeledPoint] = docs.mapPartitions { docIter =>

  //Heavyweight objects are created once per partition, not once per document
  val segmenter = MLSentenceSegmenter.bundled().get
  val tokenizer = new TreebankTokenizer()
  val hashingTf = new HashingTF(5000)

  docIter.map { doc =>
    val sentences = segmenter(doc.content)
    val tokens = sentences.flatMap(sentence => tokenizer(sentence))

    //consider only features that are letters or digits and cut off all words that are less than 2 characters
    val filteredTokens = tokens.toList.filter(token => token.forall(_.isLetterOrDigit)).filter(_.length() > 1)

    //ham is the negative class (0), spam is the positive class (1)
    new LabeledPoint(if (doc.label == "ham") 0 else 1, hashingTf.transform(filteredTokens))
  }
}.cache()

MLSentenceSegmenter splits the paragraph into sentences. The sentences are then split into terms (or words) using the tokenizer. HashingTF maps each term to an index by hashing it and counts the occurrences per index. Finally, to construct a LabeledPoint for each document, we convert these terms into a term frequency vector for that document using the transform function of HashingTF. Also, we restrict the size of the feature vector to 5,000 by setting numFeatures in HashingTF.
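The idea behind a hashed term frequency vector can be sketched in plain Scala. This is a simplified illustration and not Spark's HashingTF: the use of `String.hashCode` and the `Map`-based sparse representation are assumptions made for the example.

```scala
object HashingTfSketch {
  // Maps a term to a bucket in [0, numFeatures) using its hash code
  def bucket(term: String, numFeatures: Int): Int = {
    val raw = term.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Sparse term frequency vector: bucket index -> number of terms hashed to it
  def termFrequencies(terms: Seq[String], numFeatures: Int): Map[Int, Double] =
    terms.groupBy(term => bucket(term, numFeatures))
      .map { case (index, ts) => index -> ts.size.toDouble }
}
```

Because different terms can hash to the same bucket, a small numFeatures saves memory at the cost of more collisions.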

With Stanford CoreNLP, the process is a little more involved, in the sense that we reduce the tokens to lemmas (https://en.wikipedia.org/wiki/Lemmatisation). In order to do this, we create an NLP pipeline that splits sentences, tokenizes, and finally reduces the tokens to lemmas:

import java.util.Properties
import scala.collection.JavaConverters._
import scala.io.Source
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

def corePipeline(): StanfordCoreNLP = {
  val props = new Properties()
  props.setProperty("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}

def lemmatize(nlp: StanfordCoreNLP, content: String): List[String] = {
  //We are required to prepare the text as 'annotatable' before we annotate :-)
  val document = new Annotation(content)
  //Annotate
  nlp.annotate(document)
  //Extract all sentences
  val sentences = document.get(classOf[SentencesAnnotation]).asScala

  //Extract lemmas from sentences
  val lemmas = sentences.flatMap { sentence =>
    val tokens = sentence.get(classOf[TokensAnnotation]).asScala
    tokens.map(token => token.getString(classOf[LemmaAnnotation]))
  }
  //Only lemmas with letters or digits will be considered. Also consider only those words which have a length of at least 2
  lemmas.toList.filter(lemma => lemma.forall(_.isLetterOrDigit)).filter(_.length() > 1)
}

val labeledPointsUsingStanfordNLPRdd: RDD[LabeledPoint] = docs.mapPartitions { docIter =>
  val corenlp = corePipeline()
  //Materialize as a Set: an Iterator would be exhausted after the first lookup
  val stopwords = Source.fromFile("stopwords.txt").getLines().toSet
  val hashingTf = new HashingTF(5000)

  docIter.map { doc =>
    val lemmas = lemmatize(corenlp, doc.content)
    //remove all the stopwords from the lemma list and keep the filtered result
    val filteredLemmas = lemmas.filterNot(lemma => stopwords.contains(lemma))

    //Generates a term frequency vector from the features
    val features = hashingTf.transform(filteredLemmas)

    //example : List(until, jurong, point, crazy, available, only, in, bugi, great, world, la, buffet, Cine, there, get, amore, wat)
    new LabeledPoint(
      if (doc.label.equals("ham")) 0 else 1,
      features)
  }
}.cache()

Factoring the inverse document frequency

With HashingTF, we have a map of terms along with their frequency of occurrence in the documents. Now, the problem with taking this metric is that common words such as "the" and "a" get higher rankings compared to rare words. The inverse document frequency (IDF) counts the number of documents in which a term occurs and gives higher weight to a term that is uncommon. We'll now factor in the inverse document frequency so that we have the TF-IDF score (https://en.wikipedia.org/wiki/Tf–idf) for each term. This is easily achievable in Spark with the availability of org.apache.spark.mllib.feature.IDFModel. We extract all term frequencies from LabeledPoints and pass them to the transform function of IDFModel to generate the TF-IDF:

import org.apache.spark.mllib.feature.{IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector

def withIdf(lPoints: RDD[LabeledPoint]): RDD[LabeledPoint] = {
    //Extract the term frequency vectors from the LabeledPoints
    val hashedFeatures = lPoints.map(lp => lp.features)
    val idf: IDF = new IDF()
    val idfModel: IDFModel = idf.fit(hashedFeatures)

    val tfIdf: RDD[Vector] = idfModel.transform(hashedFeatures)

    //Pair each original label with its TF-IDF vector
    lPoints.zip(tfIdf).map {
      case (originalLPoint, tfIdfVector) => new LabeledPoint(originalLPoint.label, tfIdfVector)
    }
  }

  //getLabeledPoints is not shown in this section; it is assumed to wrap the tokenization step above
  val labeledPointsUsingStanfordNLPRdd=getLabeledPoints(docs, "STANFORD")
  val lpTfIdf=withIdf(labeledPointsUsingStanfordNLPRdd).cache()
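The weighting itself is simple arithmetic. The sketch below assumes the smoothed formula that the Spark MLlib documentation gives for IDF, idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing the term. The function names are hypothetical.

```scala
object TfIdfSketch {
  // idf(t) = ln((m + 1) / (df(t) + 1)); rare terms get a larger value
  def idf(numDocs: Long, docFreq: Long): Double =
    math.log((numDocs + 1.0) / (docFreq + 1.0))

  // TF-IDF is the term frequency scaled by the inverse document frequency
  def tfIdf(termFreq: Double, numDocs: Long, docFreq: Long): Double =
    termFreq * idf(numDocs, docFreq)
}
```

A term that appears in every document gets an IDF of 0 under this formula, so its TF-IDF score is 0 no matter how often it occurs in a single document.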

Preparing the training and test data

Our dataset has a skewed distribution of spam and ham data. To make sure that both the training and the test data preserve this distribution, we first split the dataset into two subsets, spam and ham, and then split each subset in the 80:20 ratio. At the end of this, roughly 80% of the spam samples and 80% of the ham samples end up in the training data, and the remaining 20% of each end up in the test data.

The spam and ham counts in our dataset are 747 and 4827, respectively:

  //Split dataset
  val spamPoints = lpTfIdf.filter(point => point.label == 1).randomSplit(Array(0.8, 0.2))
  val hamPoints = lpTfIdf.filter(point => point.label == 0).randomSplit(Array(0.8, 0.2))

  println ("Spam count:"+(spamPoints(0).count)+"::"+(spamPoints(1).count))
  println ("Ham count:"+(hamPoints(0).count)+"::"+(hamPoints(1).count))

  val trainingSpamSplit = spamPoints(0)
  val testSpamSplit = spamPoints(1)

  val trainingHamSplit = hamPoints(0)
  val testHamSplit = hamPoints(1)

  val trainingSplit = trainingSpamSplit ++ trainingHamSplit
  val testSplit = testSpamSplit ++ testHamSplit
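The same per-class split can be sketched on plain Scala collections. Unlike randomSplit, this illustration cuts each class deterministically so that the resulting sizes are predictable; the `split` helper is a hypothetical name introduced for the example.

```scala
object StratifiedSplitSketch {
  // Splits every class separately so that both parts keep the original class ratio.
  // Uses a deterministic cut instead of a random one, purely for illustration.
  def split[A](items: Seq[(Int, A)], trainFraction: Double): (Seq[(Int, A)], Seq[(Int, A)]) = {
    val perClass = items.groupBy(_._1).values.toSeq.map { group =>
      group.splitAt(math.round(group.size * trainFraction).toInt)
    }
    // Union the per-class training parts and the per-class test parts
    (perClass.flatMap(_._1), perClass.flatMap(_._2))
  }
}
```

Splitting the whole dataset in one go would also give an 80:20 split overall, but with a minority class this small, the share of spam in the test set could drift noticeably from run to run.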

Constructing the algorithm

Now that we have our training and test sets, the next obvious step is to train a model out of these examples. Let's create instances of the three variants of the algorithms that we would like to experiment with:

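import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.regression.{GeneralizedLinearAlgorithm, GeneralizedLinearModel}

  def getAlgorithm(algo: String, iterations: Int, stepSize: Double, regParam: Double): GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel] = algo match {
    case "logsgd" => {
      val logSgd = new LogisticRegressionWithSGD()
      logSgd.setIntercept(true).optimizer.setNumIterations(iterations).setStepSize(stepSize).setRegParam(regParam)
      logSgd
    }
    case "logbfgs" => {
      //No step size here: the BFGS optimizer does not take one
      val logBfgs = new LogisticRegressionWithLBFGS()
      logBfgs.setIntercept(true).optimizer.setNumIterations(iterations).setRegParam(regParam)
      logBfgs
    }
    case "svm" => {
      val svm = new SVMWithSGD()
      svm.setIntercept(true).optimizer.setNumIterations(iterations).setStepSize(stepSize).setRegParam(regParam)
      svm
    }
    case other => throw new IllegalArgumentException(s"Unknown algorithm: $other")
  }

  val logisticWithSGD = getAlgorithm("logsgd", 100, 1, 0.001)
  val logisticWithBfgs = getAlgorithm("logbfgs", 100, Double.NaN, 0.001)
  val svmWithSGD = getAlgorithm("svm", 100, 1, 0.001)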

Note that the stepSize parameter isn't set for logistic regression with BFGS, because that optimizer does not take a learning rate.

Training the model and predicting the test data

Like linear regression, training and predicting the labels for the test set is just a matter of calling the run and predict methods of the classification algorithm.

Soon after the prediction is done, the next logical step is to evaluate the model. In order to generate metrics for this, we extract the predicted and the actual labels. Our runClassification function trains the model using the training data and makes predictions against the test data. It then zips the predicted and the actual outcomes into a value called predictsAndActuals. This value is returned from the function.

The runClassification function accepts a GeneralizedLinearAlgorithm as the parameter, which is the parent of LinearRegressionWithSGD, LogisticRegressionWithSGD, LogisticRegressionWithLBFGS, and SVMWithSGD:

val logisticWithSGDPredictsActuals=runClassification(logisticWithSGD, trainingSplit, testSplit)
val logisticWithBfgsPredictsActuals=runClassification(logisticWithBfgs, trainingSplit, testSplit)
val svmWithSGDPredictsActuals=runClassification(svmWithSGD, trainingSplit, testSplit)


 def runClassification(algorithm: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], trainingData:RDD[LabeledPoint], testData:RDD[LabeledPoint]): RDD[(Double, Double)] = {
    val model = algorithm.run(trainingData)
    val predicted = model.predict(testData.map(point => point.features))
    val actuals = testData.map(point => point.label)
    val predictsAndActuals: RDD[(Double, Double)] = predicted.zip(actuals)
    predictsAndActuals
  }

Evaluating the model

For generating the metrics, Spark has some built-in APIs. The two most common metrics used to evaluate a classification model are the area under the ROC curve and the confusion matrix. The org.apache.spark.mllib.evaluation.BinaryClassificationMetrics gives us the area under the curve, and org.apache.spark.mllib.evaluation.MulticlassMetrics gives us the confusion matrix. We also calculate the simple accuracy measure manually using the values of predicted and actuals. The accuracy is simply the result of dividing the correctly classified count of the test dataset by the total count of the test dataset. Refer to https://en.wikipedia.org/wiki/Accuracy_and_precision#In_binary_classification for more details:

import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}

 def calculateMetrics(predictsAndActuals: RDD[(Double, Double)], algorithm: String): Unit = {

    //Fraction of test points whose predicted label matches the actual label
    val accuracy = 1.0*predictsAndActuals.filter(predActs => predActs._1 == predActs._2).count() / predictsAndActuals.count()
    val binMetrics = new BinaryClassificationMetrics(predictsAndActuals)
    println(s"************** Printing metrics for $algorithm ***************")
    println(s"Area under ROC ${binMetrics.areaUnderROC}")
    println(s"Accuracy $accuracy")

    val metrics = new MulticlassMetrics(predictsAndActuals)
    val f1=metrics.fMeasure
    println(s"F1 $f1")
    println(s"Precision : ${metrics.precision}")
    //The line break must be escaped inside a single-line string literal
    println(s"Confusion Matrix\n${metrics.confusionMatrix}")
    println(s"************** ending metrics for $algorithm *****************")

  }

As we can see from the sample output that follows, LogisticRegressionWithSGD and SVMWithSGD have a slightly bigger area under the curve than LogisticRegressionWithLBFGS, which means that, on this run, the two models perform a tad better.

This is a sample output (your output could vary):

************** Printing metrics for Logistic Regression with SGD ***************
Area under ROC 0.9208860759493671
Accuracy 0.9769585253456221
Confusion Matrix
927.0  0.0
25.0   133.0
************** ending metrics for Logistic Regression with SGD *****************

************** Printing metrics for SVM with SGD ***************
Area under ROC 0.9318656156156157
Precision : 0.9784845650140318
Confusion Matrix
921.0  4.0
19.0   125.0
************** ending metrics for SVM with SGD *****************

************** Printing metrics for Logistic Regression with BFGS ***************
Area under ROC 0.8790559620074445
Accuracy 0.9596136962247586
Confusion Matrix
971.0  9.0
37.0   122.0
************** ending metrics for Logistic Regression with BFGS *****************
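As a sanity check, the accuracy figure can be recomputed by hand from a confusion matrix. The sketch below uses the matrix printed for Logistic Regression with SGD and assumes the layout documented for MulticlassMetrics, with actual labels in rows and predicted labels in columns, ordered ham (0) then spam (1). The `Confusion` case class is a hypothetical helper.

```scala
object ConfusionMatrixSketch {
  // Rows are actual labels, columns are predicted labels; 0 = ham, 1 = spam
  case class Confusion(hamAsHam: Double, hamAsSpam: Double,
                       spamAsHam: Double, spamAsSpam: Double) {
    val total: Double = hamAsHam + hamAsSpam + spamAsHam + spamAsSpam
    // Correctly classified points divided by all points
    def accuracy: Double = (hamAsHam + spamAsSpam) / total
    // Of everything predicted as spam, how much really was spam
    def spamPrecision: Double = spamAsSpam / (spamAsSpam + hamAsSpam)
    // Of all real spam, how much was caught
    def spamRecall: Double = spamAsSpam / (spamAsSpam + spamAsHam)
  }

  // Values copied from the sample output for Logistic Regression with SGD
  val logisticSgd: Confusion = Confusion(927.0, 0.0, 25.0, 133.0)
}
```

Here the accuracy works out to 1060 / 1085, which matches the Accuracy line in the sample output. The spam recall of 133 / 158 shows that the misclassifications on this run are all spam messages predicted as ham.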