Day 2: Building a Streaming Data Pipeline

On Day 1, we made some solid progress on understanding the core concepts of DynamoDB, from how DynamoDB partitions data to the data types that it supports to secondary indexes. We worked through some simple table management commands and some basic CRUD operations on that table to see the concepts in action.

We were exposed to some features that set DynamoDB apart from other, more strict key-value stores, but those features are mostly garnishes. The main dish is DynamoDB’s unique blend of extreme scalability, predictably solid performance as you scale out, and freedom from operational burdens.

On Day 2, we’ll build something that can take full advantage of those features in a way that we couldn’t in our Day 1 CRUD exercises. We’ll build a streaming data pipeline that pumps sensor data into DynamoDB (we’ll use a Ruby script to act as mock sensors). Those mock sensors will populate our table with three pieces of data per “reading”:

  • An identifier for the sensor—for example, sensor-1 or temp-a1b2c3
  • A timestamp for the reading in Unix time (this will allow for easier sorting)
  • The current temperature picked up by the sensor—for example, 81.4 or 73.2

But instead of writing to DynamoDB directly, we’ll use a streaming data service called Kinesis that’s built for handling massive write throughput; messages passing through Kinesis will be written to DynamoDB using functions deployed to AWS’s Lambda service.
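To make the shape of a reading concrete, here is a minimal JavaScript sketch of a mock sensor producing the three fields listed above. The makeReading helper and the 60 to 100 degree range are assumptions made for illustration; the field names match the payload used later in this chapter.

```javascript
// Hypothetical helper: builds one mock reading with the three fields
// described above (sensor ID, Unix timestamp, temperature).
function makeReading(sensorId, now = Date.now()) {
  // Assumed range: a random temperature between 60.0 and 100.0,
  // rounded to one decimal place
  const temperature = Math.round((60 + Math.random() * 40) * 10) / 10;
  return {
    sensor_id: sensorId,
    current_time: Math.floor(now / 1000), // Unix time in seconds
    temperature: temperature,
  };
}

console.log(JSON.stringify(makeReading('sensor-1')));
```

Passing the clock value in as a parameter keeps the helper easy to test with a fixed timestamp.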

It may seem a bit odd to focus on connecting AWS services in a chapter devoted to DynamoDB. But it’s important to note that cloud databases are rarely used as standalone tools. Cloud providers typically offer lots and lots of services that can run in the same datacenter as your database, built to supplement one another. To use multiple services together is to go with the grain of cloud computing.

A Data Model for Sensor Data

The data pipeline that we build later today will be capable of handling lots of streaming data, to the tune of many megabytes per second. But before we open up that pipeline, we need to make some big choices about our data model. For reasons we went over on Day 1, getting data modeling right is everything in DynamoDB and any missteps can get you into trouble, especially in a big production system.

Getting Our Keys Right

Keys are extremely important in DynamoDB because they act not only as “addresses” for items, as in systems like Redis, but also determine how your data is distributed between partitions, which in turn affects the write/read performance of the table.

We’ll call our DynamoDB table SensorData, and it will consist of the following attributes:

  • The hash key will be a SensorId attribute holding the ID of the device emitting the data (sensor1, factory-sensor-1337, and so on). Readings are spread across partitions by sensor, so writes stay evenly distributed as long as many sensors report at similar rates.

  • The range key will be a CurrentTime attribute, a number expressing the current Unix time. Range keys sort items that share the same partition key, so each sensor’s readings are stored in time order.

  • A Temperature attribute will hold the temperature for the reading. A local secondary index named TemperatureIndex uses it as an alternative range key, which lets us query a sensor’s readings sorted by temperature.

When creating DynamoDB tables via the command line, you can either use flags (as for the previous ShoppingCart table) or you can use a JSON specification. The spec for our SensorData table will be complex enough that using CLI flags will be too unwieldy. So we’ll do it this way instead:

 $ aws dynamodb create-table \
   --cli-input-json file://sensor-data-table.json

And here’s the JSON spec itself:

 {
   "TableName": "SensorData",
   "KeySchema": [
     {
       "AttributeName": "SensorId",
       "KeyType": "HASH"
     },
     {
       "AttributeName": "CurrentTime",
       "KeyType": "RANGE"
     }
   ],
   "AttributeDefinitions": [
     {
       "AttributeName": "SensorId",
       "AttributeType": "S"
     },
     {
       "AttributeName": "CurrentTime",
       "AttributeType": "N"
     },
     {
       "AttributeName": "Temperature",
       "AttributeType": "N"
     }
   ],
   "LocalSecondaryIndexes": [
     {
       "IndexName": "TemperatureIndex",
       "KeySchema": [
         {
           "AttributeName": "SensorId",
           "KeyType": "HASH"
         },
         {
           "AttributeName": "Temperature",
           "KeyType": "RANGE"
         }
       ],
       "Projection": {
         "ProjectionType": "ALL"
       }
     }
   ],
   "ProvisionedThroughput": {
     "ReadCapacityUnits": 2,
     "WriteCapacityUnits": 2
   }
 }
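To see what this key schema buys us, the following JavaScript sketch models the table in memory. It is an illustration only, not how DynamoDB is implemented, and the function names and sample readings are hypothetical. Items are grouped by the hash key (SensorId), then read back in range key order (CurrentTime) or in the order of the TemperatureIndex.

```javascript
// In-memory stand-in for the SensorData table: a Map from partition key
// (SensorId) to the items that share it.
const readings = [
  { SensorId: 'sensor-1', CurrentTime: 300, Temperature: 75.2 },
  { SensorId: 'sensor-2', CurrentTime: 100, Temperature: 68.0 },
  { SensorId: 'sensor-1', CurrentTime: 100, Temperature: 81.4 },
  { SensorId: 'sensor-1', CurrentTime: 200, Temperature: 73.2 },
];

function groupByHashKey(items) {
  const partitions = new Map();
  for (const item of items) {
    if (!partitions.has(item.SensorId)) partitions.set(item.SensorId, []);
    partitions.get(item.SensorId).push(item);
  }
  return partitions;
}

// Table order: within one SensorId, items sort by the range key CurrentTime.
function queryByTime(partitions, sensorId) {
  return [...(partitions.get(sensorId) || [])]
    .sort((a, b) => a.CurrentTime - b.CurrentTime);
}

// TemperatureIndex order: same hash key, but sorted by Temperature instead.
function queryByTemperature(partitions, sensorId) {
  return [...(partitions.get(sensorId) || [])]
    .sort((a, b) => a.Temperature - b.Temperature);
}

const partitions = groupByHashKey(readings);
console.log(queryByTime(partitions, 'sensor-1').map(r => r.CurrentTime));
// [ 100, 200, 300 ]
console.log(queryByTemperature(partitions, 'sensor-1').map(r => r.Temperature));
// [ 73.2, 75.2, 81.4 ]
```

Both queries touch only the items for one sensor, which is why a lookup by SensorId stays cheap no matter how many other sensors write to the table.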

Keep in mind that this data model is highly specific to our use case. Many use cases may involve more complex column structures, more secondary indexes, more attributes, and so on. The important thing is making sure that you go through a similar decision-making process whenever you use DynamoDB; missteps here will cost you, and we do mean actual money.

Navigating Trade-offs in Performance Tuning

As we’ve stressed before, you simply shouldn’t use DynamoDB if you’re after anything but extremely good performance at massive scale. But as with any complex system, performance and scalability in DynamoDB are not guaranteed. There’s a wide variety of missteps that will ensure that you have a deeply sub-optimal experience, spend too much money, or both.

On Day 1, we talked about performance optimization from the standpoint of data modeling and how the distribution of primary keys is very important. In this section, the focus will be more operational. DynamoDB provides a small handful of knobs you can adjust that can have a big impact on performance. These knobs are the closest thing you get to “ops” in DynamoDB, but they aren’t terribly complex. You just need to keep a few basic principles in view.

Throughput is the central concept here. When we created the ShoppingCart table on Day 1, we used this command but didn’t provide much explanation of the --provisioned-throughput flag:

 $ aws dynamodb create-table --table-name ShoppingCart \
   --attribute-definitions AttributeName=ItemName,AttributeType=S \
   --key-schema AttributeName=ItemName,KeyType=HASH \
   --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1

Read capacity and write capacity units are handled differently in DynamoDB. One write capacity unit (WCU) covers one write per second for an item up to 1 KB in size, while one read capacity unit (RCU) covers one strongly consistent read per second for an item up to 4 KB. You can set both read and write throughput on a per-table basis. For workloads that are more write heavy, you could provision just one RCU and many WCUs, or vice versa for read-heavy workloads.

We won’t delve too far into DynamoDB throughput tuning here. Just keep a few things in mind:

  • Strive to match the RCUs and WCUs in a table to balance between read and write intensity. Perform regular checks—Amazon offers numerous services to help with this, such as CloudWatch—to ensure that you haven’t over- or under-provisioned throughput on either the read or the write side. Perhaps even set up an automated system to re-provision when necessary.

  • Secondary indexes, both local and global, count toward read and write throughput. Throughput for an index should be provisioned based on the size of the index entries, not the size of the full table items.

  • Strongly consistent reads consume twice as much read capacity as eventually consistent reads.
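The capacity arithmetic can be sketched in a few lines of JavaScript. The function names and the example workload are hypothetical; the unit sizes (1 KB per write unit, 4 KB per read unit, with item sizes rounded up) follow the description above, and eventually consistent reads are charged at half the strongly consistent rate.

```javascript
// Capacity needed for a steady workload. Item sizes round up to the next
// unit boundary: 1 KB for writes, 4 KB for reads.
function writeCapacityUnits(itemSizeBytes, writesPerSecond) {
  const unitsPerWrite = Math.ceil(itemSizeBytes / 1024);
  return unitsPerWrite * writesPerSecond;
}

function readCapacityUnits(itemSizeBytes, readsPerSecond, stronglyConsistent) {
  const unitsPerRead = Math.ceil(itemSizeBytes / 4096);
  const units = unitsPerRead * readsPerSecond;
  // Eventually consistent reads cost half as much as strongly consistent ones
  return stronglyConsistent ? units : Math.ceil(units / 2);
}

// Hypothetical workload: 200-byte sensor readings
console.log(writeCapacityUnits(200, 50));        // 50
console.log(readCapacityUnits(200, 10, true));   // 10
console.log(readCapacityUnits(200, 10, false));  // 5
// A 2,500-byte item needs 3 write units per write
console.log(writeCapacityUnits(2500, 50));       // 150
```

Because sizes round up, small items waste little capacity on writes, while an item just over a unit boundary costs a full extra unit.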

If you’re ever concerned about costs, the DynamoDB console on the AWS website provides an interface that you can use to get cost estimates, as shown in the figure that follows.

images/dynamodb-capacity-calculator.png

Throughput settings for a table are not fixed for all time and can be updated at will. For our purposes here, the 2 RCUs and 2 WCUs specified in our JSON spec will suffice for our table. But we couldn’t let you proceed beyond this section without mentioning this second, essential element of DynamoDB performance.

Now that we have a SensorData table ready to accept writes, we can get back to building our streaming sensor data pipeline.

It’s a Streaming World

Data streams are best understood in contrast to data batches. When you process a batch—maybe a huge CSV file or a massive Postgres table—you take a well-defined mass of data and apply some kind of logic to it, for example selecting all values that satisfy a WHERE clause in an SQL query or parsing all the values in a column in an Excel spreadsheet.

When you process a stream of data, however, you don’t know in advance how many items will pass through the stream for processing. Instead, you design some kind of processing logic, open up a pipe, and handle things as they come through. Although there’s still plenty of room for batch processing in today’s data landscape, streaming models have become more popular with the predominance of realtime inputs from devices such as smartphones and, in our case, sensors.
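The contrast can be sketched in JavaScript. In the batch case the whole data set is available at once; in the streaming case a processor sees one reading at a time and keeps only the state it needs. The createRunningAverage helper and the sample values are hypothetical.

```javascript
// Batch: the whole data set exists up front, so we can filter it in one pass.
const batch = [81.4, 73.2, 99.9, 68.0];
const hotReadings = batch.filter(t => t > 80);
console.log(hotReadings); // [ 81.4, 99.9 ]

// Stream: readings arrive one at a time and we never see "the whole set",
// so the processor keeps only the running state it needs.
function createRunningAverage() {
  let count = 0;
  let sum = 0;
  return function onReading(temperature) {
    count += 1;
    sum += temperature;
    return sum / count;
  };
}

const onReading = createRunningAverage();
for (const t of [80, 90, 100]) {
  console.log(onReading(t)); // 80, then 85, then 90
}
```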

A Realtime, Streaming, Functional Data Pipeline

For today’s practical exercise, we’ll use a combination of three AWS services—DynamoDB, Lambda, and Kinesis—to build a data pipeline that’s capable of accepting lots of incoming data, processing it, and finally storing it in DynamoDB.

Amazon’s Kinesis is a service that enables you to manage data streams that you can pipe to any number of destinations, from S3 buckets to Redshift warehouses to, in our case, AWS Lambda functions (more on that in a minute). You can think of Kinesis as a managed, cloud-only version of Apache Kafka, and if you’ve worked with Kafka then some of the core primitives driving Kinesis may be familiar to you already: topics, producers, consumers, records, and so on. It’s not a perfect analogy, but it should suffice for our exercise.

Lambda is a Functions-as-a-Service (FaaS) platform that enables you to manage functions that run in Amazon datacenters without any concern for running servers or managing infrastructure. The popularity of Lambda has spawned the so-called “serverless” paradigm of computing, which is a bit of a misnomer in that Lambda functions do, of course, run on servers. But the fact that you simply write functions that process inputs however you’d like and upload them to AWS makes it at least feel serverless.

What we build today will ultimately look like the following figure.

images/dynamodb-kinesis-lambda.png

Data will flow through a Kinesis stream to a Lambda function that then processes the data. The Lambda processing step writes the incoming data to a DynamoDB table. Once we’ve built this, we’ll feed some example data into Kinesis using the aws CLI tool to test things and then start pumping sensor data in.

Before you start going through the CLI examples here, do two things. First, cd into the code/dynamodb folder for the DynamoDB chapter. Then, create a Kinesis stream using the aws tool:

 $ export STREAM_NAME=temperature-sensor-data
 $ aws kinesis create-stream \
   --stream-name ${STREAM_NAME} \
   --shard-count 1

When creating this stream, we gave it a name and specified a shard count. The shard count essentially determines how much parallel processing power you want your stream to have. More shards means more power but also more cost, so we’ll use just one shard for our example here as it’s all we’ll really need. Now let’s have a look at how AWS sees our stream:

 $ aws kinesis describe-stream \
   --stream-name ${STREAM_NAME}
 {
   "StreamDescription": {
     "RetentionPeriodHours": 24,
     "StreamName": "temperature-sensor-data",
     "Shards": [],
     "StreamARN": "arn:aws:kinesis:...:stream/temperature-sensor-data",
     "EnhancedMonitoring": [
       {
         "ShardLevelMetrics": []
       }
     ],
     "StreamStatus": "CREATING"
   }
 }

At first, the status of the stream will be CREATING, but within a few seconds the status should change to ACTIVE. Set the KINESIS_STREAM_ARN environment variable to the StreamARN shown in the output, which should begin with arn:aws:kinesis. Once the stream is active, you can write some data to it. Each record that you write to Kinesis needs to contain two things: a partition key and a binary large object (BLOB) of data. The partition key determines which shard the record is written to (as with DynamoDB’s partitioning system).
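Kinesis hashes each partition key with MD5 into a 128-bit integer and assigns each shard a range of that hash space. The following JavaScript sketch shows the idea under one loud assumption: that the shards divide the hash space into equal, contiguous ranges. The function names are hypothetical, and a real stream reports the actual hash key range of each shard in its describe-stream output.

```javascript
const crypto = require('crypto');

// Map a partition key to a 128-bit integer using MD5.
function hashKey(partitionKey) {
  const hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return BigInt('0x' + hex);
}

// Assumption for this sketch: shards split the 128-bit hash space into
// equal, contiguous ranges, so the shard index is proportional to the hash.
function shardIndexFor(partitionKey, shardCount) {
  const keySpace = 2n ** 128n;
  return Number((hashKey(partitionKey) * BigInt(shardCount)) / keySpace);
}

// With a single shard, every key lands on shard 0
console.log(shardIndexFor('sensor-data', 1));

// With more shards, distinct keys can spread across them
for (const key of ['sensor-1', 'sensor-2', 'sensor-3']) {
  console.log(key, '->', shardIndexFor(key, 4));
}
```

This is why a single fixed partition key sends every record to the same shard, no matter how many shards the stream has.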

Here’s an example write to the temperature-sensor-data stream that we just created:

 $ aws kinesis put-record \
   --stream-name ${STREAM_NAME} \
   --partition-key sensor-data \
   --data "Baby's first Kinesis record"
 {
   "ShardId": "shardId-000000000000",
   "SequenceNumber": "4957192301...3577493926"
 }

Don’t worry too much about partitions, shards, and the returned JSON here, as we won’t delve too deeply into Kinesis in this book. The important thing to know for now is that our stream is ready to accept incoming records and pass them along to the Lambda function that we’ll create later.

Now one essential piece of our pipeline is in place, but there are more puzzle pieces we need to connect. Kinesis is a powerful system, but it’s really just a fancy pipe. It doesn’t process or write data itself, so we can’t use it alone to store items in DynamoDB. We’ll need to create a processing layer that ingests records from Kinesis and does something with them.

Up until very recently, the default way to create this kind of processing layer in AWS would be to write an application that can handle incoming streams of data and run that application on Elastic Compute Cloud (EC2) servers. That’s a perfectly reasonable approach, but it requires you to fire up and manage servers. We can avoid that by using Lambda, which requires us to simply write some code, run some basic CLI commands, and let AWS do the work.

First, we need to write our function. Lambda supports several languages, but let’s use JavaScript because its callback-based logic is a natural fit for Lambda (and perhaps even an inspiration for Lambda!). Here is the code for our handler:

 var AWS = require('aws-sdk');
 var DynamoDB = new AWS.DynamoDB({
   apiVersion: '2012-08-10',
   region: 'us-east-1',
 });

 exports.kinesisHandler = function(event, context, callback) {
   // We only need to handle one record at a time
   var kinesisRecord = event.Records[0];

   // The data payload is base 64 encoded and needs to be decoded to a string
   var data =
     Buffer.from(kinesisRecord.kinesis.data, 'base64').toString('ascii');
   // Create a JSON object out of that string
   var obj = JSON.parse(data);
   var sensorId = obj.sensor_id,
       currentTime = obj.current_time,
       temperature = obj.temperature;

   // Define the item to write to DynamoDB
   var item = {
     TableName: "SensorData",
     Item: {
       SensorId: {
         S: sensorId
       },
       CurrentTime: {
         // Remember that all numbers need to be input as strings
         N: currentTime.toString()
       },
       Temperature: {
         N: temperature.toString()
       }
     }
   };

   // Perform a put operation, logging both successes and failures
   DynamoDB.putItem(item, function(err, data) {
     if (err) {
       console.log(err, err.stack);
       callback(err.stack);
     } else {
       console.log(data);
       callback(null, data);
     }
   });
 }

When writing any Lambda handler function, you are given three JavaScript arguments to work with when your function is triggered:

  • The event object contains the data that has been passed to the function, in our case the records read from Kinesis.

  • The context object holds information about the environment in which the function is running.

  • The callback function signals that the operation is finished. Calling it with a single argument reports an error; calling it with null and a second value reports success, and that value is returned as the result.

The event object that our function receives from Kinesis is a JSON object with a Records field, an array that holds the data for each record. Our handler reads only the first record, which is enough for the single test writes in this section. Keep in mind that Lambda can deliver several records in one invocation when they arrive close together, so a production handler should loop over the whole array. For each record that we handle, we’ll put an item to our SensorData table.
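As a minimal sketch of handling every record in an event rather than just the first, the decoding and item-building steps can be pulled into small functions and mapped over the Records array. The function names and the mock event below are hypothetical; the event shape (Records[].kinesis.data, base64 encoded) and the item layout follow the handler above. The sketch stops short of calling DynamoDB so that it runs without the AWS SDK.

```javascript
// Decode every record in a Kinesis-triggered Lambda event.
function decodeRecords(event) {
  return event.Records.map(function (record) {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return JSON.parse(json);
  });
}

// Build the putItem parameters for one decoded reading.
function toPutItemParams(reading) {
  return {
    TableName: 'SensorData',
    Item: {
      SensorId: { S: reading.sensor_id },
      // DynamoDB expects numbers to be passed as strings
      CurrentTime: { N: String(reading.current_time) },
      Temperature: { N: String(reading.temperature) },
    },
  };
}

// Mock record standing in for what Lambda would deliver
function mockRecord(reading) {
  const data = Buffer.from(JSON.stringify(reading)).toString('base64');
  return { kinesis: { partitionKey: 'sensor-data', data: data } };
}

const event = {
  Records: [
    mockRecord({ sensor_id: 'sensor-1', current_time: 123456789, temperature: 99.9 }),
    mockRecord({ sensor_id: 'sensor-2', current_time: 123456790, temperature: 73.2 }),
  ],
};

const params = decodeRecords(event).map(toPutItemParams);
console.log(JSON.stringify(params, null, 2));
```

In a real handler, each element of params would be passed to DynamoDB.putItem, and callback would be invoked only after all of the writes had finished.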

Before we can stitch Kinesis and Lambda together, however, we need to create an AWS security role that enables us to do that. Role management in AWS is handled by a service called Identity and Access Management, or IAM. If you’ve managed users, roles, and groups in database systems such as Postgres, the concepts here are quite similar. The following set of commands will:

  • Create the role using a JSON document that you can peruse in the lambda-kinesis-role.json file.

  • Attach a policy to that role that will enable Lambda to read records from Kinesis.

  • Attach a second policy to that role that will enable the function to write to DynamoDB.

 $ export IAM_ROLE_NAME=kinesis-lambda-dynamodb
 $ aws iam create-role \
   --role-name ${IAM_ROLE_NAME} \
   --assume-role-policy-document file://lambda-kinesis-role.json
 $ aws iam attach-role-policy \
   --role-name ${IAM_ROLE_NAME} \
   --policy-arn \
   arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
 $ aws iam attach-role-policy \
   --role-name ${IAM_ROLE_NAME} \
   --policy-arn \
   arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess

Now you need to get the ARN for that role. Run this command and set the ROLE_ARN environment variable to the proper Arn, which should begin with arn:aws:iam:

 $ aws iam get-role --role-name ${IAM_ROLE_NAME}

In order to upload our function to Lambda, we need to create a Zip file out of it and then upload the zipped payload using the create-function command:

 $ zip ProcessKinesisRecords.zip ProcessKinesisRecords.js
 $ aws lambda create-function \
   --region us-east-1 \
   --function-name ProcessKinesisRecords \
   --zip-file fileb://ProcessKinesisRecords.zip \
   --role ${ROLE_ARN} \
   --handler ProcessKinesisRecords.kinesisHandler \
   --runtime nodejs6.10
 {
   "CodeSha256": "LdmN2sMF5kwdZiRbIYAtdhs4J8pX39Qa6EhvGdGAcOQ=",
   "FunctionName": "ProcessKinesisRecords",
   "CodeSize": 530,
   "MemorySize": 128,
   "FunctionArn": "arn:aws:lambda:...",
   "Version": "$LATEST",
   "Role": "arn:aws:iam:...",
   "Timeout": 3,
   "LastModified": "2017-04-13T04:07:36.833+0000",
   "Handler": "ProcessKinesisRecords.kinesisHandler",
   "Runtime": "nodejs6.10",
   "Description": ""
 }

If you have any modifications that you want to make to the function on your own, you can update it at any time by creating a new Zip file and using the update-function-code command:

 $ zip ProcessKinesisRecords.zip ProcessKinesisRecords.js
 $ aws lambda update-function-code \
   --function-name ProcessKinesisRecords \
   --zip-file fileb://ProcessKinesisRecords.zip

We can run a test invocation of the new function using a mock input from a text file and storing the output in another text file (that you can peruse on your own):

 $ aws lambda invoke \
   --invocation-type RequestResponse \
   --function-name ProcessKinesisRecords \
   --payload file://test-lambda-input.txt \
   lambda-output.txt
 {
   "StatusCode": 200
 }

Success! We’ve triggered a Lambda function using a mock data input payload mimicking a Kinesis stream. Lambda then handled the data, turned it into a DynamoDB item, and made a successful write. What’s missing now is that Kinesis isn’t yet able to trigger and pass data to our Lambda function. For that, we need to create an event source mapping that tells AWS that we want our temperature-sensor-data stream to trigger our ProcessKinesisRecords Lambda function whenever a record passes into the stream. This command will create that mapping:

 $ aws lambda create-event-source-mapping \
   --function-name ProcessKinesisRecords \
   --event-source-arn ${KINESIS_STREAM_ARN} \
   --starting-position LATEST
 {
   "UUID": "0f56a8c7-de6d-4a77-b536-9ec87be5a065",
   "StateTransitionReason": "User action",
   "LastModified": 1492057092.585,
   "BatchSize": 100,
   "EventSourceArn": "arn:aws:kinesis:...",
   "FunctionArn": "arn:aws:lambda:...",
   "State": "Creating",
   "LastProcessingResult": "No records processed"
 }

You can see a description of the event source mapping:

 $ aws lambda list-event-source-mappings
 {
   "EventSourceMappings": [
     {
       "UUID": "0f56a8c7-de6d-4a77-b536-9ec87be5a065",
       "StateTransitionReason": "User action",
       "LastModified": 1492057140.0,
       "BatchSize": 100,
       "State": "Enabled",
       "FunctionArn": "arn:aws:lambda:...",
       "EventSourceArn": "arn:aws:kinesis:...",
       "LastProcessingResult": "No records processed"
     }
   ]
 }

If the value of State isn’t yet Enabled, wait a few seconds and that should change.

At this point, the pipeline is in place. Records that are written to the temperature-sensor-data stream in Kinesis will be processed by a Lambda function, which will write the results to the SensorData table in DynamoDB. Just to be sure, let’s put a record to the stream and see what happens:

 $ export DATA=$(
   echo '{
     "sensor_id":"sensor-1",
     "temperature":99.9,"current_time":123456789
   }' | base64 | tr -d '\n')
 $ aws kinesis put-record \
   --stream-name ${STREAM_NAME} \
   --partition-key sensor-data \
   --data "${DATA}"

That should return a JSON object with a ShardId and SequenceNumber as before. But let’s see if that mock temperature sensor reading ended up in DynamoDB:

 $ aws dynamodb scan --table-name SensorData
 {
   "Count": 1,
   "Items": [
     {
       "CurrentTime": {
         "N": "123456789"
       },
       "Temperature": {
         "N": "99.9"
       },
       "SensorId": {
         "S": "sensor-1"
       }
     }
   ]
 }

Success again! Our pipeline is now all set up and ready to handle sensor data inputs. On Day 3, we’ll begin pumping data into Kinesis and letting Lambda do its magic.

Day 2 Homework

Find

  1. Read some documentation for the DynamoDB Streams feature (not to be confused with Kinesis).[51] Think of a compelling use case for this feature.

  2. Find one or more DynamoDB client libraries for your favorite programming language. Explore how to perform CRUD operations using that library.

  3. DynamoDB supports object expiry using time to live (TTL). Find some documentation on TTL in DynamoDB and think of some use cases for it.

Do

  1. One way to improve the performance of Kinesis is to write records to different partitions. Find some documentation on partitioning in Kinesis and think of a stream partitioning scheme that would work with our sensor data model (rather than writing all records to the sensor-data partition).

  2. Modify various elements of our data pipeline—the SensorData table definition, the Lambda function, and so on—to enable sensors to write humidity-related data to the pipeline (as a percentage). Make sure that all of these components line up properly!
