Day 3: Building an “Internet of Things” System Around DynamoDB

On Day 2, we delved into DynamoDB’s streaming feature and turned our DynamoDB database from a fairly standard cloud storage mechanism into a much more dynamic Kinesis-Lambda-DynamoDB processing pipeline. What we didn’t do on Day 2 was provide any data inputs beyond some CLI test runs. On Day 3, we’ll provide some “Internet of Things”-style data from mock sensors.

The term “Internet of Things” (aka IoT) is kind of like “Big Data” or “the Cloud.” Its use is controversial because there’s a lot of buzz surrounding it, and what it points to isn’t so much something that already exists as a set of emerging technological possibilities. The “things” in the expression can refer to everything from moisture sensors in your refrigerator to a heart-rate monitor you wear on your wrist—essentially anything that isn’t a “normal” computer (laptop, smartphone, cloud server) and is capable of producing data and transmitting it to some part of the Internet.

The interesting thing about the IoT from the perspective of this book is that things like sensors can produce tons of data, especially when aggregating across many data-producing devices. Imagine a 50-acre vineyard with moisture and temperature sensors hooked up every few feet along the rows of vines, or a high-tech manufacturing facility where each machine in an assembly line is feeding all kinds of data to some centralized source. In both cases, these devices might be sending data every few hours or they might be sending it more than once a second. IoT data usually comes in small chunks (maybe just a few sensor readings at a time), but those small chunks can really add up.

DynamoDB is a good choice for an IoT database because it’s built to handle not just huge data sets but data sets whose size can’t be determined in advance. The downside of DynamoDB, as we discussed before, is that it isn’t built for complex queries of the type you’d be able to run on a relational database. For our needs here, though, DynamoDB’s “key-value plus” querying system will do just fine. Our tables will be fairly simple, and range queries will enable us to use DynamoDB as a powerful time-series database. Any processing logic we need to apply beyond this can happen on the application side if we need it to.

Mock Data Inputs

In a real sensor data setup, you would likely use a protocol like Message Queue Telemetry Transport (MQTT)[52] as a device messaging protocol to feed data into a streaming data pipeline like ours, perhaps in conjunction with a broker interface like Amazon’s IoT service.[53]
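Just to give a flavor of what the publishing side of that might look like, here’s a minimal sketch using the mqtt Ruby gem. The broker hostname, topic name, and certificate paths are hypothetical stand-ins (an actual AWS IoT setup would use your account’s device endpoint and per-device certificates), and this snippet isn’t wired into our pipeline in any way:

 require 'mqtt'
 require 'json'

 # Hypothetical broker endpoint and credentials; AWS IoT uses TLS with
 # per-device certificates on port 8883
 client = MQTT::Client.connect(
   host: 'your-device-endpoint.example.com',
   port: 8883,
   ssl: true,
   cert_file: 'device.pem.crt',
   key_file: 'private.pem.key',
   ca_file: 'AmazonRootCA1.pem'
 )

 # Publish one reading as JSON to a (hypothetical) topic
 reading = {
   sensor_id: 'temp-sensor-1',
   current_time: Time.now.to_i,
   temperature: 72.4,
 }
 client.publish('sensors/temp-sensor-1/readings', reading.to_json)

 client.disconnect

A broker-side rule could then forward those messages into a stream, much as the script below writes to Kinesis directly.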

Here, we’ll do something a bit simpler that doesn’t require having actual sensor devices on hand. We’ll use a Ruby script that auto-generates temperature data and writes that data to our data pipeline as JSON.

 require 'aws-sdk'
 require 'random-walk'
 require 'time'

 STREAM_NAME = 'temperature-sensor-data'

 # Make sure that both a sensor ID and number of iterations are entered
 if ARGV.length != 2
   abort("Must specify a sensor ID as the first arg and N as the second")
 end

 @sensor_id = ARGV[0]
 @iterator_limit = ARGV[1].to_i

 # The Kinesis client object. Supply a different region if necessary
 @kinesis_client = Aws::Kinesis::Client.new(region: 'us-east-1')

 # An array used to generate random walk values
 @temp_walk_array = RandomWalk.generate(6000..10000, @iterator_limit, 1)

 # The iterator starts at 0
 @iterator = 0

 def write_temp_reading_to_kinesis
   # Generate a random current temperature from the walk array
   current_temp = @temp_walk_array[@iterator] / 100.0

   # The JSON payload for the reading
   data = {
     :sensor_id    => @sensor_id,
     :current_time => Time.now.to_i,
     :temperature  => current_temp,
   }

   # The record to write to Kinesis
   kinesis_record = {
     :stream_name => STREAM_NAME,
     :data        => data.to_json,
     # We'll use just a single partition key here
     :partition_key => 'sensor-data',
   }

   # Write the record to Kinesis
   @kinesis_client.put_record(kinesis_record)

   puts "Sensor #{@sensor_id} sent a temperature reading of #{current_temp}"

   @iterator += 1

   # Exit if script has iterated N times
   if @iterator == @iterator_limit
     puts "The sensor has gone offline"
     exit(0)
   end
 end

 while true
   write_temp_reading_to_kinesis
   # Pause 2 seconds before supplying another reading
   sleep 2
 end

In order to run the script, you’ll need to install the random-walk and aws-sdk gems:

 $ gem install random-walk aws-sdk

This script requires you to specify a sensor identifier, such as sensor-1 or whatever you’d like, as well as the number of times that you’d like the script to write data to the pipeline. To make the temp-sensor-1 sensor write 1,000 times to the pipeline:

 $ ruby upload-sensor-data.rb temp-sensor-1 1000

A randomized temperature reading will be written to the pipeline every 2 seconds until it’s gone through the supplied number of iterations. The temperature readings will be between 60 and 100 degrees and based on a random walk pattern, which means that the readings will follow a lightly meandering path rather than being truly random. A four-step random walk pattern would produce a series like 47.2, 47.3, 47.7, 47.6 as opposed to something like 47.2, 68.9, 50.3, 32.1. The script will also supply a Unix timestamp to each reading (which we can later use to run range queries).
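If you’re curious what the random-walk gem is doing for us, the core idea is simple enough to sketch by hand. This isn’t the gem’s actual implementation, just an illustration of the technique, with made-up bounds and step size:

 # Each value differs from its predecessor by a small random step, so the
 # series meanders within its bounds instead of jumping around randomly
 def random_walk(length, min: 6000, max: 10_000, step: 10)
   values = [rand(min..max)]
   (length - 1).times do
     values << (values.last + rand(-step..step)).clamp(min, max)
   end
   values
 end

 # Dividing by 100.0, as the sensor script does, yields temperatures
 # such as [72.4, 72.31, 72.38, 72.45, 72.44]
 random_walk(5).map { |v| v / 100.0 }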

Open up multiple shell sessions and run the script for multiple device IDs. Or use this command, which will write 1,000 sensor readings each for 10 sensors labeled sensor-1, sensor-2, and so on.

 $ for n in {1..10}; do
     ruby upload-sensor-data.rb sensor-${n} 1000 &
   done

Let that run for as long as you’d like. You can shut down all the running mock sensors at any time:

 $ pgrep -f upload-sensor-data | xargs kill -9

Scan the SensorData table a few times to see the number of items steadily increasing:

 $ aws dynamodb scan --table-name SensorData

As the table fills up, scan operations will become increasingly slow. Now would be a good time to explore local secondary index queries. Remember that when we created the SensorData table we created a local secondary index for the Temperature attribute. Get two timestamp values for two temperature readings—the first and the 201st—like this:

 $ T1=$(aws dynamodb scan --table-name SensorData \
     --query Items[0].CurrentTime.N | tr -d '"')
 $ T2=$(aws dynamodb scan --table-name SensorData \
     --query Items[200].CurrentTime.N | tr -d '"')

Then run this query, substituting the values for T1 and T2 (make sure to use the larger of the two values for t2).

 $ aws dynamodb query --table-name SensorData \
     --expression-attribute-values '{
       ":t1": {"N": "..."},
       ":t2": {"N": "..."},
       ":sensorId": {"S": "sensor-1"}
     }' \
     --key-condition-expression \
       'SensorId = :sensorId AND CurrentTime BETWEEN :t1 AND :t2' \
     --projection-expression 'Temperature'

That should return all the Temperature values for the sensor-1 sensor between the two timestamps (feel free to substitute a different name if you used a different naming scheme).
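If you’d rather run that same query from Ruby than from the CLI, here’s a minimal sketch using the aws-sdk gem we already installed. It assumes you’ve assigned the two timestamps to t1 and t2 as plain integers; the averaging at the end is just an example of the kind of application-side processing we mentioned earlier:

 require 'aws-sdk'

 dynamodb = Aws::DynamoDB::Client.new(region: 'us-east-1')

 # Same key condition as the CLI query: one sensor, bounded by two timestamps
 resp = dynamodb.query(
   table_name: 'SensorData',
   key_condition_expression:
     'SensorId = :sensorId AND CurrentTime BETWEEN :t1 AND :t2',
   expression_attribute_values: {
     ':sensorId' => 'sensor-1',
     ':t1'       => t1,
     ':t2'       => t2,
   },
   projection_expression: 'Temperature'
 )

 # Any further processing happens application-side, e.g. a simple average
 temps = resp.items.map { |item| item['Temperature'].to_f }
 puts "#{temps.size} readings, average temperature #{temps.sum / temps.size}"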

Now see how many items are currently in the table:

 $ aws dynamodb scan --table-name SensorData \
     --query Count

At this point, you may be up to several hundred items. Let the count get above 1,000 or so and then stop the sensors using the kill -9 command we used before. At that point, there will be plenty of data in the SensorData table to move on to the next exercise.

An SQL Querying Interface

Our mock sensors have now streamed over 1,000 entries into our DynamoDB table. We’ve seen one way of accessing that data: fetching sensor readings based on a time range. This is a cool capability, but if we wanted to gather more meaningful metrics from our sensor data, we’d need a more powerful interface. What if you had, say, data analysts or climate scientists on your team who needed to perform much more complex queries or mathematical calculations using that sensor data?

One way that you can do this on AWS is to use plain old SQL. Now, don’t get too excited just yet, because you can’t perform SQL queries over DynamoDB tables directly—at least not yet. What you can do instead is transfer the data stored in DynamoDB tables into an S3 (Simple Storage Service) bucket using a service called Data Pipeline and then run SQL queries directly against that data using another AWS service called Athena.

Time for Another Pipeline

Amazon’s Data Pipeline service enables you to create batch jobs that efficiently move data between Amazon services (including S3, the Redshift data warehousing service, and many more). You can write your own pipeline logic from scratch or you can use a template for ease of use. Fortunately for us, there’s a predefined Export DynamoDB table to S3 pipeline definition that will make this very simple. Ultimately, our streaming pipeline plus querying interface will look like the architecture in the figure that follows.

[Figure: The Kinesis-Lambda-DynamoDB streaming pipeline extended with the Data Pipeline export, S3, and Athena querying layer (images/dynamodb-kinesis-lambda-s3-athena.png)]

To begin, you’ll need to create a new S3 bucket to store JSON data from the SensorData table in DynamoDB. You can call this bucket whatever you’d like, so long as it’s globally unique (as in unique among all AWS users). Perhaps call it sensor-data with your local username appended, and use the aws tool to create the bucket, like this:

 $ export BUCKET_NAME=s3://sensor-data-$USER
 $ aws s3 mb ${BUCKET_NAME}

To set up the pipeline, go to the Data Pipeline page in the AWS console[54] and select Create new pipeline, as you can see in the figure that follows.

[Figure: The Data Pipeline console (images/dynamodb-data-pipeline-console.png)]

When the pipeline creation page pops up, do the following:

  • Give the pipeline any name you’d like.

  • Under Source, select Build using a template and then the Export DynamoDB table to S3 template.

  • Under Source DynamoDB table name, specify the SensorData table, and under Output S3 folder, input the name of the S3 bucket that you created.

  • Leave the DynamoDB read throughput ratio value as is and supply the AWS region (we’ve been using us-east-1 thus far in these examples but yours may differ).

  • Under Run in the Schedule section, select on pipeline activation. Disable logging in the next section and select Default under Security/Access.

Now click Activate. That will bring you back to the data pipeline console. The pipeline job will go through several phases, beginning with WAITING. Behind the scenes, Data Pipeline is actually spinning up, using, and finally shutting down an Elastic MapReduce cluster to perform the data transfer (think back to Day 3 of the HBase chapter when we spun up our own EMR cluster to run HBase). The job should finish within a few minutes. Once it’s done, every item in the SensorData table will have been exported to a single file in S3, with each sensor reading as a JSON object on a separate line in the file.
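If you want to peek at the export without leaving Ruby, a short sketch like the following pulls down the most recently written data file and parses it line by line. The bucket name assumes the sensor-data-$USER convention from earlier, and the rejected keys are Data Pipeline’s metadata files, which we’ll delete for good in a moment:

 require 'aws-sdk'
 require 'json'

 s3 = Aws::S3::Client.new(region: 'us-east-1')
 bucket = "sensor-data-#{ENV['USER']}"

 # Find the most recently written object, skipping Data Pipeline's
 # manifest and _SUCCESS metadata files
 export_key = s3.list_objects_v2(bucket: bucket).contents
   .reject { |obj| obj.key.end_with?('manifest', '_SUCCESS') }
   .max_by(&:last_modified)
   .key

 # Each line of the export is one DynamoDB item serialized as JSON
 body = s3.get_object(bucket: bucket, key: export_key).body.read
 readings = body.each_line.map { |line| JSON.parse(line) }

 puts "#{readings.size} exported readings"
 puts readings.first  # a nested hash in the shape shown in the next section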

Querying the Data Using Athena

Athena is an AWS service that enables you to query data files stored in S3 using plain SQL queries. We’ll use Athena to get some aggregate metrics out of the data stored in our SensorData table. Go to the Athena console[55] and click on Query Editor. This is the interface that you can use to create tables and run queries. Run this query to create a sensor_data table (substituting the appropriate name for the S3 bucket that you created).

 CREATE EXTERNAL TABLE sensor_data (
   sensorid struct<s:string>,
   currenttime struct<n:bigint>,
   temperature struct<n:float>
 )
 ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
 WITH SERDEPROPERTIES ('paths'='SensorId,CurrentTime,Temperature')
 LOCATION 's3://YOUR-S3-BUCKET/';

A few things to note here. First, we’re using data structures called structs as columns. That’s because the JSON object for each item in our DynamoDB table looks like this:

 {
   "SensorId": {
     "S": "..."
   },
   "CurrentTime": {
     "N": "..."
   },
   "Temperature": {
     "N": "..."
   }
 }

Structs will enable us to easily handle this nested JSON data. Second, the ROW FORMAT information simply specifies that we’re querying files storing JSON as well as which JSON fields we’re interested in (SensorId and so on). Finally, the LOCATION points to our S3 bucket (don’t forget the slash at the end).

The data from DynamoDB should all be in one file in a subfolder of the S3 bucket you created. The name of the subfolder is based on the current date and time. To see the name of that subfolder:

 $ aws s3 ls ${BUCKET_NAME}/

Now, list the contents of that subfolder:

 $ aws s3 ls ${BUCKET_NAME}/SUBFOLDER/

You’ll see that there are three files: a data file with a name like 5d2bf1ba-829e-402f-a9a4-2849b974be01, a manifest file, and a _SUCCESS file. Delete the latter two files, as they only contain metadata from Data Pipeline and will interfere with our Athena queries:

 $ aws s3 rm ${BUCKET_NAME}/SUBFOLDER/manifest
 $ aws s3 rm ${BUCKET_NAME}/SUBFOLDER/_SUCCESS

Now come back to the Athena console because you’re ready to run queries against the (now-sanitized) S3 folder. Let’s start with fetching 10 rows at random (see the figure that follows):

 SELECT * FROM sensor_data LIMIT 10;
[Figure: Query results in the Athena console (images/dynamodb-sql-results.png)]

Here are some other queries that you can run:

 /* List the sensors for which you have data */
 SELECT DISTINCT sensorid.s AS SensorId FROM sensor_data;

 /* Get the number of readings from a specific sensor */
 SELECT COUNT(*) AS NumberOfReadings FROM sensor_data
 WHERE sensorid.s = 'some-sensor-id';

 /* Find the average temperature measured by a specific sensor */
 SELECT AVG(temperature.n) AS AverageTemp FROM sensor_data
 WHERE sensorid.s = 'some-sensor-id';

 /* Find the average temperature across all sensors */
 SELECT AVG(temperature.n) AS AverageTempAllSensors FROM sensor_data;

 /* Find the maximum temperature across all sensors */
 SELECT MAX(temperature.n) AS MaxTemp FROM sensor_data;

 /* Find the standard deviation for temperature across all sensors */
 SELECT STDDEV(temperature.n) AS StdDev FROM sensor_data;
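You’re not limited to the console, either. Athena has an API, so you can run these same queries programmatically. Here’s a minimal sketch using the Ruby SDK’s Athena client; it assumes the sensor_data table lives in Athena’s default database and that you’ve created a separate (hypothetical) S3 bucket for Athena to write its result files into:

 require 'aws-sdk'

 athena = Aws::Athena::Client.new(region: 'us-east-1')

 # Kick off a query; Athena writes result files to the S3 location you supply
 execution = athena.start_query_execution(
   query_string:
     'SELECT AVG(temperature.n) AS AverageTempAllSensors FROM sensor_data',
   query_execution_context: { database: 'default' },
   result_configuration: { output_location: 's3://YOUR-ATHENA-RESULTS-BUCKET/' }
 )

 # Poll until the query reaches a terminal state
 query_id = execution.query_execution_id
 sleep 1 until %w[SUCCEEDED FAILED CANCELLED].include?(
   athena.get_query_execution(query_execution_id: query_id)
         .query_execution.status.state
 )

 # The first row holds the column headers; subsequent rows hold values
 results = athena.get_query_results(query_execution_id: query_id)
 results.result_set.rows.each do |row|
   puts row.data.map(&:var_char_value).join("\t")
 end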

Production Ready? Not Quite

The system that we’ve built here—the Kinesis/Lambda/DynamoDB pipeline plus Athena querying layer—has served us well in exploring these services, but you wouldn’t want to use it as a production system. Why? Because it’s built on the assumption that there’s only one table.

In reality, data in DynamoDB is often spread across many tables, while querying that data involves knowing in which tables the right data lives (one could call this tertiary indexing but the authors are unaware of this usage in the wild!). For our sensor data use case, that may involve, for example, storing data from different days in different tables and automatically writing new sensor data to the correct table.

To make a setup like that work, you could auto-infer the table name from the current date in the Lambda function, like this:

 var date = new Date().toJSON().slice(0, 10);  // yields "YYYY-MM-DD"
 var tableName = `sensor-data-${date}`;
 var item = {
   TableName: tableName,
   Item: {
     // item data
   }
 };

The name of each table in this setup would be of the form sensor-data-YYYY-MM-DD. You could create DynamoDB tables for specific dates in advance, along with S3 buckets for holding JSON data and Athena external table definitions for those different buckets. This would, in turn, enable you to run queries across DynamoDB tables and thus to gather metrics across days. A query like this, for example, would find the average temperature for all sensors across two days’ worth of readings:

 SELECT (
   (SELECT SUM(temperature.n) FROM sensor_data_05_01_2017) +
   (SELECT SUM(temperature.n) FROM sensor_data_05_02_2017)
 ) / (
   (SELECT COUNT(temperature.n) FROM sensor_data_05_01_2017) +
   (SELECT COUNT(temperature.n) FROM sensor_data_05_02_2017)
 );
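If you go down this road, you’ll also need something that creates each day’s table ahead of time. Here’s a minimal sketch of what that might look like with the Ruby SDK, assuming the same SensorId/CurrentTime key schema we’ve been querying against; the throughput values are placeholders, and you’d also want to re-create any secondary indexes your queries depend on:

 require 'aws-sdk'
 require 'date'

 dynamodb = Aws::DynamoDB::Client.new(region: 'us-east-1')

 # Pre-create tomorrow's table using the sensor-data-YYYY-MM-DD convention
 table_name = "sensor-data-#{(Date.today + 1).strftime('%Y-%m-%d')}"

 dynamodb.create_table(
   table_name: table_name,
   attribute_definitions: [
     { attribute_name: 'SensorId',    attribute_type: 'S' },
     { attribute_name: 'CurrentTime', attribute_type: 'N' },
   ],
   key_schema: [
     { attribute_name: 'SensorId',    key_type: 'HASH' },
     { attribute_name: 'CurrentTime', key_type: 'RANGE' },
   ],
   # Placeholder capacity values; tune these for your write volume
   provisioned_throughput: {
     read_capacity_units: 5,
     write_capacity_units: 5,
   }
 )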

What a ride! We just combined five AWS services—DynamoDB, Kinesis, Lambda, Data Pipeline, and Athena, plus some helper services like IAM—into a cohesive platform for sensor data.

Day 3 Homework

Find

  1. We added a global secondary index to our SensorData table, which increased our disk space usage. Read some documentation[56] to see how much space GSIs use.

  2. The Athena service enables you to use a broad palette of SQL commands for running queries against S3 buckets—but not all the commands you may be used to. Do some Googling and find out which commands are and aren’t supported.

Do

  1. Exercise 1 in the homework for Day 2 called for modifying the data pipeline to handle humidity readings from our mock sensors. Modify the sensor data input script from Day 3 so that each sensor reading contains a humidity reading. Make sure to use random-walk logic to keep those readings relatively stable. Then create a new external table in Athena that can run queries against that new data model.

  2. In the last section of Day 3, we stated that the data pipeline we built isn’t quite production ready because we have only one table and that a better system would store data from different days in different tables. Begin building that system to the best of your ability, given what you’ve learned so far.
