Creating and launching a Kinesis Firehose stream

Within a few minutes, our ElasticSearch cluster should be up and running. We will now focus on the Kinesis Firehose component of our stack, which will let us feed data into ElasticSearch.

We will create a new script and call it firehose-cf-template.py.

The script starts as usual with several imports, the creation of a template variable, and a brief description:

"""Generating CloudFormation template.""" 
 
from troposphere import ( 
    GetAtt, 
    Join, 
    Ref, 
    Template, 
    ImportValue 
) 
 
from troposphere.firehose import ( 
    BufferingHints, 
    CloudWatchLoggingOptions, 
    DeliveryStream, 
    S3Configuration, 
    ElasticsearchDestinationConfiguration, 
    RetryOptions, 
) 
 
from troposphere.iam import Role 
 
from troposphere.s3 import Bucket 
 
t = Template() 
 
t.add_description('Effective DevOps in AWS: Kinesis Firehose Stream') 

The first resource we will create is an S3 bucket:

t.add_resource(Bucket( 
    "S3Bucket", 
    DeletionPolicy="Retain" 
)) 

Following this, we will create a new role to give our Firehose stream permission to communicate with ElasticSearch and S3. To save a bit of time, we are going to use some of the managed policies that AWS provides. In a production environment, these policies might be too open for your liking, and you may instead opt to write your own (a scoped-down sketch follows the resource below):

t.add_resource(Role( 
    'FirehoseRole', 
    ManagedPolicyArns=[
        'arn:aws:iam::aws:policy/AmazonS3FullAccess', 
        'arn:aws:iam::aws:policy/AmazonESFullAccess', 
    ], 
    AssumeRolePolicyDocument={ 
        'Version': '2012-10-17', 
        'Statement': [{ 
            'Action': 'sts:AssumeRole', 
            'Principal': {'Service': 'firehose.amazonaws.com'}, 
            'Effect': 'Allow', 
        }] 
    } 
)) 
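
If you do want tighter permissions, the following is only a sketch of what a scoped-down FirehoseRole could look like, replacing the managed policies with an inline policy restricted to the backup bucket and the logs domain. It reuses the Join, Ref, and ImportValue imports already at the top of the script, and the exact list of allowed actions is an assumption you should review against your own requirements:

# Sketch only: a drop-in replacement for the FirehoseRole resource above,
# using an inline policy instead of the AWS managed policies.
from troposphere.iam import Policy

t.add_resource(Role(
    'FirehoseRole',
    Policies=[Policy(
        PolicyName='FirehoseDeliveryPolicy',
        PolicyDocument={
            'Version': '2012-10-17',
            'Statement': [
                {
                    # Write access limited to the backup bucket.
                    'Effect': 'Allow',
                    'Action': [
                        's3:AbortMultipartUpload',
                        's3:GetBucketLocation',
                        's3:GetObject',
                        's3:ListBucket',
                        's3:ListBucketMultipartUploads',
                        's3:PutObject',
                    ],
                    'Resource': [
                        Join("", ["arn:aws:s3:::", Ref("S3Bucket")]),
                        Join("", ["arn:aws:s3:::", Ref("S3Bucket"), "/*"]),
                    ],
                },
                {
                    # Indexing access limited to the logs domain.
                    'Effect': 'Allow',
                    'Action': [
                        'es:DescribeElasticsearchDomain',
                        'es:DescribeElasticsearchDomains',
                        'es:DescribeElasticsearchDomainConfig',
                        'es:ESHttpPost',
                        'es:ESHttpPut',
                    ],
                    'Resource': [
                        ImportValue("LogsDomainArn"),
                        Join("", [ImportValue("LogsDomainArn"), "/*"]),
                    ],
                },
            ],
        },
    )],
    AssumeRolePolicyDocument={
        'Version': '2012-10-17',
        'Statement': [{
            'Action': 'sts:AssumeRole',
            'Principal': {'Service': 'firehose.amazonaws.com'},
            'Effect': 'Allow',
        }]
    }
))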

Finally, we will create our Firehose stream. We will create a new resource of the type DeliveryStream and give it the name FirehoseLogs:

t.add_resource(DeliveryStream( 
    'FirehoseLogs', 
    DeliveryStreamName='FirehoseLogs', 

Delivery streams can deliver data to several different destinations. In our case, we want ours to deliver data to ElasticSearch. For that, we will set the ElasticsearchDestinationConfiguration property as follows:

    ElasticsearchDestinationConfiguration=ElasticsearchDestinationConfiguration( 

The first piece of information we need to provide is the identifier of the ElasticSearch domain. We will do that by referencing the LogsDomainArn variable that we exported in the previous section:

        DomainARN=ImportValue("LogsDomainArn"),

Next, we will reference the IAM role we defined just before creating our DeliveryStream:

        RoleARN=GetAtt("FirehoseRole", "Arn"),

We will then specify the index name. You can picture that as the name of the database you want your logs to be in. We call our index logs to keep things simple:

        IndexName="logs",

In addition, in ElasticSearch, indices contain documents of different types (each type has its own name and mapping). We will name ours Logs:

        TypeName="Logs",

One of the common ways to shard data across an ElasticSearch cluster is to use temporal sharding. In our case, we will pick a daily rotation. For instance, an index containing the logs of March 24, 2020 will be called logs-2020.03.24. To do that, we will configure the IndexRotationPeriod to rotate logs every day:

        IndexRotationPeriod="OneDay", 

Occasionally, ElasticSearch might get congested and not reply right away. We will configure our stream to retry delivering data for five minutes:

        RetryOptions=RetryOptions( 
            DurationInSeconds="300" 
        ), 

Kinesis Firehose works by buffering data until either a certain duration or a certain size is reached. We will set both to their minimums, which are 60 seconds and 1 MB:

        BufferingHints=BufferingHints( 
            IntervalInSeconds=60, 
            SizeInMBs=1 
        ), 

At this point, Kinesis Firehose has all the information it needs to send data to ElasticSearch. We will now configure it to also store all those logs on S3. We will first configure the stream to back up all documents (the alternative, FailedDocumentsOnly, backs up only the documents that fail to be inserted into ElasticSearch):

        S3BackupMode="AllDocuments", 

Following this, we will configure the S3 destination. This involves configuring the buffering as we did for ElasticSearch, referencing the bucket we created at the beginning of the template, stating whether we want to compress those logs, specifying the prefix for our files, and, finally, providing the role to use for these operations:

        S3Configuration=S3Configuration( 
            BufferingHints=BufferingHints( 
                IntervalInSeconds=300, 
                SizeInMBs=5 
            ), 
            BucketARN=Join("", [ 
                "arn:aws:s3:::", Ref("S3Bucket") 
            ]), 
            CompressionFormat='UNCOMPRESSED', 
            Prefix='firehose-logs', 
            RoleARN=GetAtt("FirehoseRole", "Arn"),
        ), 
    ) 
)) 

We will conclude our script by printing the JSON output of our template:

print(t.to_json()) 

Your script should look as follows: http://bit.ly/2v2tFAo.

We can now commit the script, create the template, and launch it:

$ git add firehose-cf-template.py
$ git commit -m "Adding Firehose template"
$ git push
$ python firehose-cf-template.py > firehose-cf.template
$ aws cloudformation create-stack \
    --stack-name firehose \
    --template-body file://firehose-cf.template \
    --capabilities CAPABILITY_IAM
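
The create-stack call returns immediately, while the stack itself takes a few minutes to build. As a minimal sketch, assuming boto3 is installed and your AWS credentials are configured, you could wait for the stack and then confirm that the delivery stream is ACTIVE from Python:

"""Sketch: verify the firehose stack and its delivery stream."""
import boto3

cloudformation = boto3.client('cloudformation')
firehose = boto3.client('firehose')

# Block until CloudFormation reports the stack as CREATE_COMPLETE.
cloudformation.get_waiter('stack_create_complete').wait(StackName='firehose')

# The delivery stream should then report an ACTIVE status.
description = firehose.describe_delivery_stream(
    DeliveryStreamName='FirehoseLogs')
print(description['DeliveryStreamDescription']['DeliveryStreamStatus'])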

At this point, we have a working Firehose-to-ElasticSearch pipeline (also known as the EKK stack). We now need to circle back to our application and make some changes to deliver our logs to it.
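
To give an idea of what that integration looks like, here is a minimal, hypothetical sketch of an application pushing a single log entry into the stream with boto3; the stream name matches this template, while the log record itself is made up:

"""Sketch: send one log entry to the FirehoseLogs delivery stream."""
import json

import boto3

firehose = boto3.client('firehose')

# Firehose expects raw bytes; one newline-terminated JSON document per
# record keeps the resulting ElasticSearch documents well formed.
log_entry = {'level': 'info', 'message': 'application started'}
firehose.put_record(
    DeliveryStreamName='FirehoseLogs',
    Record={'Data': (json.dumps(log_entry) + '\n').encode('utf-8')},
)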
