How to do it...

  1. Start the Spark shell with the Kafka integration package:
$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1
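The Scala version (2.11) and Spark version (2.1.1) in the package coordinates must match your Spark build; if you run a different release, adjust the suffix accordingly. For example, for Spark 2.4.0 built against Scala 2.11, the equivalent command would be:
$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0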
  2. Create a stream to listen for messages on the oscars topic:
scala> val data = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","oscars").load()
  3. Verify that data is indeed a streaming DataFrame:
scala> data.isStreaming
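For a DataFrame created with readStream this returns true; the REPL output should look like the following (the res variable number will vary):
res0: Boolean = true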
  4. Get the schema of the data DataFrame:
scala> data.printSchema
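The Kafka source exposes a fixed schema regardless of the message contents. Note that key and value arrive as binary, which is why a later step casts them to strings. The output should resemble:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)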
  5. Cast the key and value of the stream to the String datatype:
scala> val kvstream = data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
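The same cast can equivalently be written with the typed Column API instead of a SQL expression; a minimal sketch, assuming the standard functions import:
scala> import org.apache.spark.sql.functions.col
scala> val kvstream = data.select(col("key").cast("string").alias("key"), col("value").cast("string").alias("value"))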
  6. Write the stream to the console-based receiver and keep it running until terminated:
scala> val feed = kvstream.writeStream.format("console").start
scala> feed.awaitTermination
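Note that awaitTermination blocks the shell until the query stops or fails. If you skip that call, the query keeps running in the background and you can stop it cleanly later with StreamingQuery's stop method:
scala> feed.stop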
  7. In another terminal window, publish a message on the oscars topic in Kafka:
$ kafka-console-producer.sh --broker-list localhost:9092 --topic oscars
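Type a few messages at the producer prompt, pressing Enter after each one; the messages below are purely illustrative:
>And the Oscar goes to...
>Best Picture: Moonlight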
  8. Publish messages in the producer window from step 7, pressing Enter after each message.
  9. As you publish messages to Kafka, you will see them appear in the Spark shell, as in the sample output below.
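The console sink prints each micro-batch as a small table of key/value rows. The output is shaped roughly like this (batch numbers and contents depend on what you publish; values longer than 20 characters are truncated):
-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|And the Oscar goe...|
+----+--------------------+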