Chapter 15. Data Engineering

Data science may be the sexiest job of the 21st century, but the field is evolving rapidly into different job titles. Data scientist has been too crude a description for a whole series of tasks. As of 2020, two jobs that can pay the same or more are data engineer and machine learning engineer.

Even more surprising is the vast number of data engineer roles needed to support a traditional data scientist. It is somewhere between three to five data engineers to one data scientist.

What is happening? Let’s look at it from another angle. Let’s pretend we are writing headlines for a newspaper and want to say something eye-catching. We could say, “CEO is the sexiest job for the rich.” There are few CEOs, just like there are few NBA stars, just like there are few professional actors who are making a living. For every CEO, how many people are working to make that CEO successful? This last statement is content-free and meaningless, like “water is wet.”

This statement isn’t to say that you can’t make a living as a data scientist; it is more a critique of the logistics behind the statement. There is a huge demand for skills in data, and they range from DevOps to machine learning to communication. The term data scientist is nebulous. Is it a job or a behavior? In a way, it is a lot like the word DevOps. Is DevOps a job, or is it a behavior?

In looking at job posting data and salary data, it appears the job market is saying there is an apparent demand for actual roles in data engineering and machine learning engineering. This is because those roles perform identifiable tasks. A data engineer task could be creating a pipeline in the cloud that collects both batch and streaming data and then creates APIs to access that data and schedule those jobs. This job is not a squishy task. It works, or it doesn’t.

Likewise, a machine learning engineer builds machine learning models and deploys them in a way that they are maintainable. This job is also not squishy. An engineer can do data engineering or machine learning engineering though, and still exhibit behaviors attributed to data science and DevOps. Today is an exciting time to be involved in data, as there are some considerable opportunities to build complex and robust data pipelines that feed into other complex and powerful prediction systems. There is an expression that says, “you can never be too rich or too thin.” Likewise, with data, you can never have too much DevOps or data science skills. Let’s dive into some DevOps-flavored ideas for data engineering.

Small Data

Toolkits are an exciting concept. If you call a plumber to your house, they arrive with tools that help them be more effective than you could be at a task. If you hire a carpenter to build something at your house, they also have a unique set of tools that help them perform a task in a fraction of the time you could. Tools are essential to professionals, and DevOps is no exception.

In this section, the tools of data engineering outline themselves. These tools include small data tasks like reading and writing files, using pickle, using JSON, and writing and reading YAML files. Being able to master these formats is critical to be the type of automator who can tackle any task and turn it into a script. Tools for Big Data tasks are also covered later in the chapter. It discusses distinctly different tools than the tools used for small data.

What is Big Data and what is small data then? One easy way to figure the distinction out is the laptop test. Does it work on your laptop? If it doesn’t, then it is Big Data. A good example is Pandas. Pandas require between 5 to 10 times the amount of RAM as the dataset. If you have a 2-GB file and you are using Pandas, most likely your laptop won’t work.

Dealing with Small Data Files

If there was a single defining trait of Python, it would be a relentless pursuit of efficiency in the language. A typical Python programmer wants to write just enough code to get a task done but wants to stop at the point where the code becomes unreadable or terse. Also, a typical Python programmer will not want to write boilerplate code. This environment has led to a continuous evolution of useful patterns.

One example of an active pattern is using the with statement to read and write files. The with statement handles the boring boilerplate parts of closing a file handle after the work has completed. The with statement is also used in other parts of the Python language to make tedious tasks less annoying.

Write a File

This example shows that writing a file using the with statement automatically closes the file handle upon execution of a code block. This syntax prevents bugs that can occur quickly when the handle is accidentally not closed:

with open("containers.txt", "w") as file_to_write:
  file_to_write.write("Pod/n")
  file_to_write.write("Service/n")
  file_to_write.write("Volume/n")
  file_to_write.write("Namespace/n")

The output of the file reads like this:

cat containers.txt

Pod
Service
Volume
Namespace

Read a File

The with context also is the recommended way to read a file. Notice that using readlines() uses line breaks to return a lazily evaluated iterator:

with open("containers.txt") as file_to_read:
  lines = file_to_read.readlines()
  print(lines)

The output:

['Pod
', 'Service
', 'Volume
', 'Namespace
']

In practice, this means that you can handle large log files by using generator expressions and not worry about consuming all of the memory on your machine.

Generator Pipeline to Read and Process Lines

This code is a generator function that opens a file and returns a generator:

def process_file_lazily():
  """Uses generator to lazily process file"""

  with open("containers.txt") as file_to_read:
    for line in file_to_read.readlines():
      yield line

Next, this generator is used to create a pipeline to perform operations line by line. In this example, the line converts to a lowercase string. Many other actions could be chained together here, and it would be very efficient because it is only using the memory necessary to process a line at a time:

# Create generator object
pipeline = process_file_lazily()
# convert to lowercase
lowercase = (line.lower() for line in pipeline)
# print first processed line
print(next(lowercase))

This is the output of the pipeline:

pod

In practice, this means that files that are effectively infinite because they are so large could still be processed if the code works in a way that exits when it finds a condition. For example, perhaps you need to find a customer ID across terabytes of log data. A generator pipeline could look for this customer ID and then exit the processing at the first occurrence. In the world of Big Data, this is no longer a theoretical problem.

Using YAML

YAML is becoming an emerging standard for DevOps-related configuration files. It is a human-readable data serialization format that is a superset of JSON. It stands for “YAML Ain’t Markup Language.” You often see YAML in build systems such as AWS CodePipeline, CircleCI, or PaaS offerings such as Google App Engine.

There is a reason YAML is so often used. There is a need for a configuration language that allows rapid iteration when interacting with highly automated systems. Both a nonprogrammer and a programmer can intuitively figure out how to edit these files. Here is an example:

import yaml

kubernetes_components = {
    "Pod": "Basic building block of Kubernetes.",
    "Service": "An abstraction for dealing with Pods.",
    "Volume": "A directory accessible to containers in a Pod.",
    "Namespaces": "A way to divide cluster resources between users."
}

with open("kubernetes_info.yaml", "w") as yaml_to_write:
  yaml.safe_dump(kubernetes_components, yaml_to_write, default_flow_style=False)

The output written to disk looks like this:

cat kubernetes_info.yaml

Namespaces: A way to divide cluster resources between users.
Pod: Basic building block of Kubernetes.
Service: An abstraction for dealing with Pods.
Volume: A directory accessible to containers in a Pod.

The takeway is that it makes it trivial to serialize a Python data structure into a format that is easy to edit and iterate on. Reading this file back is just two lines of code.

import yaml

with open("kubernetes_info.yaml", "rb") as yaml_to_read:
  result = yaml.safe_load(yaml_to_read)

The output then can be pretty printed:

import pprint
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(result)
{   'Namespaces': 'A way to divide cluster resources between users.',
    'Pod': 'Basic building block of Kubernetes.',
    'Service': 'An abstraction for dealing with Pods.',
    'Volume': 'A directory accessible to containers in a Pod.'}

Big Data

Data has been growing at a rate faster than the growth of computer processing power. To make things even more interesting, Moore’s Law, which states that speed and capability of computers can be expected to double every two years, effectively ceased to apply around 2015, according to Dr. David Patterson at UC Berkeley. CPU speed is only increasing around 3% per a year now.

New methods of dealing with Big Data are necessary. Some of the new methods, include using ASICs like GPUs, tensor processing units (TPUs), and as well as AI and data platforms provided by cloud vendors. On the chip level, this means that a GPU could be the ideal target for a complex IT process instead of a CPU. Often this GPU is paired together with a system that can deliver a distributed storage mechanism that allows both distributed computing and distributed disk I/O. An excellent example of this is Apache Spark, Amazon SageMaker, or the Google AI Platform. All of them can utilize ASICs (GPU, TPU, and more), plus distributed storage along with a management system. Another example that is more low level is Amazon Spot Instance deep learning AMIs with Amazon Elastic File System (EFS) mount points.

For a DevOps professional, this means a few things. First, it means that special care when delivering software to these systems makes sense. For example, does the target platform have the correct GPU drivers? Are you deploying via containers? Is this system going to use distributed GPU processing? Is the data mostly batch, or is it streaming? Thinking about these questions up front can go a long way to ensuring the correct architecture.

One problem with buzzwords like AI, Big Data, cloud, or data scientist is that they mean different things to different people. Take data scientist for example. In one company it could mean someone who generates business intelligence dashboards for the sales team, and in another company, it could mean someone who is developing self-driving car software. Big Data has a similar context issue; it can mean many different things depending on whom you meet. Here is one definition to consider. Do you need different software packages to handle data on your laptop than in production?

An excellent example of a “small data” tool is the Pandas package. According to the author of the Pandas package, it can take between 5 and 10 times the amount of RAM as the size of the file used. In practice, if your laptop has 16 GB of RAM and you open a 2-GB CSV file, it is now a Big Data problem because your laptop may not have enough RAM, 20 GB, to work with the file. Instead, you may need to rethink how to handle the problem. Perhaps you can open a sample of the data, or truncate the data to get around the problem initially.

Here is an example of this exact problem and a workaround. Let’s say you are supporting data scientists that keep running into Pandas out-of-memory errors because they are using files too large for Pandas. One such example is the Open Food Facts dataset from Kaggle. When uncompressed, the dataset is over 1 GB. This problem fits precisely into the sweet spot of where Pandas could struggle to process it. One thing you can do is use the Unix shuf command to create a shuffled sample:

time shuf -n 100000 en.openfoodfacts.org.products.tsv
    > 10k.sample.en.openfoodfacts.org.products.tsv
    1.89s user 0.80s system 97% cpu 2.748 total

In a little under two seconds, the file can be cut down to a manageable size. This approach is preferable to simply using heads or tails, because the samples are randomly selected. This problem is significant for a data science workflow. Also, you can inspect the lines of the file to see what you are dealing with first:

wc -l en.openfoodfacts.org.products.tsv
  356002 en.openfoodfacts.org.products.tsv

The source file is about 350,000 lines, so grabbing 100,000 shuffled lines is approximately a third of the data. This task can be confirmed by looking at the transformed file. It shows 272 MB, around one-third of the size of the original 1-GB file:

du -sh 10k.sample.en.openfoodfacts.org.products.tsv
272M    10k.sample.en.openfoodfacts.org.products.tsv

This size is more much manageable by Pandas, and this process could then be turned into an automation workflow that creates randomized sample files for Big Data sources. This type of process is just one of many particular workflows that Big Data demands.

Another definition of Big Data is by McKinsey, who defined Big Data in 2011 as “datasets whose size is beyond the ability of typical database software tools to capture, store, manage, and analyze.” This definition is reasonable as well, with the slight modification that it isn’t just database software tools; it is any tool that touches data. When a tool that works well on a laptop, such as Pandas, Python, MySQL, deep learning/machine learning, Bash, and more, fails to perform conventionally due to the size or velocity (rate of change) of the data, it is now a Big Data problem. Big Data problems require specialized tools, and the next section dives into this requirement.

Big Data Tools, Components, and Platforms

Another way to discuss Big Data is to break it down into tools and platforms. Figure 15-1 shows a typical Big Data architecture life cycle.

pydo 1501
Figure 15-1. Big Data architecture

Let’s discuss a few key components.

Data Sources

Some of the familiar sources of Big Data include social networks and digital transactions. As people have migrated more of their conversations and their business transactions online, it has led to an explosion of data. Additionally, mobile technology such as tablets, phones, and laptops that record audio and video, exponentially create sources of data.

Other data sources include the Internet of Things (IoT), which includes sensors, lightweight chips, and devices. All of this leads to an unstoppable proliferation of data that needs to be stored somewhere. The tools involved in Data Sources could range from IoT client/server systems such as AWS IoT Greengrass, to object storage systems such as Amazon S3 or Google Cloud Storage.

Filesystems

Filesystems have played a huge role in computing. Their implementation, though, is continuously evolving. In dealing with Big Data, one issue is having enough disk I/O, to handle distributed operations.

One modern tool that deals with this is the Hadoop Distributed File System (HDFS). It works by clustering many servers together, allowing aggregated CPU, disk I/O, and storage. In practice, this makes HDFS a fundamental technology for dealing with Big Data. It can migrate large volumes of data or filesystems for distributed computing jobs. It is also the backbone of Spark, which can do both stream- and batch-based machine learning.

Other types of filesystems include object storage filesystems such as Amazon S3 filesystem and Google Cloud Platform storage. They allow for huge files to be stored in a distributed and highly available manner, or more precisely, 99.999999999% reliability. There are Python APIs and command-line tools available that communicate with these filesystems, enabling easy automation. These cloud APIs are covered in more detail in Chapter 10.

Finally, another type of filesystem to be aware of is a traditional network filesystem, or NFS, made available as a managed cloud service. An excellent example of this is Amazon Elastic File System (Amazon EFS). For a DevOps professional, a highly available and elastic NFS filesystem can be an incredibly versatile tool, especially coupled with containers technology. Figure 15-2 shows an example of mounting EFS in a container.

pydo 1502
Figure 15-2. Mounting EFS in a container

One powerful automation workflow is to programmatically create Docker containers through a build system such as AWS CodePipeline or Google Cloud Build. Those containers then get registered in the cloud container registry, for example, Amazon ECR. Next, a container management system such as Kubernetes spawns containers that mount the NFS. This allows both the power of immutable container images that spawn quickly, and access to centralized source code libraries and data. This type of workflow could be ideal for an organization looking to optimize machine learning operations.

Data Storage

Ultimately the data needs to live somewhere, and this creates some exciting opportunities and challenges. One emerging trend is to utilize the concept of a data lake. Why do you care about a data lake? A data lake allows for data processing in the same location as storage. As a result, many data lakes need to have infinite storage and provide infinite computing (i.e., be on the cloud). Amazon S3 is often a common choice for a data lake.

The data lake constructed in this manner can also be utilized by a machine learning pipeline that may depend on the training data living in the lake, as well as the trained models. The trained models could then always be A/B tested to ensure the latest model is improving the production prediction (inference) system, as shown in Figure 15-3.

pydo 1503
Figure 15-3. Data lake

Other forms of storage will be very familiar to traditional software developers. These storage systems include relational databases, key/value databases, search engines like Elasticsearch, and graph databases. In a Big Data architecture, each type of storage system may play a more specific role. In a small-scale system, a relational database may be a jack of all trades, but in a Big Data architecture, there is less room for tolerance of a mismatch of the storage system.

An excellent example of mismatch in storage choice is using a relational database as a search engine by enabling full-text search capabilities instead of using a specialized solution, like Elasticsearch. Elasticsearch is designed to create a scalable search solution, while a relational database is designed to provide referential integrity and transactions. The CTO of Amazon, Werner Vogel, makes this point very well by stating that a “one size database doesn’t fit anyone.” This problem is illustrated in Figure 15-4, which shows that each type of database has a specific purpose.

pydo 1504
Figure 15-4. Amazon databases

Picking the correct storage solutions, including which combination of databases to use, is a crucial skill for any type of data architect to ensure that a system works at optimal efficiency. In thinking through the design of a system that is fully automated and efficient, maintenance should be a consideration. If a particular technology choice is being abused, such as using a relational database for a highly available messaging queue, then maintenance costs could explode, which in turn creates more automation work. So another component to consider is how much automation work it takes to maintain a solution.

Real-Time Streaming Ingestion

Real-time streaming data is a particularly tricky type of data to deal with. The stream itself increases the complexity of dealing with the data, and it is possible the stream needs to route to yet another part of the system that intends to process the data in a streaming fashion. One example of a cloud-based streaming ingestion solution is Amazon Kinesis Data Firehose. See Figure 15-5.

pydo 1505
Figure 15-5. Kinesis log files

Here is an example of code that would do that. Notice that Python’s asyncio module allows for highly concurrent network operations that are single threaded. Nodes could emit this in a job farm, and it could be metrics or error logs:

import asyncio

def send_async_firehose_events(count=100):
    """Async sends events to firehose"""

    start = time.time()
    client = firehose_client()
    extra_msg = {"aws_service": "firehose"}
    loop = asyncio.get_event_loop()
    tasks = []
    LOG.info(f"sending aysnc events TOTAL {count}",extra=extra_msg)
    num = 0
    for _ in range(count):
        tasks.append(asyncio.ensure_future(put_record(gen_uuid_events(),
                                                      client)))
        LOG.info(f"sending aysnc events: COUNT {num}/{count}")
        num +=1
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end = time.time()
    LOG.info("Total time: {}".format(end - start))

Kinesis Data Firehose works by accepting capture data and routing it continuously to any number of destinations: Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, or some third-party service like Splunk. An open source alternative to using a managed service like Kinesis is to use Apache Kafka. Apache Kafka has similar principles in that it works as a pub/sub architecture.

Case Study: Building a Homegrown Data Pipeline

In the early days of Noah’s work as CTO and General Manager at a startup in the early 2000s, one problem that cropped up was how to build the company’s first machine learning pipeline and data pipeline. A rough sketch of what that looked like is in the following diagram of a Jenkins data pipeline (Figure 15-6).

pydo 1506
Figure 15-6. Jenkins data pipeline

The inputs to the data pipeline are any data source needed for business analytics or machine learning predictions. These sources included a relational database, Google Analytics, and social media metrics, to a name a few. The collection jobs ran every hour and generated CSV files that were available internally by Apache web service. This solution was a compelling and straightforward process.

The jobs themselves were Jenkins jobs that were Python scripts that ran. If something needed to be changed, it was fairly straightforward to change a Python script for a particular job. An added benefit to this system was that it was straightforward to debug. If a job had a failure, the jobs showed up as failed, and it was straightforward to look at the output of the job and see what happened.

The final stage of the pipeline then created machine learning predictions and an analytics dashboard that served out dashboards via an R-based Shiny application. The simplicity of the approach is the most influential factor in this type of architecture, and as a bonus it leverages existing DevOps skills.

Serverless Data Engineering

Another emerging pattern is serverless data engineering. Figure 15-7 is a high-level architectural diagram of what a serverless data pipeline is.

pydo 1507
Figure 15-7. Serverless data pipeline

Next, let’s look at what a timed lambda does.

Using AWS Lambda with CloudWatch Events

You can create a CloudWatch timer to call the lambda using the AWS Lambda console and to set up a trigger, as shown in Figure 15-8.

pydo 1508
Figure 15-8. CloudWatch Lambda timer

Using Amazon CloudWatch Logging with AWS Lambda

Using CloudWatch logging is an essential step for Lambda development. Figure 15-9 is an example of a CloudWatch event log.

pydo 1509
Figure 15-9. CloudWatch event log

Using AWS Lambda to Populate Amazon Simple Queue Service

Next, you want to do the following locally in AWS Cloud9:

  1. Create a new Lambda with Serverless Wizard.

  2. cd into lambda and install packages one level up.

pip3 install boto3 --target ../
pip3 install python-json-logger --target ../

Next, you can test local and deploy this code:

'''
Dynamo to SQS
'''

import boto3
import json
import sys
import os

DYNAMODB = boto3.resource('dynamodb')
TABLE = "fang"
QUEUE = "producer"
SQS = boto3.client("sqs")

#SETUP LOGGING
import logging
from pythonjsonlogger import jsonlogger

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)

def scan_table(table):
    '''Scans table and return results'''

    LOG.info(f"Scanning Table {table}")
    producer_table = DYNAMODB.Table(table)
    response = producer_table.scan()
    items = response['Items']
    LOG.info(f"Found {len(items)} Items")
    return items

def send_sqs_msg(msg, queue_name, delay=0):
    '''Send SQS Message

    Expects an SQS queue_name and msg in a dictionary format.
    Returns a response dictionary.
    '''

    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    queue_send_log_msg = "Send message to queue url: %s, with body: %s" %
        (queue_url, msg)
    LOG.info(queue_send_log_msg)
    json_msg = json.dumps(msg)
    response = SQS.send_message(
        QueueUrl=queue_url,
        MessageBody=json_msg,
        DelaySeconds=delay)
    queue_send_log_msg_resp = "Message Response: %s for queue url: %s" %
        (response, queue_url)
    LOG.info(queue_send_log_msg_resp)
    return response

def send_emissions(table, queue_name):
    '''Send Emissions'''

    items = scan_table(table=table)
    for item in items:
        LOG.info(f"Sending item {item} to queue: {queue_name}")
        response = send_sqs_msg(item, queue_name=queue_name)
        LOG.debug(response)

def lambda_handler(event, context):
    '''
    Lambda entrypoint
    '''

    extra_logging = {"table": TABLE, "queue": QUEUE}
    LOG.info(f"event {event}, context {context}", extra=extra_logging)
    send_emissions(table=TABLE, queue_name=QUEUE)

```

This code does the following:

  1. Grabs company names from Amazon DynamoDB.

  2. Puts the names into Amazon SQS.

To test it, you can do a local test in Cloud9 (Figure 15-10).

pydo 1510
Figure 15-10. Local test in Cloud9

Next you can verify messages in SQS, as shown in Figure 15-11.

pydo 1511
Figure 15-11. SQS verification

Don’t forget to set the correct IAM role! You need to assign the lambda an IAM role that can write messages to SQS, as shown in Figure 15-12.

pydo 1512
Figure 15-12. Permission error

Wiring Up CloudWatch Event Trigger

The final step to enable CloudWatch trigger does the following: enable timed execution of producer, and verify that messages flow into SQS, as shown in Figure 15-13.

pydo 1513
Figure 15-13. Configure timer

You can now see messages in the SQS queue (Figure 15-14).

pydo 1514
Figure 15-14. SQS queue

Creating Event-Driven Lambdas

With the producer lambda out of the way, next up is to create an event-driven lambda that fires asynchronously upon every message in SQS (the consumer). The Lambda function can now fire in response to every SQS message (Figure 15-15).

pydo 1515
Figure 15-15. Fire on SQS event

Reading Amazon SQS Events from AWS Lambda

The only task left is to write the code to consume the messages from SQS, process them using our API, and then write the results to S3:

import json

import boto3
import botocore
#import pandas as pd
import pandas as pd
import wikipedia
import boto3
from io import StringIO

#SETUP LOGGING
import logging
from pythonjsonlogger import jsonlogger

LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)

#S3 BUCKET
REGION = "us-east-1"

### SQS Utils###
def sqs_queue_resource(queue_name):
    """Returns an SQS queue resource connection

    Usage example:
    In [2]: queue = sqs_queue_resource("dev-job-24910")
    In [4]: queue.attributes
    Out[4]:
    {'ApproximateNumberOfMessages': '0',
     'ApproximateNumberOfMessagesDelayed': '0',
     'ApproximateNumberOfMessagesNotVisible': '0',
     'CreatedTimestamp': '1476240132',
     'DelaySeconds': '0',
     'LastModifiedTimestamp': '1476240132',
     'MaximumMessageSize': '262144',
     'MessageRetentionPeriod': '345600',
     'QueueArn': 'arn:aws:sqs:us-west-2:414930948375:dev-job-24910',
     'ReceiveMessageWaitTimeSeconds': '0',
     'VisibilityTimeout': '120'}

    """

    sqs_resource = boto3.resource('sqs', region_name=REGION)
    log_sqs_resource_msg =
      "Creating SQS resource conn with qname: [%s] in region: [%s]" %
    (queue_name, REGION)
    LOG.info(log_sqs_resource_msg)
    queue = sqs_resource.get_queue_by_name(QueueName=queue_name)
    return queue

def sqs_connection():
    """Creates an SQS Connection which defaults to global var REGION"""

    sqs_client = boto3.client("sqs", region_name=REGION)
    log_sqs_client_msg = "Creating SQS connection in Region: [%s]" % REGION
    LOG.info(log_sqs_client_msg)
    return sqs_client

def sqs_approximate_count(queue_name):
    """Return an approximate count of messages left in queue"""

    queue = sqs_queue_resource(queue_name)
    attr = queue.attributes
    num_message = int(attr['ApproximateNumberOfMessages'])
    num_message_not_visible = int(attr['ApproximateNumberOfMessagesNotVisible'])
    queue_value = sum([num_message, num_message_not_visible])
    sum_msg = """'ApproximateNumberOfMessages' and
    'ApproximateNumberOfMessagesNotVisible' =
      *** [%s] *** for QUEUE NAME: [%s]""" %
        (queue_value, queue_name)
    LOG.info(sum_msg)
    return queue_value

def delete_sqs_msg(queue_name, receipt_handle):

    sqs_client = sqs_connection()
    try:
        queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
        delete_log_msg = "Deleting msg with ReceiptHandle %s" % receipt_handle
        LOG.info(delete_log_msg)
        response = sqs_client.delete_message(QueueUrl=queue_url,
          ReceiptHandle=receipt_handle)
    except botocore.exceptions.ClientError as error:
        exception_msg =
          "FAILURE TO DELETE SQS MSG: Queue Name [%s] with error: [%s]" %
            (queue_name, error)
        LOG.exception(exception_msg)
        return None

    delete_log_msg_resp = "Response from delete from queue: %s" % response
    LOG.info(delete_log_msg_resp)
    return response

def names_to_wikipedia(names):

    wikipedia_snippit = []
    for name in names:
        wikipedia_snippit.append(wikipedia.summary(name, sentences=1))
    df = pd.DataFrame(
        {
            'names':names,
            'wikipedia_snippit': wikipedia_snippit
        }
    )
    return df

def create_sentiment(row):
    """Uses AWS Comprehend to Create Sentiments on a DataFrame"""

    LOG.info(f"Processing {row}")
    comprehend = boto3.client(service_name='comprehend')
    payload = comprehend.detect_sentiment(Text=row, LanguageCode='en')
    LOG.debug(f"Found Sentiment: {payload}")
    sentiment = payload['Sentiment']
    return sentiment

def apply_sentiment(df, column="wikipedia_snippit"):
    """Uses Pandas Apply to Create Sentiment Analysis"""

    df['Sentiment'] = df[column].apply(create_sentiment)
    return df

### S3 ###

def write_s3(df, bucket):
    """Write S3 Bucket"""

    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3')
    res = s3_resource.Object(bucket, 'fang_sentiment.csv').
        put(Body=csv_buffer.getvalue())
    LOG.info(f"result of write to bucket: {bucket} with:
 {res}")

def lambda_handler(event, context):
    """Entry Point for Lambda"""

    LOG.info(f"SURVEYJOB LAMBDA, event {event}, context {context}")
    receipt_handle  = event['Records'][0]['receiptHandle'] #sqs message
    #'eventSourceARN': 'arn:aws:sqs:us-east-1:561744971673:producer'
    event_source_arn = event['Records'][0]['eventSourceARN']

    names = [] #Captured from Queue

    # Process Queue
    for record in event['Records']:
        body = json.loads(record['body'])
        company_name = body['name']

        #Capture for processing
        names.append(company_name)

        extra_logging = {"body": body, "company_name":company_name}
        LOG.info(f"SQS CONSUMER LAMBDA, splitting arn: {event_source_arn}",
          extra=extra_logging)
        qname = event_source_arn.split(":")[-1]
        extra_logging["queue"] = qname
        LOG.info(f"Attempting Delete SQS {receipt_handle} {qname}",
          extra=extra_logging)
        res = delete_sqs_msg(queue_name=qname, receipt_handle=receipt_handle)
        LOG.info(f"Deleted SQS receipt_handle {receipt_handle} with res {res}",
          extra=extra_logging)

    # Make Pandas dataframe with wikipedia snippts
    LOG.info(f"Creating dataframe with values: {names}")
    df = names_to_wikipedia(names)

    # Perform Sentiment Analysis
    df = apply_sentiment(df)
    LOG.info(f"Sentiment from FANG companies: {df.to_dict()}")

    # Write result to S3
    write_s3(df=df, bucket="fangsentiment")

You can see that one easy way to download the files is to use the AWS CLI:

noah:/tmp $ aws s3 cp --recursive s3://fangsentiment/ .
download: s3://fangsentiment/netflix_sentiment.csv to ./netflix_sentiment.csv
download: s3://fangsentiment/google_sentiment.csv to ./google_sentiment.csv
download: s3://fangsentiment/facebook_sentiment.csv to ./facebook_sentiment.csv

OK, so what did we accomplish? Figure 15-16 shows our serverless AI data engineering pipeline.

pydo 1516
Figure 15-16. Serverless AI data engineering pipeline

Conclusion

Data engineering is an evolving job title, and it benefits greatly from strong DevOps skills. The DevOps best practices of Microservices, Continous Delivery, Infrastructure as Code, and Monitoring and Logging play a tremendous role in this category. Often by leveraging cloud-native technologies, it makes hard problems possible, and simple problems effortless.

Here are some next steps to continue on a journey of mastery with data engineering. Learn serverless technology. It doesn’t matter what the cloud is, learn it! This environment is the future, and data engineering, in particular, is well suited to capitalize on this trend.

Exercises

  • Explain what Big Data is and what its key characteristics are.

  • Use small data tools in Python to solve common problems.

  • Explain what a data lake is and how it is used.

  • Explain the appropriate use cases for different types of purpose-built databases.

  • Build a serverless data engineering pipeline in Python.

Case Study Question

  • Using the same architecture shown in this chapter, build an end-to-end serverless data engineering pipeline that scrapes a website using Scrapy, Beautiful Soup, or a similar library, and sends image files to Amazon Rekognition to be analyzed. Store the results of the Rekognition API call in Amazon DynamoDB. Run this job on a timer once a day.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset