Querying DynamoDB using Python

Now that we have created a table and added data, we just need to write some code to query it. This will form part of the code that will be used in the Lambda function later on.

Create a Python script called dynamo_query_table.py with the following code:

import decimal
import json

from boto3 import resource
from boto3.dynamodb.conditions import Key


class DecimalEncoder(json.JSONEncoder):
    """Helper class to convert a DynamoDB item to JSON
    """
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)


class DynamoRepository:
    def __init__(self, target_dynamo_table, region='eu-west-1'):
        self.dynamodb = resource(service_name='dynamodb', region_name=region)
        self.dynamo_table = target_dynamo_table
        self.table = self.dynamodb.Table(self.dynamo_table)

    def query_dynamo_record_by_parition(self, parition_key, 
parition_value): try: response = self.table.query( KeyConditionExpression=
Key(parition_key).eq(parition_value)) for record in response.get('Items'): print(json.dumps(record, cls=DecimalEncoder)) return except Exception as e: print('Exception %s type' % str(type(e))) print('Exception message: %s ' % str(e)) def query_dynamo_record_by_parition_sort_key(self,
partition_key, partition_value, sort_key, sort_value): try: response = self.table.query( KeyConditionExpression=Key(partition_key)
.eq(partition_value) & Key(sort_key).gte(sort_value)) for record in response.get('Items'): print(json.dumps(record, cls=DecimalEncoder)) return except Exception as e: print('Exception %s type' % str(type(e))) print('Exception message: %s ' % str(e)) def main(): table_name = 'user-visits' partition_key = 'EventId' partition_value = '324' sort_key = 'EventDay' sort_value = 20171001 dynamo_repo = DynamoRepository(table_name) print('Reading all data for partition_key:%s' % partition_value) dynamo_repo.query_dynamo_record_by_parition(partition_key,
partition_value) print('Reading all data for partition_key:%s with date > %d'
% (partition_value, sort_value)) dynamo_repo.query_dynamo_record_by_parition_sort_key(partition_key,
partition_value, sort_key, sort_value) if __name__ == '__main__': main()

As I did earlier, I've created the DynamoRepository class, which abstracts all interactions with DynamoDB, including the connection and querying of tables. The following are the two methods used for querying the table using DynamoDB's self.table.query():

  • The query_dynamo_record_by_parition() method, which queries for records by partition_key, also known as a hash key, which in this case is EventId. Here, we are using just the equality condition in the query(), shown as KeyConditionExpression=Key(partition_key).eq(parition_value)).
  • The query_dynamo_record_by_parition_sort_key() method, which queries for records by partition_key and sort_key , also known as a range key, which in this case is the EventDate. Here, we are using just the equality condition and the greater than or equal condition in the query() as KeyConditionExpression=Key(partition_key).eq(partition_value) & Key(sort_key).gte(sort_value)). This gives you the additional flexibility of quickly filtering by specific date ranges, for example, to retrieve the last 10 days of event data to display in a dashboard.

We then parse the returned records from the queries and print them to the console. This JSON will be what the Lambda will return to API Gateway as a response in the next section.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset