This is a second part of the previous writing about the option for streaming data from Amazon DynamoDb to other services. I wasn’t aware of all options of Amazon Kinesis family services when was writing the first part. But during preparation, I noticed that this is a good alternative to the old school AWS Lambda processing, which I described in the first part.

Well, what’s so interesting in this alternative is that you don’t need to bother about the connectivity to your  Amazon ES from Lambda code, which was quite a tricky place, to be honest. All you need is to enable Kinesis stream right there in the DynamoDb configuration for the table, and later use it as a source for Amazon Kinesis Firehose service. And the Kinesis Firehose in its turn already has a nice feature to deliver stream in a bunch of places and Amazon ES is one of them. How nice, isn’t it, like magic with no hands at all. We even don’t need to write code in Lambda to transfer data as-is from DynamoDb to Elastic Search.

To be completely accurate, you would need to write the lambda code if you want to transform the data which you sending to  the ES, but it’s a much better way to do it because Lambda would have only one simple responsibility to translate on data format to another.

Ok, enough talking, let’s take a look at how can we do that using the same terraform provisioning tool. As usual, the final code could be found in the GitHub repository.

A few words about terraform

I will post only changes to the previous setup which I have done, so I highly recommend you to take a look at the previous article.

Because I am adding change to the previous project, I need to keep the alternative approach also. For this reason the new param use_kinesis was added to the main variables list of the project, and all dependent modules will use it as an expression for the count parameter.

The variables.tf file:

1
2
3
4
variable "use_kinesis" {
  type = bool
  description = "Whether using kinesis to stream data to ES or plain Lambda"
}

The main.tf file:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# if not use_kinesis
module "StreamProcessingLambda" {
  count = var.use_kinesis ? 0 : 1
  source = "./modules/lambda"

  aws_region = var.aws_region
  stream_arn = module.TransactionsTable.streamArn
  domain_arn = module.EsDomain.domain_arn
  es_host = module.EsDomain.domain_host
}

# if use_kinesis
module "StreamProcessingKinesis" {
  count = var.use_kinesis ? 1 : 0
  source = "./modules/kinesis-stream"

  domain_arn = module.EsDomain.domain_arn
}

Notice the count param in the above code snippet.

DynamoDB table - enabling AWS Kinesis stream

The table itself didn’t change. The only change, which I needed is to enable the Kinesis stream feature for this table. Unfortunately, the Terraform AWS provider doesn’t have this property yet implemented for the DynamoDB at the moment. So we need to do it  either manually or manually via terraform:

1
2
3
4
5
6
7
resource "null_resource" "assign_kinesis_stream_to_dynamo_db" {
  count = var.use_kinesis ? 1 : 0
  provisioner "local-exec" {
    command = "aws dynamodb enable-kinesis-streaming-destination --table-name ${module.TransactionsTable.table_name} --stream-arn ${module.StreamProcessingKinesis[0].aws_kinesis_arn} --profile ${var.aws_profile}"
  }
  depends_on = [module.TransactionsTable, module.StreamProcessingKinesis]
}

You might need to update the AWS CLI version because this DynamoDb sub-command was added not so far ago.

The local-exec is not so convenient as you can expect, so use it with high attention and switch to the proper param, when it will be implemented.

Kinesis configuration

Now it’s time to check how can we configure processing events from DynamoDb via Kinesis.

Actually, I am going to create both the Kinesis stream and Kinesis Firehose delivery stream in one place, because they are quite tightly coupled. They have even one dashboard in the AWS Console.

The Kinesis stream:

1
2
3
4
resource "aws_kinesis_stream" "dynamodb_data_stream" {
  name = "dynamodb_data_stream"
  shard_count = 1
}

Not so difficult, right? Actually, when we assign it to the DynamoDb implicitly the IAM Role being created for this resource, you might need to explicitly configure it, if you like.

Next, we need to create the Kinesis Firehose to upload events from the above stream to the destination point, which is the same ES Domain from the previous article:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
resource "aws_s3_bucket" "bucket" {
  bucket = "tf-stream-bucket"
  acl    = "private"
}

resource "aws_kinesis_firehose_delivery_stream" "kinesis_to_es" {
  depends_on = [aws_iam_role_policy.dyanmo-stream-firehose-policy]
  name = "data-from-dynamoDb-to-es"
  destination = "elasticsearch"

  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.dynamodb_data_stream.arn
    role_arn = aws_iam_role.firehose_processing_role.arn
  }

  s3_configuration {
    role_arn           = aws_iam_role.firehose_processing_role.arn
    bucket_arn         = aws_s3_bucket.bucket.arn
    buffer_size        = 10
    buffer_interval    = 400
    compression_format = "GZIP"
  }

  elasticsearch_configuration {
    domain_arn = var.domain_arn
    role_arn   = aws_iam_role.firehose_processing_role.arn
    index_name = "transactions"
    index_rotation_period = "NoRotation"
  }
}

Few words here:

  • Firehose uses S3 as some kind of buffer for the message, so it requires an S3 bucket to being created.
  • In the ES Domain destination I selected the NoRotation option for the index rotation because the default one is rotation per day.

Well, the last on, but not list Let’s take a look at the Firehose IAM role and its policies:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
resource "aws_iam_role" "firehose_processing_role" {
  name = "firehose_processing_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "dyanmo-stream-firehose-policy" {
  role = aws_iam_role.firehose_processing_role.id

  policy = <<-EOF
  {
    "Version": "2012-10-17",
    "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "es:*"
          ],
          "Resource": [
            "${var.domain_arn}",
            "${var.domain_arn}/*"
          ]
      },
      {
        "Sid": "",
        "Effect": "Allow",
        "Action": [
            "kinesis:SubscribeToShard",
            "kinesis:DescribeStreamSummary",
            "kinesis:DescribeStreamConsumer",
            "kinesis:GetShardIterator",
            "kinesis:GetRecords",
            "kinesis:DescribeStream",
            "kinesis:ListTagsForStream"
        ],
        "Resource": "${aws_kinesis_stream.dynamodb_data_stream.arn}"
      },
      {
        "Sid": "",
        "Effect": "Allow",
        "Action": [
            "kinesis:ListStreams",
            "kinesis:ListShards",
            "kinesis:DescribeLimits",
            "kinesis:ListStreamConsumers"
        ],
        "Resource": "*"
      }
    ]
  }
  EOF
}

We need access in both directions: read data from AWS Kinesis stream as a source and write data to the Amazon Elasticsearch Domain as a destination point.

The Test

Well, the testing is the same as in the previous part.

1
terraform apply

after a few minutes, you can find in the console something like that:

1
es_kibana_host = search-dynamostream-ph7dni5uop7gcepdpxhhhyar3i.eu-central-1.es.amazonaws.com/_plugin/kibana/

Before going to the specified URL, let’s put a row in the DynamoDB table:

1
2
3
aws dynamodb put-item \
--table-name UsersTransactions \
--item userId={N=3},transactionId={N=51},accountId={N=3},amount={N=11},shortDescription={S="Final mega test 2"}

All the stages of processing event could be seen in the Amazon CloudWatch logs and metrics, for Kinesis stream, Firehose, and ES Domain.

Conclusion

In comparison with the previous page this option shifts the focus more to the environment setup and keeps the code base almost dry for the application. The only thing here is that the services we used were designed to process quite dense flow of data. Having Kinesis + Firehose for processing one message per day may look as overengineering in comparison with the AWS Lambda option, which sleeps and awakes only when it has something to process.

These are the cons which I see here, but having a zero-code solution with Kinesis services look much more perspective for me in general if you have quite dense data flow.