Amazon DynamoDB のデータアーカイブソリューションを Terraform で実装する

AWS

2022.5.16

Topics

AWS が公開しているデータベースブログにて紹介されていた DynamoDB のデータアーカイブを Terraform で実装してみた内容です。
TTL と AmazonKinesis の統合を使用して、AmazonDynamoDB から AmazonS3 にデータをアーカイブする| AWS データベースブログ

ソリューションの概要について

このソリューションは、DynamoDB TTL を使用して DynamoDB テーブルから期限切れのアイテムを自動的に削除し、KinesisStreams を使用して期限切れのアイテムをキャプチャします。新しいアイテムが Kinesis ストリームに追加されると(つまり、TTL が古いアイテムを削除すると)、それらは Amazon KinesisDataFirehose 配信ストリームに送信されます。Kinesis Data Firehose は、長期アーカイブのためにデータを Amazon Simple Storage Service(Amazon S3)にロードするためのシンプルで完全に管理されたソリューションを提供します。
TTL と AmazonKinesis の統合を使用して、AmazonDynamoDB から AmazonS3 にデータをアーカイブする| AWS データベースブログ

DynamoDBソリューション構成図

ソリューション手順の概要は以下の通りです。

  1. DynamoDB TTL を有効にします
    1. これにより、DynamoDB が有効切れのレコードを自動で削除してくれます
  2. DynamoDB の Kinesis データストリームを有効にします
    1. これにより、データが Kinesis データストリームにストリーミングされ、さらにアーカイブ処理が行われます
  3. Kinesis Data Firehose を Kinesis Data Streams に接続します
    1. Lambda でデータ変換を実行して TTL レコードを選択します
  4. TTL によって処理されたレコードのデータを S3 バケットへ配信します

このソリューションを使うことで、IoT などで大量にデータを保存するが時間経過によって不要になったデータを DynamoDB よりコストの安い S3 に保存することでストレージのコストの節約が行なえます。

Terraform について

Terraform の作成するリソースは以下の通りです。

  1. DynamoDB 編
    1. テーブル作成:aws_dynamodb_table
    2. アイテム追加:aws_dynamodb_table_item
    3. DynamoDB と Kinesis Data Streams を連携:aws_dynamodb_kinesis_streaming_destination
  2. S3 編
    1. バケット作成:aws_s3_bucket
  3. Lambda 編
    1. データ変換プログラム作成:aws_lambda_function
  4. Kinesis 編
    1. Kinesis データストリーム作成:aws_kinesis_stream
    2. Kinesis Data Firehose に各リソースを連携:aws_kinesis_firehose_delivery_stream

前提

Terraform バージョンについて以下の通りです。

C:\Users>terraform --version
Terraform v1.1.6
on windows_amd64
+ provider registry.terraform.io/hashicorp/archive v2.2.0
+ provider registry.terraform.io/hashicorp/aws v3.75.1

ディレクトリ構成は以下の通りです。

C:.
│  DynamoDB.tf
│  Kinesis.tf
│  Lambda.tf
│  main.tf
│  S3.tf
│  variables.tf
└─contents
        lambda.zip
        lambda_function.py

main.tf&variables.tf

main.tfvariables.tfについては以下の通りです。

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.0"
    }
  }
}
provider "aws" {
  region  = "us-east-1"
  profile = "default"
  default_tags {
    tags = {
      "Name" = var.tag_name
    }
  }
}

各変数はお好みで変更してください。

variable "project_name" {
  default     = "cold-airflow-dynamodb-ttl"
  type        = string
  description = "project name"
}

variable "lambda_file_name" {
  default = "/contents/lambda_function.py"
}

variable "lambda_file_zip_name" {
  default = "/contents/lambda.zip"
}

variable "handler_name" {
  type        = string
  description = "lambda function name"
  default     = "lambda_function.lambda_handler"
}
variable "tag_name" {
  type        = string
  default     = "cold-airflow"
  description = "Resource Tag Name"
}

DynamoDB TTL 有効したテーブルを作成

TTL の設定は簡単で、TTL の列を指定するだけです。

aws_dynamodb_table | Resources | hashicorp/aws | Terraform Registry

resource "aws_dynamodb_table" "basic-dynamodb-table" {
  name           = "${var.project_name}-table"
  billing_mode   = "PROVISIONED"
  read_capacity  = 20
  write_capacity = 20
  hash_key       = "UserId"
  range_key      = "GameTitle"

  attribute {
    name = "UserId"
    type = "S"
  }

  attribute {
    name = "GameTitle"
    type = "S"
  }

  ttl {
    attribute_name = "ExpiryDate"
    enabled        = true
  }
}

データ挿入時に、ExpiryDate列で有効期限のタイムスタンプを設定すると、DynamoDB が自動で削除してくれます。
仕組み: DynamoDB の有効期限 (TTL) – Amazon DynamoDB

アイテム追加

Terraform では、DynamoDB テーブルアイテムを追加するリソースがあります。
aws_dynamodb_table_item | Resources | hashicorp/aws | Terraform Registry

ExpiryDateには削除されるいい感じの時間を適当にタイムスタンプに変換して設定しています。

resource "aws_dynamodb_table_item" "example1" {
  table_name = aws_dynamodb_table.basic-dynamodb-table.name
  hash_key   = aws_dynamodb_table.basic-dynamodb-table.hash_key
  range_key  = aws_dynamodb_table.basic-dynamodb-table.range_key
  item       = <<ITEM
{
  "UserId": {"S": "test"},
  "GameTitle": {"S": "1"},
  "ExpiryDate": {"N": "1652142987"}
}
ITEM
}
resource "aws_dynamodb_table_item" "example2" {
  table_name = aws_dynamodb_table.basic-dynamodb-table.name
  hash_key   = aws_dynamodb_table.basic-dynamodb-table.hash_key
  range_key  = aws_dynamodb_table.basic-dynamodb-table.range_key
  item       = <<ITEM
{
  "UserId": {"S": "test"},
  "GameTitle": {"S": "2"},
  "ExpiryDate": {"N": "1652142987"}
}
ITEM
}

TTL 削除待機するため一度 apply

TTL で削除されるのには少し時間がかかるため一度ここで apply をしてデータを確認します。

C:\Users>aws dynamodb scan --table-name cold-airflow-dynamodb-ttl-table
{
    "Items": [
        {
            "UserId": {
                "S": "test"
            },
            "ExpiryDate": {
                "N": "1652142987"
            },
            "GameTitle": {
                "S": "1"
            }
        },
        {
            "UserId": {
                "S": "test"
            },
            "ExpiryDate": {
                "N": "1652142987"
            },
            "GameTitle": {
                "S": "2"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}

DynamoDB の Kinesis データストリームを有効

Terraform のサンプルをそのまま使用しています。

aws_kinesis_stream | Resources | hashicorp/aws | Terraform Registry

resource "aws_kinesis_stream" "test_stream" {
  name             = "${var.project_name}-kinesis-stream"
  shard_count      = 1
  retention_period = 48

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]

  stream_mode_details {
    stream_mode = "PROVISIONED"
  }
}

さらに、Kinesis Data Streams と DynamoDB の連携を行います。

resource "aws_dynamodb_kinesis_streaming_destination" "example" {
  stream_arn = aws_kinesis_stream.test_stream.arn
  table_name = aws_dynamodb_table.basic-dynamodb-table.name
}

Kinesis Data Streams を使用して DynamoDB の変更をキャプチャーする – Amazon DynamoDB

Lambda でデータ変換を実装

Kinesis Data Streams と DynamoDB の連携では、テーブルのデータに対するすべての変更をキャプチャしたデータレコードが送信されてきます。
そのため、TTL で削除されたレコードかどうかを判定しているのがこの Lambda です。

data "archive_file" "sample_function" {
  type        = "zip"
  source_file = "${path.module}${var.lambda_file_name}"
  output_path = "${path.module}${var.lambda_file_zip_name}"
}

data "aws_iam_policy_document" "AWSLambdaTrustPolicy" {
  statement {
    actions = ["sts:AssumeRole"]
    effect  = "Allow"
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "function_role" {
  name               = "${var.project_name}-function_role"
  assume_role_policy = data.aws_iam_policy_document.AWSLambdaTrustPolicy.json
}

resource "aws_iam_role_policy_attachment" "lambda_policy" {
  role       = aws_iam_role.function_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_lambda_function" "test_lambda" {
  filename         = data.archive_file.sample_function.output_path
  function_name    = "${var.project_name}-lambda-function"
  role             = aws_iam_role.function_role.arn
  handler          = var.handler_name
  publish          = true
  source_code_hash = data.archive_file.sample_function.output_base64sha256
  runtime          = "python3.9"
  timeout          = 60
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.test_lambda.function_name
  principal     = "events.amazonaws.com"
}

データ変換プログラム

AWS 記事にあったコードをそのまま使用しています。

コード内容はシンプルで、Kinesis Data Streams から受け取ったデータをverify_if_expired関数で削除されたレコードをかどうかを判定しています。

from __future__ import print_function
import json
import base64

null = None

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf8')

        ret_val = verify_if_expired(payload)
        if ret_val == True:
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': record['data']
            }
            output.append(output_record)

        else:
            output_record = {
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': null
            }
            output.append(output_record)

    return {'records': output}


def verify_if_expired(payload):
    try:
        parsed_json = json.loads(payload)
        if str(parsed_json["eventName"]).upper() == "REMOVE":
            if parsed_json["userIdentity"]["principalId"] == "dynamodb.amazonaws.com":
                return True
    except Exception as e:
        print(e,"error")
    return False

注意点としては、Kinesis から送られてくるデータはエンコードされているためデコード処理が必要です。

Kinesis Data Firehose を Kinesis Data Streams に接続

TTL によって処理されたレコードの配信先 S3 バケット作成

S3 はデータを受け取るだけなので、サンプルにあるコードで作成します。

aws_s3_bucket | Resources | hashicorp/aws | Terraform Registry

resource "aws_s3_bucket" "bucket" {
  bucket = "${var.project_name}-bucket"
}
resource "aws_s3_bucket_acl" "bucket_acl" {
  bucket = aws_s3_bucket.bucket.id
  acl    = "private"
}

Kinesis Data Firehose に各リソース連携

ここでは今まで作成したリソースを以下3つで連携します。

  • kinesis_source_configuration
    • データソースを定義します
    • Kinesis Data Streams と紐づけます
  • extended_s3_configuration
    • データアウトソース先を定義します
    • S3 と紐づけます
  • processing_configuration
    • データ加工先を定義します
    • Lambda を紐づけます
resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
  name        = "${var.project_name}-kinesis-firehose"
  destination = "extended_s3"
  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.test_stream.arn
    role_arn           = aws_iam_role.firehose_role.arn
  }
  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose_role.arn
    bucket_arn = aws_s3_bucket.bucket.arn

    processing_configuration {
      enabled = "true"
      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.test_lambda.arn}:$LATEST"
        }
      }
    }
  }
}

合わせて、各リソースへの Role も作成します。

resource "aws_iam_role" "firehose_role" {
  name = "${var.project_name}-firehose-role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

data "aws_iam_policy_document" "firehose_s3_lambda" {
  statement {
    effect = "Allow"
    actions = [
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject"
    ]
    resources = [
      "arn:aws:s3:::${aws_s3_bucket.bucket.id}",
      "arn:aws:s3:::${aws_s3_bucket.bucket.id}/*"
    ]
  }
  statement {
    effect = "Allow"
    actions = [
      "kinesis:*"
    ]
    resources = [
      "*"
    ]
  }
  statement {
    effect = "Allow"
    actions = [
      "lambda:*"
    ]
    resources = [
      "*"
    ]
  }
}

resource "aws_iam_policy" "firehose_policy" {
  name   = "${var.project_name}-firehose-policy"
  policy = data.aws_iam_policy_document.firehose_s3_lambda.json
}

resource "aws_iam_role_policy_attachment" "firehose_att" {
  role       = aws_iam_role.firehose_role.name
  policy_arn = aws_iam_policy.firehose_policy.arn
}

動作確認

TTL を設定したレコードが対象時刻をすぎると削除対象とされますが、厳密な時刻で削除されるわけではないので気長に待ちます。

TTL は通常、有効期限が切れてから 48 時間以内に期限切れのアイテムを削除します。
仕組み:DynamoDB の存続時間(TTL)-Amazon DynamoDB

DynamoDB のモニタリングでレコードが削除されたかどうかを確認できます。

DynamoDBのTTLモニタリング結果

レコードが削除されれば S3 にデータが格納されます。

以下は S3 に保存された JSON データです。

{"awsRegion":"us-east-1","eventID":"91e53bc6-5d30-4852-a13b-99faf9070272","eventName":"REMOVE","userIdentity":{"principalId":"dynamodb.amazonaws.com","type":"Service"},"recordFormat":"application/json","tableName":"cold-airflow-dynamodb-ttl-table","dynamodb":{"ApproximateCreationDateTime":1652145142697,"Keys":{"UserId":{"S":"test"},"GameTitle":{"S":"2"}},"OldImage":{"UserId":{"S":"test"},"ExpiryDate":{"N":"1652142987"},"GameTitle":{"S":"2"}},"SizeBytes":56},"eventSource":"aws:dynamodb"}
{"awsRegion":"us-east-1","eventID":"0144f16f-17ea-493c-91f6-b088db238b69","eventName":"REMOVE","userIdentity":{"principalId":"dynamodb.amazonaws.com","type":"Service"},"recordFormat":"application/json","tableName":"cold-airflow-dynamodb-ttl-table","dynamodb":{"ApproximateCreationDateTime":1652145143698,"Keys":{"UserId":{"S":"test"},"GameTitle":{"S":"1"}},"OldImage":{"UserId":{"S":"test"},"ExpiryDate":{"N":"1652142987"},"GameTitle":{"S":"1"}},"SizeBytes":56},"eventSource":"aws:dynamodb"}

まとめ

このソリューションを活用することで DynamoDB のストレージに対するコスト最適化を行うことができます。

また、実際に作成してみると曖昧な理解をしていた Kinesis の違いについてしっかりと理解できました。

Cold-Airflow

2021年新卒入社。インフラエンジニアです。RDBが三度の飯より好きです。 主にデータベースやAWSのサーバレスについて書く予定です。あと寒いのは苦手です。

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら