Using Spark as an ETL tool

In the previous recipe, we subscribed to a Twitter stream and stored it in ElasticSearch. Another common source of streaming data is Kafka, a distributed message broker. In fact, it's a distributed log of messages, which in simple terms means that there can be multiple brokers that have the messages partitioned among them.

In this recipe, we'll be pulling the data that we ingested into ElasticSearch in the previous recipe and publishing the messages to Kafka. Soon after we publish the data to Kafka, we'll be subscribing to it using the Spark Streaming API. While this recipe demonstrates treating ElasticSearch data as an RDD and publishing to Kafka using a KryoSerializer, its true intent is to prepare for running a streaming classification algorithm against Twitter data, which is our next recipe.

How to do it...

Let's look at the various steps involved in doing this.

  1. Setting up Kafka: This recipe uses Kafka version 0.8.2.1 built for Scala 2.10, which can be downloaded from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz.

    Once downloaded, let's extract it, start the Kafka server, and create a Kafka topic using three commands run from inside our Kafka home directory:

    1. Starting Zookeeper: Kafka uses Zookeeper (https://zookeeper.apache.org/) to hold coordination information between Kafka servers. It also holds the commit offset information for the data, so that if a Kafka node fails, it knows where to resume from. The Zookeeper data directory and the client port (default 2181) are specified in zookeeper.properties, and zookeeper-server-start.sh expects this file to be passed as a parameter in order to start:
      bin/zookeeper-server-start.sh config/zookeeper.properties
      
    2. Starting the Kafka server: Again, in order to start Kafka, a configuration file, server.properties, has to be passed to it. Among many other things, server.properties specifies the port on which the Kafka server listens (9092) and the Zookeeper port it needs to connect to (2181). The file is passed to the kafka-server-start.sh startup script:
      bin/kafka-server-start.sh config/server.properties
      
    3. Creating a Kafka topic: In really simple terms, a topic can be compared to a JMS topic, the difference being that in Kafka a topic can have multiple publishers as well as multiple subscribers. Since we are running Kafka in a non-replicated, non-partitioned mode using just one Kafka server, the topic named twtopic (Twitter topic) is created with a replication factor of 1 and a single partition:
      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twtopic
      
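    As an optional aside, topics can also be created programmatically with Kafka's AdminUtils. The following sketch is not part of the recipe's code and assumes the kafka_2.10 0.8.2.1 jar (and its zkclient dependency) is on the classpath:

      import kafka.admin.AdminUtils
      import kafka.utils.ZKStringSerializer
      import org.I0Itec.zkclient.ZkClient

      //Connect to the Zookeeper instance that the Kafka broker registered with
      val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
      //Same settings as the shell command: 1 partition, replication factor of 1
      AdminUtils.createTopic(zkClient, "twtopic", 1, 1)
      zkClient.close()
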
  2. Pulling data from ElasticSearch: The next step is to pull the data from ElasticSearch and treat it as a Spark DataFrame. Other than the optional settings in the Spark configuration that point to the correct ElasticSearch host and port, this is just a one-liner.

    The configuration change (if needed) is:

      import org.apache.spark.SparkConf
      import org.elasticsearch.hadoop.cfg.ConfigurationOptions

      //Default is localhost. Point to ES node when required
      val conf = new SparkConf()
        .setAppName("KafkaStreamProducerFromES")
        .setMaster("local[2]")
        .set(ConfigurationOptions.ES_NODES, "localhost")
        .set(ConfigurationOptions.ES_PORT, "9200")

    The following line queries the "spark/twstatus" index (that we published to in the last recipe) for all documents and extracts the data into a DataFrame. Optionally, you can pass in a query as a second argument (for example, "?q=fashion"):

      import org.elasticsearch.spark.sql._ //Brings esDF into scope on the SQLContext

      val twStatusDf = sqlContext.esDF("spark/twstatus")
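
    If we only wanted the fashion-related documents, the same call accepts that query string as its second argument (a quick sketch; the fashionDf name is purely illustrative):

      val fashionDf = sqlContext.esDF("spark/twstatus", "?q=fashion")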

    Let's try to sample the DataFrame using show():

      twStatusDf.show()

    The output shows a sample of the tweet documents that we stored in ElasticSearch in the previous recipe.
  3. Preparing data to be published to Kafka: Before we do this step, let's go over what we aim to achieve with it. As we discussed at the beginning of the recipe, we will be running a classification algorithm against streaming data in the next recipe. As you know, any supervised learning algorithm requires a training dataset. Instead of manually curating the dataset, we will build it in a very primitive fashion by marking all the tweets that contain the word fashion as belonging to the fashion class, and the rest of the tweets as not belonging to it.

    We will just take the content of the tweet and convert it into a case class called LabeledContent (similar to LabeledPoint in Spark MLlib):

    case class LabeledContent(label: Double, content: Array[String])

    LabeledContent only has two fields:

    • label: This indicates whether the tweet is about fashion or not (1.0 if the tweet is on fashion and 0.0 if it is not)
    • content: This holds a space-tokenized version of the tweet itself
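
    To make the labeling concrete, here's a quick illustration (the tweet texts are made up for this example and are not from our dataset):

      //A tweet mentioning "fashion" becomes a positive example
      val positive = LabeledContent(1.0, "fashion week kicks off in paris".split(" "))
      //Every other tweet becomes a negative example
      val negative = LabeledContent(0.0, "spark streaming makes etl easy".split(" "))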
    The conversion function that builds these labeled records from the DataFrame is:

      import org.apache.spark.sql.DataFrame

      def convertToLabeledContentRdd(twStatusDf: DataFrame) = {
          //Convert the content alone to a (label, content) pair
          val labeledPointRdd = twStatusDf.map { row =>
              val content = row.getAs[String]("content").toLowerCase()
              val tokens = content.split(" ") //A very primitive space-based tokenizer
              val labeledContent = if (content.contains("fashion")) LabeledContent(1, tokens)
              else LabeledContent(0, tokens)
              println(labeledContent.label, content)
              labeledContent
          }
          labeledPointRdd
      }
  4. Publishing data to Kafka using KryoSerializer: Now that we have the publish candidate (LabeledContent) ready, let's publish it to the Kafka topic. This involves just three lines.
    • Constructing the connection and transport properties: In properties, we configure the location (host and port) of the Kafka server and register the serializer that we use to serialize LabeledContent:
      val properties = Map[String, Object](ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092").asJava
    • Constructing the Kafka producer using the connection properties and the key and value serializers: The next step is to construct a Kafka producer using the properties that we constructed earlier. The producer also needs a key serializer and a value serializer. Since we don't have a key for our messages, we fall back to Kafka's default key handling, which we aren't interested in on the receiving end.
      val producer = new KafkaProducer[String, Array[Byte]](properties, new StringSerializer, new ByteArraySerializer)
    • Sending data to the Kafka topic using the send method: We then serialize LabeledContent using KryoSerializer and send it to the Kafka topic "twtopic" (the one that we created earlier) using the producer.send method. The only purpose of using a KryoSerializer here is to speed up the serialization process:
      val serializedPoint = KryoSerializer.serialize(lContent)
      producer.send(new ProducerRecord[String, Array[Byte]]("twtopic", serializedPoint))
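
    Note that producer.send is asynchronous and returns a Future. While debugging, you can block on it and inspect where the record landed (an optional sketch, not part of the recipe's code):

      //Blocks until the broker acknowledges the record (useful only for debugging)
      val metadata = producer.send(new ProducerRecord[String, Array[Byte]]("twtopic", serializedPoint)).get()
      println(s"Wrote to partition ${metadata.partition()} at offset ${metadata.offset()}")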

    For the KryoSerializer, we use Twitter's chill library (https://github.com/twitter/chill), which provides an easier Scala abstraction over Kryo serialization.

    The actual KryoSerializer is just five lines of code:

    import com.twitter.chill.ScalaKryoInstantiator

    object KryoSerializer {
      private val kryoPool = ScalaKryoInstantiator.defaultPool

      def serialize[T](anObject: T): Array[Byte] = kryoPool.toBytesWithClass(anObject)
      def deserialize[T](bytes: Array[Byte]): T = kryoPool.fromBytes(bytes).asInstanceOf[T]
    }
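
    As a quick sanity check, a serialize/deserialize round trip should hand us back an equivalent object (a small sketch using the LabeledContent class defined earlier):

      val original = LabeledContent(1.0, Array("fashion", "week"))
      val bytes = KryoSerializer.serialize(original)
      val restored = KryoSerializer.deserialize[LabeledContent](bytes)
      println(restored.label, restored.content.mkString(" ")) //prints (1.0,fashion week)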

    The dependency for Twitter chill that needs to be added to our build.sbt is:

    "com.twitter" %% "chill" % "0.7.0"

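    Besides chill, the Kafka producer client and the ElasticSearch-Spark connector used above also need to be on the classpath. The coordinates below are one plausible combination for a Kafka 0.8.2.1/Scala 2.10 setup; treat the versions as assumptions and align them with your environment:

    "org.apache.kafka" % "kafka-clients" % "0.8.2.1",
    "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0"
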
    The entire publishing method looks like this:

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
      import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
      import org.apache.spark.rdd.RDD
      import scala.collection.JavaConverters._

      def publishToKafka(labeledPointRdd: RDD[LabeledContent]) {
        //Create one producer per partition so that each executor opens its own connection
        labeledPointRdd.foreachPartition { iterator =>

          val properties = Map[String, Object](ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", "serializer.class" -> "kafka.serializer.DefaultEncoder").asJava
          val producer = new KafkaProducer[String, Array[Byte]](properties, new StringSerializer, new ByteArraySerializer)

          iterator.foreach { lContent =>
            val serializedPoint = KryoSerializer.serialize(lContent)
            producer.send(new ProducerRecord[String, Array[Byte]]("twtopic", serializedPoint))
          }
          //Flush pending sends and release the connection once the partition is done
          producer.close()
        }
      }
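
    Putting the pieces together, a minimal driver for this recipe wires up the ElasticSearch extraction, the conversion, and the Kafka publishing roughly as follows (a sketch; it assumes the convertToLabeledContentRdd and publishToKafka functions shown above are in scope, and the object name is illustrative):

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext
      import org.elasticsearch.hadoop.cfg.ConfigurationOptions
      import org.elasticsearch.spark.sql._

      object KafkaStreamProducerFromES extends App {
        val conf = new SparkConf()
          .setAppName("KafkaStreamProducerFromES")
          .setMaster("local[2]")
          .set(ConfigurationOptions.ES_NODES, "localhost")
          .set(ConfigurationOptions.ES_PORT, "9200")

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        //Pull the tweets from ElasticSearch, label them, and publish them to the twtopic Kafka topic
        val twStatusDf = sqlContext.esDF("spark/twstatus")
        publishToKafka(convertToLabeledContentRdd(twStatusDf))
      }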
  5. Confirming receipt in Kafka: We can confirm that the data made it into Kafka using the JMX MBeans that Kafka exposes; the JConsole UI is a convenient way to explore them. The message count reported there is 24849, which matches the number of documents that we published to ElasticSearch in the previous recipe.