In the previous recipe, we subscribed to a Twitter stream and stored the tweets in ElasticSearch. Another common source of streaming data is Kafka, a distributed message broker. In fact, it's a distributed log of messages, which in simple terms means that there can be multiple brokers that have the messages partitioned among them.
In this recipe, we'll be reading the data that we ingested into ElasticSearch in the previous recipe and publishing the messages to Kafka. Soon after we publish the data to Kafka, we'll subscribe to it using the Spark Streaming API. While this recipe demonstrates treating ElasticSearch data as an RDD and publishing to Kafka using a KryoSerializer, its true intent is to prepare for running a streaming classification algorithm against the tweets, which is our next recipe.
Let's look at the various steps involved in doing this.
Once downloaded, let's extract it, start the Kafka server (along with the Zookeeper it depends on), and create a Kafka topic, through three commands run from inside our Kafka home directory:
First, start Zookeeper. The Zookeeper configuration, including its data directory and client port (default 2181), is present in zookeeper.properties. The zookeeper-server-start.sh script expects this file to be passed as a parameter for it to start:

bin/zookeeper-server-start.sh config/zookeeper.properties
Next, start the Kafka server, whose configuration lives in server.properties. Among many other things, server.properties specifies the port on which the Kafka server listens (9092) and the Zookeeper port it needs to connect to (2181). It is passed to the kafka-server-start.sh startup script:

bin/kafka-server-start.sh config/server.properties
Finally, create the topic. A topic named twtopic (Twitter topic) is created with a replication factor of 1 and a single partition:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twtopic
On the Spark side, the only configuration change (needed if ElasticSearch isn't running locally) is pointing the SparkConf to our ES node:
//Default is localhost. Point to ES node when required
val conf = new SparkConf()
  .setAppName("KafkaStreamProducerFromES")
  .setMaster("local[2]")
  .set(ConfigurationOptions.ES_NODES, "localhost")
  .set(ConfigurationOptions.ES_PORT, "9200")
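For completeness, here is a minimal sketch of the context setup that the snippets below assume; the esDF method used next is brought into scope by the elasticsearch-spark implicits:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
//Brings the esDF method into scope on SQLContext
import org.elasticsearch.spark.sql._

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)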
The following line queries the "spark/twstatus" index (which we published to in the last recipe) for all documents and extracts the data into a DataFrame. Optionally, you can pass in a query as a second argument (for example, "?q=fashion"):
val twStatusDf=sqlContext.esDF("spark/twstatus")
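If we only wanted the fashion-related documents, the optional query argument mentioned above could be used like this (a variant added here for illustration, not part of the recipe's flow):

//Restrict the extraction to documents matching a URI query
val fashionDf = sqlContext.esDF("spark/twstatus", "?q=fashion")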
Let's try to sample the DataFrame using show():
twStatusDf.show()
The output shows the first 20 rows of the DataFrame, including the content field that holds the tweet text.
Since our eventual goal is to classify tweets, we'll mark all the tweets that have the word fashion in them as belonging to the fashion class and the rest of the tweets as not belonging to the fashion class. We will just take the content of each tweet and convert it into a case class called LabeledContent (similar to LabeledPoint in Spark MLlib):
case class LabeledContent(label: Double, content: Array[String])
LabeledContent has only two fields:

label: This indicates whether the tweet is about fashion or not (1.0 if the tweet is about fashion and 0.0 if it is not)
content: This holds a space-tokenized version of the tweet itself

The following method does the conversion:

def convertToLabeledContentRdd(twStatusDf: DataFrame) = {
  //Convert the content alone to a (label, content) pair
  val labeledPointRdd = twStatusDf.map { row =>
    val content = row.getAs[String]("content").toLowerCase()
    //A very primitive space-based tokenizer
    val tokens = content.split(" ")
    val labeledContent =
      if (content.contains("fashion")) LabeledContent(1, tokens)
      else LabeledContent(0, tokens)
    println(labeledContent.label, content)
    labeledContent
  }
  labeledPointRdd
}
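As a quick sanity check before publishing (an addition for illustration, not part of the recipe itself), we can inspect the class distribution of the resulting RDD; labeled is just a hypothetical name for the returned value:

val labeled = convertToLabeledContentRdd(twStatusDf)
//countByValue brings the per-label counts back to the driver
val classCounts = labeled.map(_.label).countByValue()
println(classCounts)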
Now that we have the data ready in the form of LabeledContent, let's publish it to the Kafka topic. This involves just three lines. First, in the properties, we configure the location of the Kafka server:

val properties = Map[String, Object](ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092").asJava
Second, we construct the Kafka producer from the properties we built earlier. The producer also needs a key and a value serializer. Since we don't have a key for our messages, we fall back to Kafka's default, which fills in a hash code that we aren't interested in on receipt:

val producer = new KafkaProducer[String, Array[Byte]](properties, new StringSerializer, new ByteArraySerializer)
Third, we serialize each LabeledContent using KryoSerializer and send it to the Kafka topic "twtopic" (the one that we created earlier) using the producer.send method. The only purpose of using a KryoSerializer here is to speed up the serialization process:

val serializedPoint = KryoSerializer.serialize(lContent)
producer.send(new ProducerRecord[String, Array[Byte]]("twtopic", serializedPoint))
For the KryoSerializer, we use Twitter's chill library (https://github.com/twitter/chill), which provides an easier abstraction over Kryo serialization for Scala.
The actual KryoSerializer is just five lines of code:
object KryoSerializer {
  private val kryoPool = ScalaKryoInstantiator.defaultPool

  def serialize[T](anObject: T): Array[Byte] = kryoPool.toBytesWithClass(anObject)
  def deserialize[T](bytes: Array[Byte]): T = kryoPool.fromBytes(bytes).asInstanceOf[T]
}
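A quick round trip shows how the pool-backed serializer is meant to be used (the sample values here are made up for illustration):

val sample = LabeledContent(1.0, Array("fashion", "week"))
val bytes = KryoSerializer.serialize(sample)
//toBytesWithClass embeds the class name, so deserialization needs no extra hints
val restored = KryoSerializer.deserialize[LabeledContent](bytes)
assert(restored.label == sample.label && restored.content.sameElements(sample.content))

The defaultPool reuses Kryo instances across threads, which avoids the cost of instantiating a new Kryo for every message.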
The dependency for Twitter chill that needs to be added to our build.sbt is:
"com.twitter" %% "chill" % "0.7.0"
The entire publishing method looks like this:
def publishToKafka(labeledPointRdd: RDD[LabeledContent]) {
  labeledPointRdd.foreachPartition { iterator =>
    //One producer per partition, since KafkaProducer isn't serializable
    val properties = Map[String, Object](
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      "serializer.class" -> "kafka.serializer.DefaultEncoder").asJava
    val producer = new KafkaProducer[String, Array[Byte]](properties, new StringSerializer, new ByteArraySerializer)
    iterator.foreach { lContent =>
      val serializedPoint = KryoSerializer.serialize(lContent)
      producer.send(new ProducerRecord[String, Array[Byte]]("twtopic", serializedPoint))
    }
    //Flush and release the producer before the partition task ends,
    //so buffered messages aren't lost
    producer.close()
  }
}
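Finally, as a preview of the subscribing side that the next recipe builds on, here is a minimal sketch assuming the receiver-based spark-streaming-kafka integration (Kafka 0.8); the group ID and batch interval are arbitrary choices:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(conf, Seconds(10))
//Keys are Strings (unused), values are the raw Kryo bytes
val kafkaStream = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc,
  Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "twStreamGroup"),
  Map("twtopic" -> 1),
  StorageLevel.MEMORY_ONLY)
//Deserialize each payload back into a LabeledContent
val contentStream = kafkaStream.map { case (_, bytes) =>
  KryoSerializer.deserialize[LabeledContent](bytes)
}
contentStream.print()
ssc.start()
ssc.awaitTermination()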