© Raul Estrada and Isaac Ruiz 2016

Raul Estrada and Isaac Ruiz, Big Data SMACK, 10.1007/978-1-4842-2175-4_10

10. Data Pipelines

Raul Estrada and Isaac Ruiz (1)

(1)Mexico City, Mexico

We have reached the chapter where we connect everything, especially theory and practice. This chapter has two parts: the first enumerates the data pipeline strategies, and the second shows how to connect the technologies:

  • Spark and Cassandra

  • Akka and Kafka

  • Akka and Cassandra

  • Akka and Spark

  • Kafka and Cassandra

Data Pipeline Strategies and Principles

The following are data pipeline strategies and principles:

  • Asynchronous message passing

  • Consensus and gossip

  • Data locality

  • Failure detection

  • Fault tolerance / no single point of failure

  • Isolation

  • Location transparency

  • Parallelism

  • Partition for scale

  • Replay for any point of failure

  • Replicate for resiliency

  • Scalable infrastructure

  • Share nothing

  • Dynamo systems principles

It is important to mention that not every SMACK technology implements every strategy; some technologies were not designed with a given strategy in mind, or the strategy doesn’t fit their design. In this section, we define each concept and explain how the technologies meet it. Note that we don’t cover Apache Mesos, because it belongs more to the infrastructure than to the pipeline architecture.

Asynchronous Message Passing

Technologies: Akka, Kafka, Spark

An actor sends a message to a process (another actor) and relies on the process and the supporting system to select and invoke the actual code to run. Asynchronous message passing is implemented, so all the complexities that naturally occur when trying to synchronize actors and data are handled by an intermediary level of software, called middleware. The most common middleware to support asynchronous message passing is called message-oriented middleware (MOM).
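The idea can be sketched with nothing but the standard library. The following is a minimal illustration, not Akka or Kafka code: the `MailboxSketch` object, the `Greeting` message type, and the `deliver` method are hypothetical names, and a blocking queue stands in for the middleware. The sender only enqueues messages; the receiver decides when and how to handle them.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable.ArrayBuffer

object MailboxSketch {
  // Hypothetical message type used only for this illustration.
  final case class Greeting(text: String)

  /** Sends each text to a mailbox without waiting for it to be processed,
    * then returns what the receiver handled, in arrival order. */
  def deliver(texts: Seq[String]): List[String] = {
    val mailbox = new LinkedBlockingQueue[Greeting]()
    val handled = ArrayBuffer.empty[String]
    val done = new CountDownLatch(texts.size)

    // The receiver owns its state and handles one message at a time.
    val receiver = new Thread(new Runnable {
      def run(): Unit =
        for (_ <- texts.indices) {
          val msg = mailbox.take() // blocks until a message arrives
          handled += msg.text.toUpperCase
          done.countDown()
        }
    })
    receiver.setDaemon(true)
    receiver.start()

    // The sender only enqueues; it never calls the receiver's code directly.
    texts.foreach(t => mailbox.put(Greeting(t)))

    done.await() // wait here only so that the result can be inspected
    handled.toList
  }
}
```

Calling `MailboxSketch.deliver(Seq("a", "b", "c"))` hands three messages to the queue and returns them once the receiver thread has processed them.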

Consensus and Gossip

Technologies: Akka, Cassandra

The Paxos protocol is a family of protocols for solving consensus. In Paxos, followers send commands to a leader. During normal operation, the leader receives a client’s command, assigns it a new command number (n), and then begins the nth instance of the consensus algorithm by sending messages to a set of acceptor processes.

In a gossip protocol, a node transmits information about new instances to only some of its known peers; if one of those peers already knows about the new node from other sources, propagation along that path stops. Thus, the information about the node is propagated through the network quickly and efficiently.
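How quickly gossip spreads can be seen in a small simulation. This is a simplified sketch and not Akka's or Cassandra's implementation: `GossipSketch`, `roundsToSpread`, and its parameters are hypothetical, peers are chosen with a seeded random generator, and the stopping rule described above is left out for brevity.

```scala
import scala.util.Random

object GossipSketch {
  /** Returns the number of rounds until all `n` nodes know the rumor,
    * or -1 if `maxRounds` is reached first. */
  def roundsToSpread(n: Int, fanout: Int, seed: Long, maxRounds: Int = 1000): Int = {
    val rnd = new Random(seed)
    var informed = Set(0) // node 0 learns about the new instance first
    var rounds = 0
    while (informed.size < n && rounds < maxRounds) {
      // Every informed node tells only `fanout` randomly chosen peers.
      val contacted = informed.toSeq.flatMap(_ => Seq.fill(fanout)(rnd.nextInt(n)))
      informed = informed ++ contacted // peers that already knew are unaffected
      rounds += 1
    }
    if (informed.size == n) rounds else -1
  }
}
```

With a fan-out of 2, the number of informed nodes can at most triple per round, so a cluster of 16 nodes needs at least three rounds; no node ever has to contact every other node.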

Data Locality

Technologies: Cassandra, Kafka

Related storage locations are frequently accessed, depending on the memory access pattern. There are two types of locality: temporal and spatial. Temporal locality refers to the reuse of specific data and/or resources within a short duration of time. Spatial locality refers to the use of data within relatively close storage locations. In practice, latency and throughput are affected by the efficiency of the cache, which is improved by increasing the locality of reference.

Failure Detection

Technologies: Cassandra, Spark, Akka, Kafka

In Kafka, upon successfully registering a consumer, the coordinator adds the consumer to the ping request scheduler’s queue, which then tries to keep track of whether the consumer is still alive.

In Cassandra, failure detection is a method for locally determining (from the gossip state and the history) if another node in the system is up or down. Cassandra uses this information to avoid routing client requests to unreachable nodes whenever possible.

In Akka and Spark, you use three Spark properties related to network variables: spark.akka.heartbeat.pauses, spark.akka.failure-detector.threshold, and spark.akka.heartbeat.interval.

Fault Tolerance/No Single Point of Failure

Technologies: Spark, Cassandra, Kafka

A single point of failure (SPOF) is a part of a system that, if it fails, stops the entire system from working. SPOFs are undesirable in any system that requires high availability or reliability.

Isolation

Technologies: Spark, Cassandra, Kafka

Cassandra is at the opposite end of the ACID properties used in most relational databases, because it uses BASE (Basically Available Soft-state Eventual consistency). Cassandra’s weak consistency comes in the form of eventual consistency, which means that the database eventually reaches a consistent state. As the data is replicated, the latest version is sitting on a particular node, but older versions are still out there on other nodes; yet eventually all nodes will see the latest version.
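A minimal sketch of how replicas can converge follows. It assumes a last-write-wins rule based on write timestamps; the `EventualConsistencySketch` object, the `Versioned` class, and the `reconcile` function are hypothetical names for illustration and do not come from Cassandra's code base.

```scala
object EventualConsistencySketch {
  // A value together with the timestamp of the write that produced it.
  final case class Versioned(value: String, timestamp: Long)

  type Replica = Map[String, Versioned]

  /** Merges two replicas, keeping the newest version of every key. */
  def reconcile(a: Replica, b: Replica): Replica =
    (a.keySet ++ b.keySet).map { key =>
      val newest = (a.get(key).toList ++ b.get(key).toList).maxBy(_.timestamp)
      key -> newest
    }.toMap
}
```

A node holding an older version of a key and a node holding the newer one end up with the same state after exchanging data, regardless of which side starts the exchange (as long as the timestamps differ).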

In Spark, each application gets its own executor processes, which stay up for the duration of the entire application and run tasks in multiple threads. This has the benefit of isolating applications from each other.

Location Transparency

Technologies: Akka, Spark, Cassandra, Kafka

In Spark, Cassandra, and Kafka, location transparency allows reading and writing to any node in a cluster, and the system replicates the information to the entire cluster.

One of Akka’s key contributions to the actor model is the concept of location transparency. An actor’s mailing address can actually be a remote location but the location is “transparent” to the developer. Akka abstracts away the transmission of the message over the network; the actor’s mailing address is always accessed the same way.

Parallelism

Technologies: Spark, Cassandra, Kafka, Akka

In Kafka, the unit of parallelism is the partition within a topic. This lets Kafka provide both ordering guarantees and load balancing over a pool of consumer processes.

In Cassandra and Spark, there is data parallelism, a form of parallelism that distributes data across multiple processors and computing nodes in a cluster environment. It contrasts with task parallelism, another form of parallelism.

In Spark and Akka, task parallelism focuses on distributing tasks (performed by individual processes or threads) across different processors.
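The difference between the two forms can be sketched with plain Scala futures. This is an illustration under simple assumptions, not Spark or Akka code; `ParallelismSketch`, `parallelSum`, and `minAndMax` are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelismSketch {
  /** Data parallelism: the same operation (sum) runs on every partition. */
  def parallelSum(data: Seq[Int], partitions: Int): Long = {
    val size = math.max(1, math.ceil(data.size.toDouble / partitions).toInt)
    val partial = data.grouped(size).toList.map(chunk => Future(chunk.map(_.toLong).sum))
    Await.result(Future.sequence(partial), 10.seconds).sum
  }

  /** Task parallelism: two different operations run at the same time. */
  def minAndMax(data: Seq[Int]): (Int, Int) = {
    val minTask = Future(data.min)
    val maxTask = Future(data.max)
    Await.result(minTask.zip(maxTask), 10.seconds)
  }
}
```

In `parallelSum` the data is split and every chunk gets the same treatment; in `minAndMax` the data stays whole and the work is split by task.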

Partition for Scale

Technologies: Cassandra, Spark, Kafka, Akka

A network partition refers to the failure of a network device that causes a network to split. If you recall the CAP theorem, the partition tolerance in this context means a data processing system’s ability to continue processing data even if a network partition causes communication errors between subsystems. All the SMACK technologies are network topology aware.

Replay for Any Point of Failure

Technologies: Spark, Cassandra, Kafka, Akka

In Spark, there is checkpointing: to be able to recover, the streaming computation (i.e., the DStreams set up with the streaming context) periodically checkpoints to a set of files in a fault-tolerant file system.

For Kafka and Cassandra, there is the ZooKeeper operation (see Chapters 5 and 8, respectively).

For Akka, there is the Akka persistence (see Chapter 4).
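The common idea behind these mechanisms is to combine a saved state with a log that can be re-read from a known position. The following sketch shows that idea with an in-memory log; `ReplaySketch`, `Log`, and `recover` are hypothetical names and are not part of Spark, Kafka, or Akka.

```scala
object ReplaySketch {
  /** An append-only log; a record's offset is its position in the log. */
  final class Log[A] {
    private var records = Vector.empty[A]

    /** Appends a record and returns its offset. */
    def append(record: A): Long = {
      records = records :+ record
      records.size - 1L
    }

    /** Re-reads every record from `offset` (inclusive) to the end. */
    def replayFrom(offset: Long): Vector[A] = records.drop(offset.toInt)
  }

  /** Rebuilds a running total from a checkpoint plus the records after it. */
  def recover(log: Log[Int], checkpointTotal: Int, checkpointOffset: Long): Int =
    log.replayFrom(checkpointOffset).foldLeft(checkpointTotal)(_ + _)
}
```

If a process checkpointed the total 3 after reading offsets 0 and 1 and then crashed, it can recover by replaying from offset 2 instead of reprocessing the entire log.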

Replicate for Resiliency

Technologies: Spark, Cassandra, Kafka

Kafka replicates the log for each topic’s partitions across a configurable number of servers. This allows automatic failover to these replicas when a server in the cluster fails, so messages remain available in the presence of failures.

Cassandra stores replicas on multiple nodes to ensure reliability and fault tolerance. A replication strategy determines the nodes where replicas are placed. The total number of replicas across the cluster is referred to as the replication factor. All replicas are equally important; there is no primary or master replica.
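As a rough illustration of what a replication strategy decides, the sketch below places replicas on consecutive nodes of a ring. It is a deliberate simplification: real Cassandra strategies work with tokens, partitioners, and (optionally) data center topology, and `ReplicaPlacementSketch` and `replicasFor` are hypothetical names.

```scala
object ReplicaPlacementSketch {
  /** Picks up to `replicationFactor` distinct nodes, walking the ring
    * clockwise from the position that the key hashes to. */
  def replicasFor(key: String, ring: Vector[String], replicationFactor: Int): Vector[String] = {
    require(ring.nonEmpty, "the ring needs at least one node")
    val start = Math.floorMod(key.hashCode, ring.size)
    val count = math.min(replicationFactor, ring.size)
    Vector.tabulate(count)(i => ring((start + i) % ring.size))
  }
}
```

Every node can compute the same placement for a key on its own, which is consistent with the point that no replica is a primary or master.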

If you recall, Spark does not implement replication. Spark uses HDFS implementations, which implement replication. Spark can recompute chunks of data as a function of input data in HDFS, so if a node crashes, the results for those input shards are computed again in the cluster. Spark can checkpoint computed data back to HDFS, so it saves the results of expensive computations. Spark also makes checkpoints on streams (DStream instances).

Scalable Infrastructure

Technologies: Spark, Cassandra, Kafka

For Spark, recall that you can use either the stand-alone deploy mode, which only needs Java installed on each node, or the Mesos and YARN cluster managers.

In Kafka, adding servers to a cluster is done simply by assigning them unique broker ids and then starting Kafka on the new servers. However, these new servers are not automatically assigned any data partitions, so unless partitions are moved to them, they won’t do any work until new topics are created. Usually when you add machines to your cluster, you need to move some existing data to these machines.

Cassandra allows you to add capacity to a cluster by introducing new nodes to the cluster in stages or by adding an entire data center.

Share Nothing/Masterless

Technologies: Cassandra, Akka

In a shared nothing architecture, each node is independent and self-sufficient, and there is no single point of contention across the system. More specifically, none of the nodes shares memory or disk storage.

The architecture of Cassandra is masterless, meaning all nodes are the same. Cassandra provides automatic data distribution across all nodes that participate in a ring or database cluster. Since Cassandra is masterless, a client can connect with any node in a cluster. Clients can interface with a Cassandra node using CQL on any node.

Dynamo Systems Principles

Dynamo is a set of techniques that when applied together make a highly available key-value distributed data store or structured storage system. Its principles are as follows:

  • Incremental scalability: Dynamo should be able to scale out one storage host (referred to as a node) with minimal impact on both the system’s operators and the system itself.

  • Symmetry: Every node in Dynamo has the same set of responsibilities as its peers; there is no distinguished node that takes a special role or an extra set of responsibilities.

  • Decentralization: The Dynamo design favors decentralized peer-to-peer techniques over centralized control.

  • Heterogeneity: The Dynamo system is able to exploit heterogeneity in the infrastructure it runs on. For example, the work distribution must be proportional to each individual server’s capabilities. This is fundamental and powerful when adding new nodes with higher capacity, because you don’t have to upgrade all hosts at once.

Spark and Cassandra

To work with Apache Spark and Apache Cassandra together, you need the Spark-Cassandra connector. If you recall the history, Cassandra was created at Facebook, but as it became a bigger project, it needed an enterprise to support it. Although Apache Cassandra is an open source project, the company responsible for making decisions about Cassandra is DataStax.

DataStax developed the Spark-Cassandra connector, which is a wonderful library that lets you do three fundamental but powerful tasks:

  • Expose Cassandra tables as Spark RDDs

  • Write Spark RDDs to Cassandra

  • Execute CQL queries in your Spark applications

The following are some Spark-Cassandra connector features:

  • Compatible with Apache Cassandra version 2.0 or higher

  • Compatible with Apache Spark versions 1.0 through 1.5

  • Compatible with Scala versions 2.10 and 2.11

  • Exposes Cassandra tables as Spark RDDs

  • Maps table rows to CassandraRow objects or tuples

  • Maps rows to objects of user-defined classes

  • Saves RDDs back to Cassandra by an implicit saveToCassandra call (nice, with one instruction)

  • Makes joins with a Cassandra data subset using joinWithCassandraTable call

  • Allows RDDs partition according to Cassandra replication using repartitionByCassandraReplica call

  • Converts data types between Cassandra and Scala

  • Supports all the Cassandra data types, including collections

  • Filters rows on server side via the CQL WHERE clause

  • Allows the execution of arbitrary CQL statements

  • Works with Cassandra virtual nodes

  • Works with PySpark DataFrames

It is very important to emphasize that the development of the connector is performed after versions of Apache Spark, Apache Cassandra, and Scala are released; typically, the most current versions are not supported.

Connector version compatibility is shown in Table 10-1.

Table 10-1. Spark Cassandra Connector Version Compatibility

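 Connector | Spark    | Cassandra       | Cassandra Java Driver
-----------+----------+-----------------+-----------------------
 1.6       | 1.6      | 2.1.5, 2.2, 3.0 | 3.0
 1.5       | 1.5, 1.6 | 2.1.5, 2.2, 3.0 | 3.0
 1.4       | 1.4      | 2.1.5           | 2.1
 1.3       | 1.3      | 2.1.5           | 2.1
 1.2       | 1.2      | 2.1, 2.0        | 2.1
 1.1       | 1.1, 1.0 | 2.1, 2.0        | 2.1
 1.0       | 1.0, 0.9 | 2.0             | 2.0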

Spark Streaming with Cassandra

As you know, Spark Streaming extends the core API to allow high throughput and fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Akka, Apache Kafka, Apache Flume, ZeroMQ, TCP sockets, and so forth. The results are stored in Cassandra.

If you didn’t know, there is support for Akka within Spark streaming. Chapter 8 has an example of how to use Apache Kafka with Spark Streaming, for which we also show an embedded Kafka and ZooKeeper server for quick user prototyping.

Setting up Spark Streaming

Let’s revisit the classic example of word count with Spark Streaming, which writes to the console with wordCounts.print().

  1. Create a StreamingContext with a SparkConf configuration.

    val ssc = new StreamingContext(sparkConf, Seconds(1))
  2. Create a DStream that connects to the server at IP and port.

    val lines = ssc.socketTextStream(serverIP, serverPort)
  3. Count each word in each batch.

    val words = lines.flatMap(_.split(" "))
    val pairs = words.map( word => (word, 1) )
    val wordCounts = pairs.reduceByKey(_ + _)
  4. Print a few of the counts to the console (don’t forget the start() method).

    wordCounts.print()
    ssc.start()             // start the computation
    ssc.awaitTermination()  // keep running until the job is stopped
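To see what step 3 computes for a single batch, the same transformations can be run on an ordinary Scala collection. This sketch needs no Spark installation; `WordCountSketch` and `countWords` are hypothetical names, `groupBy` followed by a sum stands in for `reduceByKey`, and the filter for empty strings is an addition that the streaming example does not have.

```scala
object WordCountSketch {
  /** Counts the words in one batch of lines, mirroring the
    * flatMap / map / reduceByKey steps of the streaming example. */
  def countWords(batch: Seq[String]): Map[String, Int] = {
    val words = batch.flatMap(_.split(" ")).filter(_.nonEmpty)
    val pairs = words.map(word => (word, 1))
    // groupBy + sum plays the role of reduceByKey(_ + _) on a local collection
    pairs.groupBy(_._1).map { case (word, ones) => word -> ones.map(_._2).sum }
  }
}
```

For the batch `Seq("a b a", "b c")`, the result maps `a` to 2, `b` to 2, and `c` to 1. In Spark Streaming, the same computation is repeated for every batch interval.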

Setting up Cassandra

Let’s bring the Cassandra-specific functions on the StreamingContext and RDD into scope. After that, you simply replace the print to the console with a call that pipes the output to Cassandra (the Spark-Cassandra connector does all the magic):

import com.datastax.spark.connector.streaming._
wordCounts.saveToCassandra("streaming_test", "words")

Setting up SparkContext

The following explains how to set up the SparkContext.

  1. As usual, start by importing Spark:

    import org.apache.spark._
  2. Before creating the SparkContext, set the spark.cassandra.connection.host property to the address of one of the Cassandra nodes:

    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
  3. Create a SparkContext. Substitute 127.0.0.1 with the actual address of your Spark master (or use "local" to run in local mode):

    val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf)
  4. Enable Cassandra-specific functions on the SparkContext, RDD, and DataFrame:

    import com.datastax.spark.connector._

Create a Streaming Context

The second parameter in the streaming context is the batchDuration, which sets the interval at which streaming data is divided into batches. Note that the Spark API provides Milliseconds, Seconds, and Minutes, all accepted as durations. Try not to confuse this duration with scala.concurrent.duration.Duration.

val ssc = new StreamingContext(conf, Seconds(n))

Creating a Stream

We can create a stream with any stream type available or with a custom Spark stream. The Spark-Cassandra connector supports Akka actor streams; support for more stream types is expected to follow. We can also extend the types already provided.

import com.datastax.spark.connector.streaming.TypedStreamingActor

Kafka Stream

The Kafka stream creates an input stream that pulls messages from a Kafka broker:

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Actor Stream

The following is an actor stream:

val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)

Enable Spark Streaming with Cassandra

Do the following to enable Cassandra-specific functions on the StreamingContext, DStream, and RDD:

import com.datastax.spark.connector.streaming._

In our example, streaming_test is the keyspace name and words is the table name.

Saving Data

This shows how to save data:

// "\\s+" is the regex \s+ (one or more whitespace characters)
stream.flatMap(_.split("\\s+"))
        .map(x => (x, 1))
        .reduceByKey(_ + _)
        .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))

This starts the computation:

ssc.start()

Reading from Cassandra with the StreamingContext

To read from Cassandra through the StreamingContext, we use something like this:

val rdd = ssc.cassandraTable("streaming_test", "key_value")
        .select("key", "value").where("foo = ?", 3)

Loading and Analyzing Data from Cassandra

Use the sc.cassandraTable method to view this table as a Spark RDD:

val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)        

Saving data from a RDD to Cassandra

The following shows how to add two more rows to the table:

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))      

Saving Datasets to Cassandra

It’s possible to save any RDD to Cassandra, not just a CassandraRDD. The only requirement is that the object class of the RDD is a tuple or has property names corresponding to Cassandra column names.

It’s also possible to save an RDD to an existing Cassandra table, as well as to let the connector create the appropriate table automatically, based on the definition of the RDD item class.

To save an RDD to an existing table, you need to import com.datastax.spark.connector._ and call the saveToCassandra method with the keyspace name, the table name, and the list of columns. It is important to include at least all the primary key columns. To save an RDD into a new table, instead of calling saveToCassandra, you have to call saveAsCassandraTable or saveAsCassandraTableEx with the name of the table you want to create.

Saving a Collection of Tuples

Assume the following table definition:

CREATE TABLE ks.words (word text PRIMARY KEY, count int);
INSERT INTO ks.words (word, count) VALUES ('bar', 20);
INSERT INTO ks.words (word, count) VALUES ('foo', 10);

You have the following Spark code:

val collection = sc.parallelize(Seq(("cat", 30), ("dog", 40)))
collection.saveToCassandra("ks", "words", SomeColumns("word", "count"))


cqlsh:ks> select * from words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  dog |    40


(4 rows)

With tuples, the use of a custom mapper is also supported, as shown here:

val collection = sc.parallelize(Seq((30, "cat"), (40, "dog")))
collection.saveToCassandra("ks", "words", SomeColumns("word" as "_2", "count" as "_1"))


cqlsh:ks> select * from words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  dog |    40


(4 rows)

Saving a Collection of Objects

When saving a collection of objects of a user-defined class, the items to be saved must provide appropriately named public property accessors for every column to be saved. In the following example, the property names match the column names.

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("fox", 50), WordCount("cow", 60)))
collection.saveToCassandra("ks", "words", SomeColumns("word", "count"))


cqlsh:ks> select * from words;

 word | count
------+-------
  bar |    20
  foo |    10
  cat |    30
  dog |    40
  fox |    50
  cow |    60

You can specify custom columns to property mapping with SomeColumns. If the property names in the objects to be saved don’t correspond to the column names in the destination table, use the “as” keyword on the column names that you want to override. The parameter order uses the table column name first, and then the object property name.

For example, let’s say that you want to save WordCount objects to a table that has word (TEXT) and num (INT) columns. This is the table definition in Cassandra:

CREATE TABLE ks.words2 (word text PRIMARY KEY, num int);

This is the Spark code:

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("fox", 50), WordCount("cow", 60)))
collection.saveToCassandra("ks", "words2", SomeColumns("word", "num" as "count"))

Modifying CQL Collections

The default behavior of the Spark-Cassandra connector is to overwrite collections when inserted into a Cassandra table. To override this behavior, you can specify a custom mapper with instructions on how to treat the collection.

The following are the operations supported:

  • append/add (lists, sets, maps)

  • prepend (lists)

  • remove (lists, sets); not supported for maps

  • overwrite (lists, sets, maps): default

Let’s take the elements from rddSetField and remove them from the corresponding "a_set" C* column, and then take the elements from rddMapField and add them to the "a_map" C* column, in the rows whose key C* column matches the key of the RDD element.

("key", "a_set" as "rddSetField" remove , "a_map" as "rddMapField" append)

The following is an example schema:

CREATE TABLE ks.collections_mod (
      key int PRIMARY KEY,
      list_col list<text>,
      map_col map<text, text>,
      set_col set<text>
  );

The following are appending and prepending lists:

val listElements = sc.parallelize(Seq(
  (1,Vector("One")),
  (1,Vector("Two")),
  (1,Vector("Three"))))


val preElements = sc.parallelize(Seq(
  (1,Vector("PreOne")),
  (1,Vector("PreTwo")),
  (1,Vector("PreThree"))))


listElements.saveToCassandra("ks", "collections_mod", SomeColumns("key", "list_col" append))
preElements.saveToCassandra("ks", "collections_mod", SomeColumns("key", "list_col" prepend))


cqlsh> select * from ks.collections_mod where key = 1;

 key | list_col                                                | map_col | set_col
-----+---------------------------------------------------------+---------+---------
   1 | ['PreThree', 'PreTwo', 'PreOne', 'One', 'Two', 'Three'] |    null |    null

(1 rows)
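The order of the prepended elements in this output can look surprising. The following sketch reproduces it with plain Scala collections, assuming that the six single-element updates are applied one after another in the order listed (a real Spark job gives no such ordering guarantee across partitions). `ListUpdateSketch` and `finalList` are hypothetical names.

```scala
object ListUpdateSketch {
  // The three single-element updates, in the order the example lists them.
  val appends  = Seq(Vector("One"), Vector("Two"), Vector("Three"))
  val prepends = Seq(Vector("PreOne"), Vector("PreTwo"), Vector("PreThree"))

  /** Applies the appends, then the prepends, to an initially empty list column. */
  def finalList: Vector[String] = {
    val afterAppends = appends.foldLeft(Vector.empty[String])((col, elems) => col ++ elems)
    prepends.foldLeft(afterAppends)((col, elems) => elems ++ col)
  }
}
```

Each prepend puts its element in front of everything already stored, so the element prepended last ("PreThree") ends up first.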

Saving Objects of Cassandra User-Defined Types

To save structures consisting of many fields, use a case class or a com.datastax.spark.connector.UDTValue class. An instance of this class can be easily obtained from a Scala map by calling the fromMap method.

Take the following table definition as an example:

CREATE TYPE ks.address (city text, street text, number int);
CREATE TABLE ks.companies (name text PRIMARY KEY, address FROZEN<address>);
CREATE TABLE ks.udts (key text PRIMARY KEY, name text, addr FROZEN<address>);

You can use a case class to insert into the UDT:

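// Field names must match the fields of the ks.address type (city, street, number)
case class Address(city: String, street: String, number: Int)
val address = Address(city = "San Jose", street = "Santa Clara", number = 95126)
// ks.udts declares key as text, so the key is a String
val col = Seq(("1", "Raul", address))
sc.parallelize(col).saveToCassandra("ks", "udts", SomeColumns("key", "name", "addr"))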

Or use the fromMap of UDTValue to create the UDT:

import com.datastax.spark.connector.UDTValue
case class Company(name: String, address: UDTValue)
val address = UDTValue.fromMap(Map("city" -> "Palo Alto", "street" -> "Infinite Loop", "number" -> 1))
val company = Company("Apple", address)
sc.parallelize(Seq(company)).saveToCassandra("ks", "companies")

Converting Scala Options to Cassandra Options

To convert Cassandra options to Scala options, you use an implicit conversion that is already implemented. This means that Cassandra options can be dealt with as if they were normal Scala options. For the reverse transformation (from a Scala option into a Cassandra option), you need to define the None behavior. This is done via CassandraOption.deleteIfNone and CassandraOption.unsetIfNone.

import com.datastax.spark.connector.types.CassandraOption

//Setup original data (1, 1, 1) ... (6, 6, 6)
sc.parallelize(1 to 6).map(x => (x,x,x)).saveToCassandra(ks, "tab1")


//Setup options Rdd (1, None, None) (2, None, None) ... (6, None, None)
val optRdd = sc.parallelize(1 to 6).map(x => (x, None, None))


//Deleting second column, but ignore the third column
optRdd.map{ case (x: Int, y: Option[Int], z: Option[Int]) =>
    (x, CassandraOption.deleteIfNone(y), CassandraOption.unsetIfNone(z))
  }.saveToCassandra(ks, "tab1")


val results = sc.cassandraTable[(Int, Option[Int], Option[Int])](ks, "tab1").collect

The following shows the results:

(1, None, Some(1)),
(2, None, Some(2)),
(3, None, Some(3)),
(4, None, Some(4)),
(5, None, Some(5)),
(6, None, Some(6))
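The results above follow from the two meanings that None can take on a write. The sketch below models that distinction without the connector; the `Write` type and the functions in `OptionWriteSketch` are hypothetical stand-ins that only borrow the names deleteIfNone and unsetIfNone, and they are not the connector's implementation.

```scala
object OptionWriteSketch {
  // Hypothetical three-state write instruction, modelled after the idea in the text.
  sealed trait Write
  final case class SetTo(value: Int) extends Write
  case object Delete extends Write // None means "remove the stored value"
  case object Unset extends Write  // None means "leave the stored value alone"

  def deleteIfNone(o: Option[Int]): Write = o.fold[Write](Delete)(v => SetTo(v))
  def unsetIfNone(o: Option[Int]): Write = o.fold[Write](Unset)(v => SetTo(v))

  /** Applies one write instruction to the currently stored column value. */
  def applyWrite(stored: Option[Int], write: Write): Option[Int] = write match {
    case SetTo(v) => Some(v)
    case Delete   => None
    case Unset    => stored
  }
}
```

For a row that started as (1, 1, 1), applying `deleteIfNone(None)` to the second column and `unsetIfNone(None)` to the third gives (1, None, Some(1)), which matches the first result line.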

Saving RDDs as New Tables

As mentioned, you use the saveAsCassandraTable method to automatically create a new table with the given name and save the RDD into it. The keyspace that you are saving to must exist. The following code creates a new words_new table in the test keyspace with word and count columns, where word becomes a primary key:

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))

To customize the table definition, call saveAsCassandraTableEx. The following code demonstrates how to add another column of int type to the table definition, creating a new words_new_2 table:

import com.datastax.spark.connector.cql.{ColumnDef, RegularColumn, TableDef}
import com.datastax.spark.connector.types.IntType


case class WordCount(word: String, count: Long)
val table1 = TableDef.fromType[WordCount]("test", "words_new")
val table2 = TableDef("test", "words_new_2", table1.partitionKey, table1.clusteringColumns,
table1.regularColumns :+ ColumnDef("additional_column", RegularColumn, IntType))
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTableEx(table2, SomeColumns("word", "count"))

The following is example code to create a table with a custom definition. It defines which columns are partition and clustering column keys:

import java.util.UUID  // needed by the outData case class below
import com.datastax.spark.connector.cql.{ColumnDef, RegularColumn, TableDef, ClusteringColumn, PartitionKeyColumn}
import com.datastax.spark.connector.types._


// 1. Define the RDD structure
case class outData(col1:UUID, col2:UUID, col3: Double, col4:Int)


// 2. Define columns
val p1Col = new ColumnDef("col1",PartitionKeyColumn,UUIDType)
val c1Col = new ColumnDef("col2",ClusteringColumn(0),UUIDType)
val c2Col = new ColumnDef("col3",ClusteringColumn(1),DoubleType)
val rCol = new ColumnDef("col4",RegularColumn,IntType)


// 3. Create table definition
val table = TableDef("test","words",Seq(p1Col),Seq(c1Col, c2Col),Seq(rCol))


// 4. Map RDD into custom data structure and create the table
val rddOut = rdd.map(s => outData(s._1, s._2(0), s._2(1), s._3))
rddOut.saveAsCassandraTableEx(table, SomeColumns("col1", "col2", "col3", "col4"))

Akka and Kafka

A connector is available for Scala 2.11 at Maven Central in the following coordinates:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.11-M4"

This is a producer settings example:

import akka.kafka._
import akka.kafka.scaladsl._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers("localhost:9092")

The following example produces messages:

Source(1 to 10000)
  .map(_.toString)
  .map(elem => new ProducerRecord[Array[Byte], String]("topic1", elem))
  .to(Producer.plainSink(producerSettings))

This example produces messages in a flow:

Source(1 to 10000).map(elem => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic1", elem.toString), elem))
    .via(Producer.flow(producerSettings))
    .map { result =>
      val record = result.message.record
      println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value} (${result.message.passThrough})")
      result
    }

This is a consumer settings example:

import akka.kafka._
import akka.kafka.scaladsl._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig


val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

The following example consumes messages and stores a representation of each one, including its offset, in a database:

db.loadOffset().foreach { fromOffset =>
  val subscription = Subscriptions.assignmentWithOffset(new TopicPartition("topic1", 1) -> fromOffset)
  Consumer.plainSource(consumerSettings, subscription)
    .mapAsync(1)(db.save)
}

This example consumes messages with at-most-once semantics:

Consumer.atMostOnceSource(consumerSettings.withClientId("client1"), Subscriptions.topics("topic1"))
    .mapAsync(1) { record =>
      rocket.launch(record.value)
    }

This example consumes messages with at-least-once semantics:

Consumer.committableSource(consumerSettings.withClientId("client1"), Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.value).flatMap(_ => msg.committableOffset.commitScaladsl())
    }

This example connects a consumer to a producer:

Consumer.committableSource(consumerSettings.withClientId("client1"), Subscriptions.topics("topic1"))
    .map(msg =>
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.value), msg.committableOffset))
    .to(Producer.commitableSink(producerSettings))

This example consumes messages with at-least-once semantics and commits the offsets in batches:

Consumer.committableSource(consumerSettings.withClientId("client1"), Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.value).map(_ => msg.committableOffset)
    }
    .batch(max = 10, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(1)(_.commitScaladsl())

Here is a reusable Kafka consumer example:

//Consumer is represented by actor
//Create new consumer
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))


//Manually assign topic partition to it
val stream1 = Consumer
    .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
    .via(business)
    .to(Sink.ignore)


//Manually assign another topic partition
val stream2 = Consumer
    .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
    .via(business)
    .to(Sink.ignore)

This is a consumer group example:

//Consumer group represented as Source[(TopicPartition, Source[Messages])]
val consumerGroup = Consumer.committablePartitionedSource(consumerSettings.withClientId("client1"), Subscriptions.topics("topic1"))
  //Process each assigned partition separately
  consumerGroup.map {
    case (topicPartition, source) =>
      source
        .via(business)
        .toMat(Sink.ignore)(Keep.both)
        .run()
  }.mapAsyncUnordered(maxPartitions)(_._2)

Here is a use case:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ProducerMessage, ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }


implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()


val kafka = new ReactiveKafka()
val publisher: Publisher[StringConsumerRecord] = kafka.consume(ConsumerProperties(
 bootstrapServers = "localhost:9092",
 topic = "lowercaseStrings",
 groupId = "groupName",
 valueDeserializer = new StringDeserializer()
))
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
  bootstrapServers = "localhost:9092",
  topic = "uppercaseStrings",
  valueSerializer = new StringSerializer()
))


Source.fromPublisher(publisher).map(m => ProducerMessage(m.value().toUpperCase))
  .to(Sink.fromSubscriber(subscriber)).run()

Akka and Cassandra

Let’s use the DataStax Cassandra driver and Akka to build an application that downloads tweets and then stores their id, text, name, and date in a Cassandra table. This shows you how to build a simple Akka application with just a few actors, how to use Akka I/O to make HTTP requests, and how to store the data in Cassandra.

Let’s begin by constructing the core of our system. It contains three actors: two that interact with the database and one that downloads the tweets. TweetReaderActor reads from the cluster, TweetWriterActor writes into the cluster, and TweetScannerActor downloads the tweets and passes them to TweetWriterActor to be written.

class TweetReaderActor(cluster: Cluster) extends Actor {   ... }

class TweetWriterActor(cluster: Cluster) extends Actor {   ... }

class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String) extends Actor { ... }

The constructor parameter of the read and write actors is Cassandra’s Cluster instance. The scan actor takes an ActorRef of the write actor and a function that, given a String query, can construct the query URL to download the tweets. To construct our application, we have to instantiate the actors in the right sequence, as follows:

val system = ActorSystem()
def queryUrl(query: String): String = ???
val cluster: Cluster = ???
val reader  = system.actorOf(Props(new TweetReaderActor(cluster)))
val writer  = system.actorOf(Props(new TweetWriterActor(cluster)))
val scanner = system.actorOf(Props(new TweetScannerActor(writer, queryUrl)))

Writing to Cassandra

Now that we have the structure, we can take a look at TweetWriterActor. It receives instances of Tweet and writes them to the tweets table in the Cassandra keyspace.

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare("INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")


  def receive: Receive = {
    case tweets: List[Tweet] =>
    case tweet: Tweet        =>
  }
}

To store the tweets, we need to connect to the correct keyspace, which gives us the Cassandra session. Trying to be as efficient as possible, we will take advantage of Cassandra’s PreparedStatements and BoundStatements. A PreparedStatement is a CQL statement that has been parsed ahead of time; a BoundStatement is a prepared statement whose parameter values are set.

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare("INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")


  def saveTweet(tweet: Tweet): Unit =
    session.executeAsync(preparedStatement.bind(tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))


  def receive: Receive = {
    case tweets: List[Tweet] =>
    case tweet: Tweet        =>
  }
}

The only thing that remains to be done is to use it in the receive function.

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare("INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")


  def saveTweet(tweet: Tweet): Unit =
    session.executeAsync(preparedStatement.bind(tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))


  def receive: Receive = {
    case tweets: List[Tweet] => tweets foreach saveTweet
    case tweet: Tweet        => saveTweet(tweet)
  }
}

We now have the code that saves instances of Tweet to the keyspace in our Cassandra cluster.
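The Tweet type itself is not shown in this section. The accessors used in saveTweet (tweet.id.id, tweet.user.user, tweet.text.text) suggest small wrapper types around the raw strings. The following is a minimal sketch under that assumption; the wrapper names Id, User, and Text and the helper bindValues are illustrative and are not taken from the original project.

```scala
import java.util.Date

// Assumed wrapper types; each one wraps a single raw String value.
final case class Id(id: String)
final case class User(user: String)
final case class Text(text: String)

// Assumed shape of the Tweet message handled by the actors.
final case class Tweet(id: Id, user: User, text: Text, createdAt: Date)

object TweetColumns {
  // The values in the order of the INSERT placeholders:
  // key, user_user, text, createdat.
  def bindValues(tweet: Tweet): Seq[AnyRef] =
    Seq(tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt)
}
```

With a model like this, the arguments passed to preparedStatement.bind are exactly the unwrapped values, in the same order as the columns of the INSERT statement.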

Reading from Cassandra

Reading the data is ever so slightly more complex. We need to be able to construct Cassandra queries; then, given a Cassandra row, we need to be able to turn it into our Tweet object. We want to take advantage of the asynchronous nature of the Cassandra driver.

object TweetReaderActor {
  case class FindAll(maximum: Int = 100)
  case object CountAll
}


class TweetReaderActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val countAll  = new BoundStatement(session.prepare("select count(*) from tweets;"))


  def receive: Receive = {
    case FindAll(maximum)  =>
      // reply with List[Tweet]
    case CountAll =>
      // reply with Long
  }
}

We have defined the FindAll and CountAll messages that our actor will react to. We have also left in the code that gives us the session and then used the session to construct a BoundStatement that counts all rows. Next up, we need to be able to construct an instance of Tweet given a row.

class TweetReaderActor(cluster: Cluster) extends Actor {
  ...
  def buildTweet(r: Row): Tweet = {
    val id = r.getString("key")
    val user = r.getString("user_user")
    val text = r.getString("text")
    val createdAt = r.getDate("createdat")
    Tweet(id, user, text, createdAt)
  }
  ...
}

We simply pick the values of the columns in the row and use them to make an instance of Tweet. We would like to asynchronously execute some query, map the rows returned from that query execution to turn them into the tweets, and then pipe the result to the sender.

class TweetReaderActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val countAll  = new BoundStatement(session.prepare("select count(*) from tweets;"))


  import scala.collection.JavaConversions._
  import cassandra.resultset._
  import context.dispatcher
  import akka.pattern.pipe


  def buildTweet(r: Row): Tweet = {...}

  def receive: Receive = {
    case FindAll(maximum)  =>
      val query = QueryBuilder.select().all().from(Keyspaces.akkaCassandra, "tweets").limit(maximum)
      session.executeAsync(query) map(_.all().map(buildTweet).toList) pipeTo sender
    case CountAll =>
      session.executeAsync(countAll) map(_.one.getLong(0)) pipeTo sender
  }
}

We construct the query using Cassandra’s QueryBuilder. We call the executeAsync method on the session, which returns ResultSetFuture. Using implicit conversion in cassandra.resultset._, we turn the ResultSetFuture into Scala’s Future[ResultSet]. This allows us to use the Future.map method to turn the ResultSet into List[Tweet].

Calling the session.executeAsync(query) map expects as its parameter a function from ResultSet to some type B. In our case, B is List[Tweet]. The ResultSet contains the all() method, which returns java.util.List[Row]. To be able to map over java.util.List[Row], we need to turn it into the Scala List[Row]. To do so, we bring in the implicit conversions in scala.collection.JavaConversions. And now, we can complete the parameter of the Future.map function.

Mapping over the result of session.executeAsync gives us Future[List[Tweet]], which is tantalizingly close to what we need. We do not want to block for the result, and we don’t use the onSuccess function, because all that it would do is pass on the result to the sender. So, instead, we pipe the success of the future to the sender. That completes the picture, explaining the entire session.executeAsync(query) map(_.all().map(buildTweet).toList) pipeTo sender line.
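The step that maps a future and converts a java.util.List into a Scala List can be tried in isolation. Note that scala.collection.JavaConversions was deprecated in Scala 2.12 and removed in Scala 2.13, so the sketch below uses the explicit asScala conversion from scala.jdk.CollectionConverters and assumes Scala 2.13 or later. FakeRow, allRows, and buildText are hypothetical stand-ins for the driver's Row, ResultSet.all(), and buildTweet; no Cassandra driver is involved.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object RowMapping {
  // Hypothetical stand-in for a driver row: only the tweet text.
  final case class FakeRow(text: String)

  // Hypothetical stand-in for ResultSet.all(), which returns a java.util.List.
  def allRows(): JList[FakeRow] = {
    val rows = new JArrayList[FakeRow]()
    rows.add(FakeRow("first"))
    rows.add(FakeRow("second"))
    rows
  }

  // Hypothetical stand-in for buildTweet: turns one row into a domain value.
  def buildText(row: FakeRow): String = row.text.toUpperCase

  // Map over the future, convert the Java list explicitly, then map each row.
  def findAll(): Future[List[String]] =
    Future(allRows()).map(_.asScala.map(buildText).toList)

  // Blocks only for demonstration; the actor pipes the future instead.
  def demo(): List[String] = Await.result(findAll(), 5.seconds)
}
```

The explicit asScala call makes the conversion visible at the call site, which is the main difference from the implicit conversions used in the listing above.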

Connecting to Cassandra

We need to explain where the cluster value comes from. Thinking about the system you are writing, you may need to have different values of cluster for tests and for the main system. Moreover, the test cluster will most likely need some special setup. You simply define a CassandraCluster trait that returns the cluster and provide implementations that do the right thing: one that loads the configuration from the ActorSystem’s configuration and one that is hard-coded to be used in tests.

trait CassandraCluster {
  def cluster: Cluster
}

The configuration-based implementation and the test configuration differ only in the values they use to make the Cluster instance.

// in src/main/scala
trait ConfigCassandraCluster extends CassandraCluster {
  def system: ActorSystem


  private def config = system.settings.config

  import scala.collection.JavaConversions._
  private val cassandraConfig = config.getConfig("akka-cassandra.main.db.cassandra")
  private val port = cassandraConfig.getInt("port")
  private val hosts = cassandraConfig.getStringList("hosts").toList


  lazy val cluster: Cluster =
    Cluster.builder().
      addContactPoints(hosts: _*).
      withCompression(ProtocolOptions.Compression.SNAPPY).
      withPort(port).
      build()
}


// in src/test/scala
trait TestCassandraCluster extends CassandraCluster {
  def system: ActorSystem


  private def config = system.settings.config

  import scala.collection.JavaConversions._
  private val cassandraConfig = config.getConfig("akka-cassandra.test.db.cassandra")
  private val port = cassandraConfig.getInt("port")
  private val hosts = cassandraConfig.getStringList("hosts").toList


  lazy val cluster: Cluster =
    Cluster.builder().
      addContactPoints(hosts: _*).
      withPort(port).
      withCompression(ProtocolOptions.Compression.SNAPPY).
      build()


}

This allows you to mix in the appropriate trait and get the properly configured cluster. You want to have the cluster in a well-known state, so you create the CleanCassandra trait that resets the cluster given by a CassandraCluster.cluster.

trait CleanCassandra extends SpecificationStructure {
  this: CassandraCluster =>


  private def runClq(session: Session, file: File): Unit = {
    val query = Source.fromFile(file).mkString
    query.split(";").foreach(session.execute)
  }


  private def runAllClqs(): Unit = {
    val session = cluster.connect(Keyspaces.akkaCassandra)
    val uri = getClass.getResource("/").toURI
    new File(uri).listFiles().foreach { file =>
      if (file.getName.endsWith(".cql")) runClq(session, file)
    }
    session.shutdown()
  }


  override def map(fs: => Fragments) = super.map(fs) insert Step(runAllClqs())
}

When you mix this trait into your test, it registers the runAllClqs() step to be executed before all other steps in the test.
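One detail of runClq is worth a closer look: splitting a script on ";" can leave whitespace-only fragments, for example the newline after the last statement, which would then be passed to session.execute. The following sketch shows one way to filter them out. The helper name statements is hypothetical, and the splitting is deliberately naive: a semicolon inside a string literal would also split the statement.

```scala
object CqlScript {
  // Split a script into individual statements, dropping blank fragments.
  // Naive by design: a ';' inside a string literal would also split.
  def statements(script: String): List[String] =
    script.split(";").toList.map(_.trim).filter(_.nonEmpty)
}
```

With such a helper, the body of runClq could become CqlScript.statements(query).foreach(session.execute), assuming the scripts contain no semicolons inside literals.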

Scanning Tweets

Now that you know that you can safely store and retrieve the tweets from Cassandra, you need to write the component that is going to download them. In our system, this is the TweetScannerActor that receives a message of type String, and it performs the HTTP request to download the tweets.

class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String)
  extends Actor with TweetMarshaller {


  import context.dispatcher
  import akka.pattern.pipe


  private val pipeline = sendReceive ~> unmarshal[List[Tweet]]

  def receive: Receive = {
    case query: String => pipeline(Get(queryUrl(query))) pipeTo tweetWrite
  }
}
trait TweetMarshaller {
  type Tweets = List[Tweet]


  implicit object TweetUnmarshaller extends Unmarshaller[Tweets] {

    val dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy")

    def mkTweet(status: JsValue): Deserialized[Tweet] = {
      val json = status.asJsObject
      ...
    }


    def apply(entity: HttpEntity): Deserialized[Tweets] = {
      val json = JsonParser(entity.asString).asJsObject
      ...
    }
  }
}

The typeclass instance is the TweetUnmarshaller singleton, which extends Unmarshaller[Tweets]. Notice that we have also defined a type alias, Tweets = List[Tweet]. Because we extend Unmarshaller[Tweets], we must implement the apply method, which is applied to an HttpEntity. It should return the deserialized tweets or indicate an error.
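The typeclass mechanism can be sketched without any HTTP library. In the following example, Unmarshaller, Deserialized, and unmarshal are simplified stand-ins defined locally, not the library's own types. The entity is a plain String, and the "user|text" line format is made up purely for illustration.

```scala
object UnmarshalSketch {
  // Either an error message or the decoded value (stand-in for Deserialized).
  type Deserialized[T] = Either[String, T]

  // The typeclass: how to decode a raw entity body into a T.
  trait Unmarshaller[T] {
    def apply(entity: String): Deserialized[T]
  }

  final case class Tweet(user: String, text: String)
  type Tweets = List[Tweet]

  // The typeclass instance for Tweets: one "user|text" pair per line.
  implicit object TweetUnmarshaller extends Unmarshaller[Tweets] {
    def apply(entity: String): Deserialized[Tweets] = {
      val lines = entity.split("\n").toList.filter(_.nonEmpty)
      lines.foldRight[Deserialized[Tweets]](Right(Nil)) { (line, acc) =>
        line.split('|') match {
          case Array(user, text) => acc.map(Tweet(user, text) :: _)
          case _                 => Left(s"Malformed line: $line")
        }
      }
    }
  }

  // Finds the instance for T implicitly, in the spirit of unmarshal[T] in the pipeline.
  def unmarshal[T](entity: String)(implicit u: Unmarshaller[T]): Deserialized[T] =
    u(entity)
}
```

Calling unmarshal[Tweets] selects TweetUnmarshaller at compile time; an entity that does not follow the expected format yields a Left with an error message instead of an exception.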

Testing TweetScannerActor

To test the scanner fully, we would like to use a well-known service. But where do we get it? We can’t really use the live service, because the tweets keep changing. It seems that the only way is to implement a mock service and use it in our tests.

class TweetScanActorSpec extends TestKit(ActorSystem())
  with SpecificationLike with ImplicitSender {


  sequential

  val port = 12345
  def testQueryUrl(query: String) = s"http://localhost:$port/q=$query"


  val tweetScan = TestActorRef(new TweetScannerActor(testActor, testQueryUrl))

  "Getting all 'typesafe' tweets" >> {

    "should return more than 10 last entries" in {
      val twitterApi = TwitterApi(port)
      tweetScan ! "typesafe"
      Thread.sleep(1000)
      val tweets = expectMsgType[List[Tweet]]
      tweets.size mustEqual 4
      twitterApi.stop()
      success
    }
  }
}

When constructing TweetScannerActor, we give it the testActor and a function that returns URLs on localhost on some port. In the body of the example, we start the mock TwitterApi on the given port, and use TweetScannerActor to make the HTTP request. Because we passed the testActor as the writer ActorRef, we should now be able to see the List[Tweet] that would have been sent to TweetWriterActor.

Because our mock tweet set contains four tweets, we can make the assertion that the list indeed contains four tweets.

Since the components in the system work as expected, we can assemble the App object, which brings everything together in a command-line interface.

object Main extends App with ConfigCassandraCluster {
  import Commands._
  import akka.actor.ActorDSL._


  def twitterSearchProxy(query: String) = s"http://twitter-search-proxy.herokuapp.com/search/tweets?q=$query"

  implicit lazy val system = ActorSystem()
  val write = system.actorOf(Props(new TweetWriterActor(cluster)))
  val read = system.actorOf(Props(new TweetReaderActor(cluster)))
  val scan = system.actorOf(Props(new TweetScannerActor(write, twitterSearchProxy)))


  // we don't want to bother with the ``ask`` pattern, so
  // we set up sender that only prints out the responses to
  // be implicitly available for ``tell`` to pick up.
  implicit val _ = actor(new Act {
    become {
      case x => println(">>> " + x)
    }
  })


  @tailrec
  private def commandLoop(): Unit = {
    Console.readLine() match {
      case QuitCommand                => return
      case ScanCommand(query)         => scan ! query.toString


      case ListCommand(count)         => read ! FindAll(count.toInt)
      case CountCommand               => read ! CountAll


      case _                          => return
    }


    commandLoop()
  }


  // start processing the commands
  commandLoop()


  // when done, stop the ActorSystem
  system.shutdown()


}

We have the main commandLoop() function, which reads a line from standard input, matches it against the commands, and sends the appropriate messages to the right actors. The Main object also mixes in the “real” source of the Cassandra cluster values and specifies the live function that constructs the URL to retrieve the tweets.
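The Commands object imported by Main is not shown. A plausible sketch, assuming that the commands with arguments are regular-expression extractors and the others are plain string constants, looks like the following. The exact command syntax ("scan <query>", "list <n>", "count", "quit") and the CommandParser helper are assumptions made for illustration.

```scala
object Commands {
  // Regex extractors capture the argument of "list <n>" and "scan <query>".
  val ListCommand = """list (\d+)""".r
  val ScanCommand = """scan (.+)""".r
  // Plain string constants; as stable identifiers they match by equality.
  val CountCommand = "count"
  val QuitCommand  = "quit"
}

object CommandParser {
  import Commands._

  // Describes what the command loop would do for one line of input.
  def describe(line: String): String = line match {
    case QuitCommand        => "quit"
    case ScanCommand(query) => s"scan for '$query'"
    case ListCommand(count) => s"list ${count.toInt} tweets"
    case CountCommand       => "count tweets"
    case _                  => "unknown command"
  }
}
```

Because a Regex pattern must match the whole input line, an entry such as "list ten" falls through to the default case rather than failing on count.toInt.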

Akka and Spark

We start developing a Spark Streaming application by creating a SparkConf, followed by a StreamingContext.

val conf = new SparkConf(false) // skip loading external settings
  .setMaster("local[*]") // run locally with enough threads
  .setAppName("Spark Streaming with Scala and Akka") // name in Spark web UI
  .set("spark.logConf", "true")
  .set("spark.driver.port", s"$driverPort")
  .set("spark.driver.host", s"$driverHost")
  .set("spark.akka.logLifecycleEvents", "true")
val ssc = new StreamingContext(conf, Seconds(1))

From this context we can create an actor-based input stream, which is of type ReceiverInputDStream.

val actorName = "helloer"
val actorStream: ReceiverInputDStream[String] = ssc.actorStream[String](Props[Helloer], actorName)

DStream lets you define a high-level processing pipeline in Spark Streaming.

actorStream.print()

In the preceding case, the print() method is going to print the first ten elements of each RDD generated in this DStream. Nothing happens until start() is executed.

ssc.start()

With the context up and running, the code looks up the helloer actor in the Akka actor system that Spark Streaming hosts and sends it messages. Because of the preceding print() call, all of them are displayed on standard output.

import scala.concurrent.duration._
val actorSystem = SparkEnv.get.actorSystem
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/Supervisor0/$actorName"
val timeout = 100 seconds
val helloer = Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
helloer ! "Hello"
helloer ! "from"
helloer ! "Apache Spark (Streaming)"
helloer ! "and"
helloer ! "Akka"
helloer ! "and"
helloer ! "Scala"

Kafka and Cassandra

We need to use kafka-connect-cassandra, which is published by Tuplejump on Maven Central. It is defined as a dependency in the build file. Let’s look at the following example, with SBT:

libraryDependencies += "com.tuplejump" %% "kafka-connect-cassandra" % "0.0.7"

The Cassandra source connector polls Cassandra with a specific query. Using it, data can be fetched from Cassandra in two modes:

  • bulk

  • timestamp based

The modes change automatically based on the query, for example:

SELECT * FROM userlog ; //bulk

SELECT * FROM userlog WHERE ts > previousTime() ; //timestamp based

SELECT * FROM userlog WHERE ts = currentTime() ; //timestamp based

SELECT * FROM userlog WHERE ts >= previousTime() AND  ts <= currentTime() ; //timestamp based

Here, previousTime() and currentTime() are replaced before fetching the data.
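To make the substitution concrete, here is a minimal sketch of how a query containing these placeholders could be classified and resolved. It is not the connector's implementation: the QueryTemplate object, its mode and resolve functions, and the use of epoch milliseconds are all assumptions made for illustration.

```scala
object QueryTemplate {
  sealed trait Mode
  case object Bulk extends Mode
  case object TimestampBased extends Mode

  private val Previous = "previousTime()"
  private val Current  = "currentTime()"

  // A query that mentions either placeholder is treated as timestamp based.
  def mode(query: String): Mode =
    if (query.contains(Previous) || query.contains(Current)) TimestampBased else Bulk

  // Substitute the placeholders with concrete epoch-millisecond values.
  def resolve(query: String, previousMillis: Long, currentMillis: Long): String =
    query
      .replace(Previous, previousMillis.toString)
      .replace(Current, currentMillis.toString)
}
```

For instance, resolving the last query above with 1000 and 2000 would produce a statement that selects the rows whose ts lies between those two values.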

CQL Types Supported

CQL Type     Schema Type
ASCII        STRING
VARCHAR      STRING
TEXT         STRING
BIGINT       INT64
COUNTER      INT64
BOOLEAN      BOOLEAN
DECIMAL      FLOAT64
DOUBLE       FLOAT64
FLOAT        FLOAT32
TIMESTAMP    TIMESTAMP

The following types are not currently supported: BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE, SMALLINT, TINYINT, DATE, and TIME.
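The supported mapping listed above can be expressed as a simple lookup. This is a sketch only: CqlTypeMapping and schemaTypeFor are hypothetical names, and the schema types are represented as plain strings rather than as the connector's own types.

```scala
import java.util.Locale

object CqlTypeMapping {
  // CQL type name -> schema type name, as listed in the table above.
  val supported: Map[String, String] = Map(
    "ASCII"     -> "STRING",
    "VARCHAR"   -> "STRING",
    "TEXT"      -> "STRING",
    "BIGINT"    -> "INT64",
    "COUNTER"   -> "INT64",
    "BOOLEAN"   -> "BOOLEAN",
    "DECIMAL"   -> "FLOAT64",
    "DOUBLE"    -> "FLOAT64",
    "FLOAT"     -> "FLOAT32",
    "TIMESTAMP" -> "TIMESTAMP"
  )

  // Returns None for any type that is not in the table, such as UUID or BLOB.
  def schemaTypeFor(cqlType: String): Option[String] =
    supported.get(cqlType.toUpperCase(Locale.ROOT))
}
```

Returning an Option makes the unsupported case explicit, so a caller has to decide what to do with a column of an unsupported type.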

Cassandra Sink

Cassandra Sink stores Kafka SinkRecord in Cassandra tables. Currently, only the STRUCT type is supported in the SinkRecord. The STRUCT can have multiple fields with primitive field types. We assume one-to-one mapping between the column names in the Cassandra sink table and the field names.

For example, a SinkRecord might have this STRUCT value:

{
    'id': 1,
    'username': 'user1',
    'text': 'This is my first tweet'
}

The library doesn’t create the Cassandra tables; users are expected to create them before starting the sink.
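The one-to-one mapping between field names and column names means that an INSERT statement can be derived directly from the STRUCT's fields. The following sketch illustrates the idea; it is not the sink's actual code, and SinkStatement, insertFor, and the table name tweets are hypothetical.

```scala
object SinkStatement {
  // Build a parameterized INSERT whose columns are the STRUCT field names.
  def insertFor(table: String, fieldNames: Seq[String]): String = {
    require(fieldNames.nonEmpty, "a STRUCT needs at least one field")
    val columns      = fieldNames.mkString(", ")
    val placeholders = fieldNames.map(_ => "?").mkString(", ")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }
}
```

For the STRUCT shown above, insertFor("tweets", Seq("id", "username", "text")) yields a statement with three columns and three placeholders, which only works if a table with those column names already exists.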

Summary

This chapter reviewed the connectors among all the SMACK stack technologies. The Spark and Kafka connection was explained in Chapter 8. Apache Mesos integration was explained in Chapter 7. We end this book with a brief fast data glossary for you to consult if you need the definition of a specific term.
