Using StreamingLogisticRegression to classify a Twitter stream using Kafka as a training stream

In the previous recipe, we published all the tweets that were stored in ElasticSearch to a Kafka topic. In this recipe, we'll subscribe to that Kafka stream and train a classification model from it. Later, we will use this trained model to classify a live Twitter stream.

How to do it...

This is a really small recipe composed of three steps:

  1. Subscribing to a Kafka stream: There are two ways to subscribe to a Kafka stream and we'll be using the DirectStream method, which is faster. Just like Twitter streaming, Spark has first-class support for subscribing to a Kafka stream. This is achieved by adding the spark-streaming-kafka dependency. Let's add it to our build.sbt file:
    "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion

    The subscription process is more or less the reverse of the publishing process even in terms of the properties that we pass to Kafka:

    val topics = Set("twtopic")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    Once the properties are constructed, we subscribe to twtopic using KafkaUtils.createDirectStream (a sketch of the streamingContext setup that these snippets assume appears after the steps):

    val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](streamingContext, kafkaParams, topics).repartition(2)

    With the stream at hand, let's reconstruct the LabeledContent objects from it. We can do that using KryoSerializer's deserialize function:

    val trainingStream = kafkaStream.map {
      case (key, value) =>
        val labeledContent = KryoSerializer.deserialize(value).asInstanceOf[LabeledContent]
        labeledContent
    }

  2. Training the classification model: Now that we are receiving the LabeledContent objects from the Kafka stream, let's train our classification model on them. We will use StreamingLogisticRegressionWithSGD for this, which, as the name indicates, is a streaming version of the LogisticRegressionWithSGD algorithm we saw in Chapter 5, Learning from Data. In order to train the model, we have to construct a LabeledPoint, which is a pair of a label (represented as a double) and a feature vector. Since the content is text, we'll use HashingTF's transform function to generate the feature vector for us (a small sketch of this transformation appears after the steps):
    val hashingTf = new HashingTF(5000)
    
    val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](streamingContext, kafkaParams, topics).repartition(2)
    val trainingStream = kafkaStream.map {
      case (key, value) =>
        val labeledContent = KryoSerializer.deserialize(value).asInstanceOf[LabeledContent]
        val vector = hashingTf.transform(labeledContent.content)
        LabeledPoint(labeledContent.label, vector)
    }

    trainingStream is now a stream of LabeledPoint objects, which we will use to train our model:

    val model = new StreamingLogisticRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(5000))
      .setNumIterations(25)
      .setStepSize(0.1)
      .setRegParam(0.001)
    
    model.trainOn(trainingStream)

    Since we specified the maximum number of features in HashingTF to be 5,000, we set the initial weights to 0 for all 5,000 features. The rest of the parameters are the same as for the regular LogisticRegressionWithSGD algorithm, which trains on a static dataset.

  3. Classifying a live Twitter stream: Now that we have the model in hand, let's use it to predict whether the incoming stream of tweets is about fashion or not. The Twitter setup in this step is the same as in the first recipe, where we subscribed to a Twitter stream:
    val filter = List("fashion", "tech", "startup", "spark")
    val twitterStream = TwitterUtils.createStream(streamingContext, twitterAuth, filter, StorageLevel.MEMORY_AND_DISK)

    The crucial part is the invocation of model.predictOnValues, which gives us the predicted label for each tweet. Once the predictions are made, we save them as text files in our local directory. This isn't the best way to store them; we would probably want to push this data to an appendable data source instead (a sketch of one such approach appears after the steps).

    val contentAndFeatureVector = twitterStream.map { status =>
      val tokens = status.getText().toLowerCase().split(" ")
      val vector = hashingTf.transform(tokens)
      (status.getText(), vector)
    }

    val contentAndPrediction = model.predictOnValues(contentAndFeatureVector)

    //Not the best way to store the results. Creates a whole lot of files
    contentAndPrediction.saveAsTextFiles("predictions", "txt")

    To consolidate the predictions that are spread over multiple files, we can use a simple shell command:

    find predictions* -name "part*" | xargs cat >> output.txt
    

    Looking at a sample of the predictions, the results are fairly reasonable considering that the training dataset itself was not labeled in a very scientific way. Also, the tokenization is just space-based, the data isn't scaled, and IDF wasn't used.
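
The snippets in the steps above assume that a streamingContext is already available. The following is a minimal sketch of how it could be constructed together with the Kafka subscription from step 1; the application name, master, and 10-second batch interval are assumptions, not part of the original recipe:

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Assumed setup: the app name, master, and batch interval are placeholders
    val conf = new SparkConf().setAppName("TwitterKafkaClassifier").setMaster("local[2]")
    val streamingContext = new StreamingContext(conf, Seconds(10))

    // Subscribe to the topic that the previous recipe published to
    val topics = Set("twtopic")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](streamingContext, kafkaParams, topics).repartition(2)

Once the training and prediction streams are wired up, streamingContext.start() and streamingContext.awaitTermination() kick off the processing.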
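
To make the feature-vector construction in step 2 concrete, here is a small sketch of what HashingTF's transform produces: it hashes a bag of tokens into a fixed-size sparse vector of 5,000 term frequencies, which is why the model's initial weights are Vectors.zeros(5000). The sample tokens and the label convention (1.0 for a fashion-related tweet) are made up for illustration:

    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.regression.LabeledPoint

    val hashingTf = new HashingTF(5000)

    // Hypothetical tokens standing in for a tweet's content
    val tokens = "new spring fashion line launched today".toLowerCase.split(" ").toSeq

    // transform hashes each token into one of 5,000 buckets and counts term frequencies
    val vector = hashingTf.transform(tokens)

    // Pair the vector with a label (assumed: 1.0 = fashion) to get a LabeledPoint
    val point = LabeledPoint(1.0, vector)
    println(point)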
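
As mentioned in step 3, saveAsTextFiles scatters the predictions across many small files. A minimal sketch of appending each micro-batch to a single local file via foreachRDD is shown below; collecting to the driver is only reasonable for the modest volumes in this recipe, and the output path predictions.txt is an assumption:

    import java.io.FileWriter

    contentAndPrediction.foreachRDD { rdd =>
      // Append this micro-batch of (tweet text, predicted label) pairs to one file
      val writer = new FileWriter("predictions.txt", true) // true => append mode
      try {
        rdd.collect().foreach { case (text, prediction) =>
          writer.write(s"$prediction\t${text.replaceAll("\n", " ")}\n")
        }
      } finally {
        writer.close()
      }
    }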
