In the previous recipe, we published all the tweets that were stored in ElasticSearch to a Kafka topic. In this recipe, we'll subscribe to the Kafka stream and train a classification model out of it. We will later use this trained model to classify a live Twitter stream.
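This is a small recipe composed of three steps: subscribing to the Kafka stream, training the classification model, and classifying a live Twitter stream.
The first step is subscribing to the Kafka stream. Spark can consume Kafka either through a receiver or through a direct stream; we'll use the DirectStream method, which is faster. Just like Twitter streaming, Spark has first-class support for subscribing to a Kafka stream. This is achieved by adding the spark-streaming-kafka dependency. Let's add it to our build.sbt file:
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion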
The subscription process is more or less the reverse of the publishing process even in terms of the properties that we pass to Kafka:
val topics = Set("twtopic")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
Once the properties are constructed, we subscribe to twtopic using KafkaUtils.createDirectStream:
val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](streamingContext, kafkaParams, topics).repartition(2)
With the stream at hand, let's reconstruct LabeledContent out of it. We can do that through KryoSerializer's deserialize function:
val trainingStream = kafkaStream.map {
  case (key, value) =>
    val labeledContent = KryoSerializer.deserialize(value).asInstanceOf[LabeledContent]
    // ... the conversion to a LabeledPoint is completed in the next step
}
The second step is training the model. Now that we have LabeledContent objects from the Kafka stream, let's train our classification model out of them. We will use StreamingLogisticRegressionWithSGD for this, which, as the name indicates, is a streaming version of the LogisticRegressionWithSGD algorithm we saw in Chapter 5, Learning from Data. In order to train the model, we have to construct a LabeledPoint, which is a pair of a label (represented as a double) and a feature vector. Since this is text, we'll use HashingTF's transform function to generate the feature vector for us:
val hashingTf = new HashingTF(5000)
val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](streamingContext, kafkaParams, topics).repartition(2)
val trainingStream = kafkaStream.map {
  case (key, value) =>
    val labeledContent = KryoSerializer.deserialize(value).asInstanceOf[LabeledContent]
    val vector = hashingTf.transform(labeledContent.content)
    LabeledPoint(labeledContent.label, vector)
}
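To make the idea behind HashingTF more concrete, here is a minimal plain-Scala sketch of the hashing trick: each token is hashed into one of a fixed number of buckets, and the bucket's count is incremented. This is only an illustration under simplifying assumptions (a dense array and the JVM's hashCode); the names HashingSketch, bucket, and termFrequencies are hypothetical and are not part of MLlib, whose implementation differs in its details.

```scala
object HashingSketch {
  // Map a token to a bucket in [0, numFeatures) using its hash code.
  // The adjustment keeps the index non-negative for negative hash codes.
  def bucket(token: String, numFeatures: Int): Int = {
    val raw = token.hashCode % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  // Build a dense term-frequency vector: each token increments its bucket.
  // Distinct tokens may collide in the same bucket; that is the trade-off
  // the hashing trick accepts in exchange for a fixed vector size.
  def termFrequencies(tokens: Seq[String], numFeatures: Int): Array[Double] = {
    val vector = Array.fill(numFeatures)(0.0)
    tokens.foreach(t => vector(bucket(t, numFeatures)) += 1.0)
    vector
  }

  def main(args: Array[String]): Unit = {
    val tokens = "spark makes streaming spark jobs simple".split(" ").toSeq
    val vector = termFrequencies(tokens, 5000)
    println(s"non-zero buckets: ${vector.count(_ > 0.0)}, total count: ${vector.sum}")
  }
}
```

The vector length is fixed up front (5000 in this recipe), so no vocabulary has to be built before the stream starts, which is what makes this approach convenient for streaming data.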
trainingStream is now a stream of LabeledPoint, which we will use to train our model:
val model = new StreamingLogisticRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(5000))
  .setNumIterations(25)
  .setStepSize(0.1)
  .setRegParam(0.001)
model.trainOn(trainingStream)
Since we specified the maximum number of features in our HashingTF to be 5000, we set the initial weights to be 0 for all 5,000 features. The rest of the parameters are the same as for the regular LogisticRegressionWithSGD algorithm that trains on a static dataset.
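The roles of the initial weights, the number of iterations, the step size, and the regularization parameter can be sketched in plain Scala. The following is a simplified illustration, not MLlib's implementation: it assumes a constant step size, dense arrays, and no intercept, and the names StreamingLogisticSketch, Example, and trainOnBatch are hypothetical. The point it tries to show is that each incoming batch continues from the weights left by the previous one.

```scala
object StreamingLogisticSketch {
  final case class Example(label: Double, features: Array[Double])

  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Probability that the label is 1.0 under the given weights.
  def predictProbability(weights: Array[Double], features: Array[Double]): Double =
    sigmoid(dot(weights, features))

  // Run numIterations gradient steps over one batch, starting from `weights`.
  // regParam adds an L2 penalty that pulls the weights towards zero.
  def trainOnBatch(weights: Array[Double], batch: Seq[Example],
                   numIterations: Int, stepSize: Double, regParam: Double): Array[Double] = {
    var w = weights.clone()
    for (_ <- 1 to numIterations) {
      val gradient = Array.fill(w.length)(0.0)
      for (ex <- batch) {
        val error = predictProbability(w, ex.features) - ex.label
        for (i <- w.indices) gradient(i) += error * ex.features(i) / batch.size
      }
      w = w.indices.map(i => w(i) - stepSize * (gradient(i) + regParam * w(i))).toArray
    }
    w
  }

  def main(args: Array[String]): Unit = {
    // Two tiny hypothetical micro-batches with two features each.
    val batches = Seq(
      Seq(Example(1.0, Array(1.0, 0.0)), Example(0.0, Array(0.0, 1.0))),
      Seq(Example(1.0, Array(2.0, 0.0)), Example(0.0, Array(0.0, 2.0)))
    )
    // Start from all-zero weights and carry the weights from batch to batch.
    val weights = batches.foldLeft(Array.fill(2)(0.0)) { (w, batch) =>
      trainOnBatch(w, batch, numIterations = 25, stepSize = 0.1, regParam = 0.001)
    }
    println(weights.mkString(", "))
  }
}
```

The fold in main mirrors, in spirit, what trainOn does for a stream: the model is not retrained from scratch for every batch but updated incrementally.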
The final step is classifying a live Twitter stream: we use the trained model to predict whether a tweet is about fashion or not. The Twitter setup in this section is the same as in the first recipe, where we subscribed to a Twitter stream:
val filter = List("fashion", "tech", "startup", "spark")
val twitterStream = TwitterUtils.createStream(streamingContext, twitterAuth, filter, StorageLevel.MEMORY_AND_DISK)
The crucial part is the invocation of model.predictOnValues, which gives us the predicted label. Once the predictions are made, we save them as text files in a local directory. This is not the best way to do it; we would probably want to push this data to some appendable data source instead.
val contentAndFeatureVector = twitterStream.map { status =>
  val tokens = status.getText().toLowerCase().split(" ")
  val vector = hashingTf.transform(tokens)
  (status.getText(), vector)
}
val contentAndPrediction = model.predictOnValues(contentAndFeatureVector)
// Not the best way to store the results. Creates a whole lot of files
contentAndPrediction.saveAsTextFiles("predictions", "txt")
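The reason for pairing the tweet text with its feature vector is that predictOnValues keeps the key untouched and replaces only the value with the predicted label. A rough plain-Scala sketch of that behaviour on a single batch might look as follows; the object PredictOnValuesSketch, the 0.5 threshold, and the hand-picked weights are assumptions made for illustration, not MLlib code.

```scala
object PredictOnValuesSketch {
  // Stand-in for a trained model: thresholds the logistic score at 0.5.
  def predict(weights: Array[Double], features: Array[Double]): Double = {
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum
    if (1.0 / (1.0 + math.exp(-margin)) > 0.5) 1.0 else 0.0
  }

  // Keep each key as it is and replace the feature vector with its prediction.
  def predictOnValues[K](weights: Array[Double],
                         batch: Seq[(K, Array[Double])]): Seq[(K, Double)] =
    batch.map { case (key, features) => (key, predict(weights, features)) }

  def main(args: Array[String]): Unit = {
    val weights = Array(1.0, -1.0) // hypothetical weights
    val batch = Seq(
      ("tweet about fashion", Array(1.0, 0.0)),
      ("tweet about something else", Array(0.0, 1.0))
    )
    predictOnValues(weights, batch).foreach(println)
  }
}
```

Because the key survives the prediction, the saved output pairs each tweet's text with its label rather than listing bare labels.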
In order to consolidate the predictions that are spread over multiple files, a really simple aggregation command was used:
find predictions* -name "part*" |xargs cat >> output.txt
Here is a sample of the prediction. The results are fairly okay considering that the training dataset itself was not classified in a very scientific way. Also, the tokenization is just space-based, the data isn't scaled, and IDF wasn't used.
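As one example of how the space-based tokenization could be tightened, the sketch below lowercases the text, drops URLs, and splits on anything that is not a letter, a digit, '#', or '@'. It is only a suggestion under those assumptions; TokenizerSketch and its regular expressions are hypothetical and were not part of the recipe's pipeline.

```scala
object TokenizerSketch {
  // Lowercase, drop URLs, then split on runs of characters that are
  // not ASCII letters, digits, '#' or '@'. Empty tokens are discarded.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase
      .replaceAll("https?://\\S+", " ")
      .split("[^a-z0-9#@]+")
      .filter(_.nonEmpty)
      .toSeq

  def main(args: Array[String]): Unit = {
    val tweet = "Loving #Spark, really! http://t.co/abc"
    println(tokenize(tweet).mkString(" | "))
  }
}
```

With a plain split on spaces, "really!" and "#Spark," would keep their punctuation and hash to different buckets than "really" and "#spark", which is one way the feature vectors lose signal.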