It’s well known that DynamoDB works well for lookup queries. But if you need to run heavy analytical queries against the data in a DynamoDB table, you need other tools to index that data. One solution you can come across in the AWS documentation and AWS blog posts is to use DynamoDB Streams to load data into the AWS ElasticSearch service, which indexes it and provides richer search possibilities to users.

The architecture diagram is taken from the original post on the AWS blog.

This time I decided to do all the configuration with a provisioning tool, Terraform. This helps avoid too many screenshots of the AWS console and shows more clearly in the end that this setup really does consist of three simple steps.

As usual, the final code can be found in the GitHub repository. With the preamble out of the way, let’s get started.

DynamoDB table

To begin with, we need to describe the table we are going to synchronize with ElasticSearch. As an example, I created a variation of a transactions table.

resource "aws_dynamodb_table" "users-transactions-table" {
  name = "UsersTransactions"
  hash_key = "userId"
  range_key = "transactionId"
  write_capacity = 20
  read_capacity = 20

  attribute {
    name = "userId"
    type = "N"
  }

  attribute {
    name = "transactionId"
    type = "N"
  }

  attribute {
    name = "accountId"
    type = "N"
  }

  attribute {
    name = "amount"
    type = "N"
  }

  attribute {
    name = "shortDescription"
    type = "S"
  }

  local_secondary_index {
    name = "accountIdLI"
    projection_type = "KEYS_ONLY"
    range_key = "accountId"
  }

  local_secondary_index {
    name = "amountLI"
    projection_type = "KEYS_ONLY"
    range_key = "amount"
  }

  local_secondary_index {
    name = "shortDescriptionLI"
    projection_type = "KEYS_ONLY"
    range_key = "shortDescription"
  }

  # Enable the stream that captures item-level changes for downstream consumers
  stream_enabled = true
  stream_view_type = "NEW_IMAGE"
}

The crucial part here is stream_enabled = true: it creates the DynamoDB stream, which captures the changes happening in the table. The stream retains records for 24 hours so they can be consumed by something, a Lambda function in our case.

In this particular example we are going to capture only the new version of each row (stream_view_type = "NEW_IMAGE"), although the capturing logic could be a bit more elaborate.
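For reference, the stream supports a few other view types; a one-line variation, if both the old and the new versions of an item were needed, might look like this:

  # Other supported values: KEYS_ONLY, OLD_IMAGE, NEW_AND_OLD_IMAGES
  stream_view_type = "NEW_AND_OLD_IMAGES"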

Processing stream Lambda

Now it’s time to create the Lambda function that will consume events coming from DynamoDB and push them to the AWS ElasticSearch service. Let’s create the function:

resource "aws_lambda_function" "process_dynamo_stream_function" {
  function_name = local.lambda_func_name
  handler = "index.handler"
  role = aws_iam_role.dyanmo_stream_lambda_role.arn
  runtime = "nodejs12.x"
  environment {
    variables = {
      ES_HOST = var.es_host
      ES_REGION = var.aws_region
    }
  }

  filename      = "./lambda_function_payload.zip"
  source_code_hash = filebase64sha256("./lambda_function_payload.zip")
}

Let me describe a few things in this snippet. I am using the Node.js runtime for the function code; the same logic could be implemented in any runtime supported by AWS Lambda. The code is taken locally from a zip file (more details a little bit later); for a production-ready solution, deployment via S3 would probably be a better fit (a sketch follows). Also, the function needs to know the ElasticSearch host to send the messages to. This information is provided through the environment block and comes from an output of the Terraform module for ElasticSearch.
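As a sketch of that S3-based variant (the bucket and key below are hypothetical), the function could point at an uploaded artifact instead of a local file:

resource "aws_lambda_function" "process_dynamo_stream_function" {
  function_name = local.lambda_func_name
  handler       = "index.handler"
  role          = aws_iam_role.dyanmo_stream_lambda_role.arn
  runtime       = "nodejs12.x"

  # Deployment package uploaded to S3 beforehand (bucket and key are hypothetical)
  s3_bucket = "my-lambda-artifacts"
  s3_key    = "dynamo-stream/lambda_function_payload.zip"

  # environment block omitted for brevity
}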

Let’s assign the function to the event source:

resource "aws_lambda_event_source_mapping" "stream_function_event_trigger" {
  event_source_arn  = var.stream_arn
  function_name     = aws_lambda_function.process_dynamo_stream_function.arn
  starting_position = "LATEST"
}

The var.stream_arn value is an output of the DynamoDB module (see the GitHub repository for more details).
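For illustration, inside the DynamoDB module that output presumably looks something like this (aws_dynamodb_table exposes the stream ARN as the stream_arn attribute):

output "stream_arn" {
  value = aws_dynamodb_table.users-transactions-table.stream_arn
}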

We also need to add the required permissions to the Lambda role:

  • it needs to read from the DynamoDB stream,
  • it needs access to CloudWatch Logs to write its own logs,
  • and finally, it needs access to call the ElasticSearch domain.

resource "aws_iam_role_policy" "dyanmo_stream_lambda_policy" {
  role = aws_iam_role.dyanmo_stream_lambda_role.id

  policy = <<-EOF
  {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:${var.aws_region}:${data.aws_caller_identity.current.account_id}:function:${local.lambda_func_name}*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Resource": "${var.stream_arn}"
        },
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpGet",
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Resource": "${var.domain_arn}"
        }
    ]
  }
  EOF
}

resource "aws_iam_role" "dyanmo_stream_lambda_role" {
  name = "dyanmo_stream_lambda_role"

  assume_role_policy = <<-EOF
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Action": "sts:AssumeRole",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Effect": "Allow",
        "Sid": ""
      }
    ]
  }
  EOF
}

Lambda function code

And here’s the main part: the actual Lambda code that does the magic:

'use strict';
const AWS = require("aws-sdk");
const httpAwsEs = require('http-aws-es');
const elasticsearch = require('elasticsearch');

// The client signs every request with the Lambda execution role credentials
// via the http-aws-es connection class.
const client = new elasticsearch.Client({
    host: process.env.ES_HOST,
    connectionClass: httpAwsEs,
    amazonES: {
        region: process.env.ES_REGION,
        credentials: new AWS.EnvironmentCredentials('AWS')
    }
});

exports.handler = (event, context, callback) => {

    event.Records.forEach((record) => {
        console.log('Stream record: ', JSON.stringify(record, null, 2));
    });

    // Index every newly inserted row and wait for all ES requests to finish
    // (and surface any failure) before reporting the batch as processed.
    const indexRequests = event.Records
        .filter((record) => record.eventName === 'INSERT')
        .map((record) => {
            const rawData = record.dynamodb.NewImage;
            return client.index({
                index: 'transactions',
                body: {
                    accountId: rawData.accountId.N,
                    shortDescription: rawData.shortDescription.S,
                    amount: rawData.amount.N,
                    userId: rawData.userId.N,
                    transactionId: rawData.transactionId.N
                }
            });
        });

    Promise.all(indexRequests)
        .then(() => callback(null, `Successfully processed ${event.Records.length} records.`))
        .catch((err) => callback(err));
};

The AWS SDK has no dedicated module for making ElasticSearch calls, but luckily other ES clients are easy to find; I only need to make sure that the client sends authenticated and signed requests. For that I use the nice little library http-aws-es. Putting it all together gives the small snippet of code above, which does the job: pushing new rows from the DynamoDB table into the transactions index.

After packing everything into a zip file, we are ready to deploy our Lambda function.

zip -r ../lambda_function_payload.zip .
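Alternatively, the packaging step could live in Terraform itself via the archive_file data source; a minimal sketch, assuming the function source (index.js plus node_modules) sits in a local ./lambda directory:

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "./lambda"                      # hypothetical directory with index.js and node_modules
  output_path = "./lambda_function_payload.zip"
}

The aws_lambda_function resource could then reference data.archive_file.lambda_zip.output_path and data.archive_file.lambda_zip.output_base64sha256 instead of the hard-coded file name.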

ElasticSearch

The last puzzle piece, but not the least: let’s create the ElasticSearch domain. Before starting, I will note that the configuration of an ES domain is quite flexible and diverse, which is also why I would call it a bit overcomplicated. But if you are not afraid of digging a bit deeper into the configuration details, it should be totally fine.

First of all, I have to mention that I wanted to show the outcome of this setup in a pleasant way, so I decided to use Kibana, which comes with the AWS ElasticSearch service out of the box. The only problem is that I did not want to also set up a security layer for the Kibana instance (it has a few SSO-like options), so I simply opened the ES domain to the public. For a production-ready solution, you would need to keep ES in a private network or configure proper security for it.

With that said, let’s take a look at how the AWS ES domain is created by means of a Terraform module:

resource "aws_elasticsearch_domain" "dynamo_indexing_es" {
  domain_name = "dynamostream"
  elasticsearch_version = "7.9"

  cluster_config {
    instance_type = "t2.small.elasticsearch"
  }

  ebs_options {
    ebs_enabled = true
    volume_size = 10
  }
}

resource "aws_elasticsearch_domain_policy" "dynamo_indexing_es_policy" {
  domain_name = aws_elasticsearch_domain.dynamo_indexing_es.domain_name
  access_policies = <<POLICIES
{
    "Version": "2012-10-17",
    "Statement": [
    {
        "Effect": "Allow",
        "Principal": {
            "AWS": "*"
        },
        "Action": [
            "es:*",
        ],
        "Resource": "${aws_elasticsearch_domain.dynamo_indexing_es.arn}/*"
    },
    {
        "Effect": "Allow",
        "Principal": {
            "AWS": "${var.lambda_role_arn}"
        },
        "Action": "es:*",
        "Resource": "${aws_elasticsearch_domain.dynamo_indexing_es.arn}/*"
    }
]
}
POLICIES
}

Warning: in the policy, the first statement grants all actions to all principals; that is exactly what should be adjusted for anything beyond a demo setup, as sketched below.
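One possible adjustment, if the domain still has to stay publicly reachable for a demo, is to narrow that first statement to a known IP range with a policy condition. The fragment below is a sketch of how the statement inside the access_policies document could look (the CIDR range is a placeholder):

    {
        "Effect": "Allow",
        "Principal": { "AWS": "*" },
        "Action": "es:*",
        "Resource": "${aws_elasticsearch_domain.dynamo_indexing_es.arn}/*",
        "Condition": {
            "IpAddress": { "aws:SourceIp": "203.0.113.0/24" }
        }
    }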

Additionally, I configured an output to expose the Kibana endpoint, where the final data can be checked:

output "domain_kebana_host" {
  value = aws_elasticsearch_domain.dynamo_indexing_es.kibana_endpoint
}

The Test

Now comes the fun part: let’s build our infrastructure and run the test.

terraform apply

After a few minutes, you can find something like this in the console output:

es_kibana_host = search-dynamostream-ph7dni5uop7gcepdpxhhhyar3i.eu-central-1.es.amazonaws.com/_plugin/kibana/

Before going to the specified URL, let’s put a row in the DynamoDB table:

aws dynamodb put-item \
--table-name UsersTransactions \
--item userId={N=3},transactionId={N=51},accountId={N=3},amount={N=11},shortDescription={S="Final mega test 2"}

A few moments later, we can see in the Lambda logs that the new event was processed: [Lambda log screenshot]

And if we go to the Kibana interface, we can see the entries of our transactions index: [Kibana screenshot]

Now we are free to search our data, securely stored in DynamoDB and indexed in the AWS ElasticSearch service.

Conclusion

The solution described here is indeed quite simple and natural for the AWS services, and using a provisioning tool like Terraform makes it even smoother. The simplicity also comes with good durability, because the services involved were designed to integrate with each other; as a result, we need to write relatively little code to achieve a quite powerful result.

Update: I wrote a second part about an alternative way of achieving the same result.