Amazon DynamoDB のデータアーカイブソリューションを Terraform で実装する
2022.5.16
AWS が公開しているデータベースブログにて紹介されていた DynamoDB のデータアーカイブを Terraform で実装してみた内容です。
TTL と AmazonKinesis の統合を使用して、AmazonDynamoDB から AmazonS3 にデータをアーカイブする| AWS データベースブログ
ソリューションの概要について
このソリューションは、DynamoDB TTL を使用して DynamoDB テーブルから期限切れのアイテムを自動的に削除し、KinesisStreams を使用して期限切れのアイテムをキャプチャします。新しいアイテムが Kinesis ストリームに追加されると(つまり、TTL が古いアイテムを削除すると)、それらは Amazon KinesisDataFirehose 配信ストリームに送信されます。Kinesis Data Firehose は、長期アーカイブのためにデータを Amazon Simple Storage Service(Amazon S3)にロードするためのシンプルで完全に管理されたソリューションを提供します。
TTL と AmazonKinesis の統合を使用して、AmazonDynamoDB から AmazonS3 にデータをアーカイブする| AWS データベースブログ
ソリューション手順の概要は以下の通りです。
- DynamoDB TTL を有効にします
- これにより、DynamoDB が有効切れのレコードを自動で削除してくれます
- DynamoDB の Kinesis データストリームを有効にします
- これにより、データが Kinesis データストリームにストリーミングされ、さらにアーカイブ処理が行われます
- Kinesis Data Firehose を Kinesis Data Streams に接続します
- Lambda でデータ変換を実行して TTL レコードを選択します
- TTL によって処理されたレコードのデータを S3 バケットへ配信します
このソリューションを使うことで、IoT などで大量にデータを保存するが時間経過によって不要になったデータを DynamoDB よりコストの安い S3 に保存することでストレージのコストの節約が行なえます。
Terraform について
Terraform の作成するリソースは以下の通りです。
- DynamoDB 編
- テーブル作成:aws_dynamodb_table
- アイテム追加:aws_dynamodb_table_item
- DynamoDB と Kinesis Data Streams を連携:aws_dynamodb_kinesis_streaming_destination
- S3 編
- バケット作成:aws_s3_bucket
- Lambda 編
- データ変換プログラム作成:aws_lambda_function
- Kinesis 編
- Kinesis データストリーム作成:aws_kinesis_stream
- Kinesis Data Firehose に各リソースを連携:aws_kinesis_firehose_delivery_stream
前提
Terraform バージョンについて以下の通りです。
C:\Users>terraform --version Terraform v1.1.6 on windows_amd64 + provider registry.terraform.io/hashicorp/archive v2.2.0 + provider registry.terraform.io/hashicorp/aws v3.75.1
ディレクトリ構成は以下の通りです。
C:. │ DynamoDB.tf │ Kinesis.tf │ Lambda.tf │ main.tf │ S3.tf │ variables.tf └─contents lambda.zip lambda_function.py
main.tf&variables.tf
main.tf
とvariables.tf
については以下の通りです。
terraform { required_providers { aws = { source = "hashicorp/aws" version = "~> 3.0" } } } provider "aws" { region = "us-east-1" profile = "default" default_tags { tags = { "Name" = var.tag_name } } }
各変数はお好みで変更してください。
variable "project_name" { default = "cold-airflow-dynamodb-ttl" type = string description = "project name" } variable "lambda_file_name" { default = "/contents/lambda_function.py" } variable "lambda_file_zip_name" { default = "/contents/lambda.zip" } variable "handler_name" { type = string description = "lambda function name" default = "lambda_function.lambda_handler" } variable "tag_name" { type = string default = "cold-airflow" description = "Resource Tag Name" }
DynamoDB TTL 有効したテーブルを作成
TTL の設定は簡単で、TTL の列を指定するだけです。
aws_dynamodb_table | Resources | hashicorp/aws | Terraform Registry
resource "aws_dynamodb_table" "basic-dynamodb-table" { name = "${var.project_name}-table" billing_mode = "PROVISIONED" read_capacity = 20 write_capacity = 20 hash_key = "UserId" range_key = "GameTitle" attribute { name = "UserId" type = "S" } attribute { name = "GameTitle" type = "S" } ttl { attribute_name = "ExpiryDate" enabled = true } }
データ挿入時に、ExpiryDate
列で有効期限のタイムスタンプを設定すると、DynamoDB が自動で削除してくれます。
仕組み: DynamoDB の有効期限 (TTL) – Amazon DynamoDB
アイテム追加
Terraform では、DynamoDB テーブルアイテムを追加するリソースがあります。
aws_dynamodb_table_item | Resources | hashicorp/aws | Terraform Registry
ExpiryDate
には削除されるいい感じの時間を適当にタイムスタンプに変換して設定しています。
resource "aws_dynamodb_table_item" "example1" { table_name = aws_dynamodb_table.basic-dynamodb-table.name hash_key = aws_dynamodb_table.basic-dynamodb-table.hash_key range_key = aws_dynamodb_table.basic-dynamodb-table.range_key item = <<ITEM { "UserId": {"S": "test"}, "GameTitle": {"S": "1"}, "ExpiryDate": {"N": "1652142987"} } ITEM } resource "aws_dynamodb_table_item" "example2" { table_name = aws_dynamodb_table.basic-dynamodb-table.name hash_key = aws_dynamodb_table.basic-dynamodb-table.hash_key range_key = aws_dynamodb_table.basic-dynamodb-table.range_key item = <<ITEM { "UserId": {"S": "test"}, "GameTitle": {"S": "2"}, "ExpiryDate": {"N": "1652142987"} } ITEM }
TTL 削除待機するため一度 apply
TTL で削除されるのには少し時間がかかるため一度ここで apply をしてデータを確認します。
C:\Users>aws dynamodb scan --table-name cold-airflow-dynamodb-ttl-table { "Items": [ { "UserId": { "S": "test" }, "ExpiryDate": { "N": "1652142987" }, "GameTitle": { "S": "1" } }, { "UserId": { "S": "test" }, "ExpiryDate": { "N": "1652142987" }, "GameTitle": { "S": "2" } } ], "Count": 2, "ScannedCount": 2, "ConsumedCapacity": null }
DynamoDB の Kinesis データストリームを有効
Terraform のサンプルをそのまま使用しています。
aws_kinesis_stream | Resources | hashicorp/aws | Terraform Registry
resource "aws_kinesis_stream" "test_stream" { name = "${var.project_name}-kinesis-stream" shard_count = 1 retention_period = 48 shard_level_metrics = [ "IncomingBytes", "OutgoingBytes", ] stream_mode_details { stream_mode = "PROVISIONED" } }
さらに、Kinesis Data Streams と DynamoDB の連携を行います。
resource "aws_dynamodb_kinesis_streaming_destination" "example" { stream_arn = aws_kinesis_stream.test_stream.arn table_name = aws_dynamodb_table.basic-dynamodb-table.name }
Kinesis Data Streams を使用して DynamoDB の変更をキャプチャーする – Amazon DynamoDB
Lambda でデータ変換を実装
Kinesis Data Streams と DynamoDB の連携では、テーブルのデータに対するすべての変更をキャプチャしたデータレコードが送信されてきます。
そのため、TTL で削除されたレコードかどうかを判定しているのがこの Lambda です。
data "archive_file" "sample_function" { type = "zip" source_file = "${path.module}${var.lambda_file_name}" output_path = "${path.module}${var.lambda_file_zip_name}" } data "aws_iam_policy_document" "AWSLambdaTrustPolicy" { statement { actions = ["sts:AssumeRole"] effect = "Allow" principals { type = "Service" identifiers = ["lambda.amazonaws.com"] } } } resource "aws_iam_role" "function_role" { name = "${var.project_name}-function_role" assume_role_policy = data.aws_iam_policy_document.AWSLambdaTrustPolicy.json } resource "aws_iam_role_policy_attachment" "lambda_policy" { role = aws_iam_role.function_role.name policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" } resource "aws_lambda_function" "test_lambda" { filename = data.archive_file.sample_function.output_path function_name = "${var.project_name}-lambda-function" role = aws_iam_role.function_role.arn handler = var.handler_name publish = true source_code_hash = data.archive_file.sample_function.output_base64sha256 runtime = "python3.9" timeout = 60 } resource "aws_lambda_permission" "allow_cloudwatch" { statement_id = "AllowExecutionFromCloudWatch" action = "lambda:InvokeFunction" function_name = aws_lambda_function.test_lambda.function_name principal = "events.amazonaws.com" }
データ変換プログラム
AWS 記事にあったコードをそのまま使用しています。
コード内容はシンプルで、Kinesis Data Streams から受け取ったデータをverify_if_expired関数
で削除されたレコードをかどうかを判定しています。
from __future__ import print_function import json import base64 null = None def lambda_handler(event, context): output = [] for record in event['records']: payload = base64.b64decode(record['data']).decode('utf8') ret_val = verify_if_expired(payload) if ret_val == True: output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': record['data'] } output.append(output_record) else: output_record = { 'recordId': record['recordId'], 'result': 'Dropped', 'data': null } output.append(output_record) return {'records': output} def verify_if_expired(payload): try: parsed_json = json.loads(payload) if str(parsed_json["eventName"]).upper() == "REMOVE": if parsed_json["userIdentity"]["principalId"] == "dynamodb.amazonaws.com": return True except Exception as e: print(e,"error") return False
注意点としては、Kinesis から送られてくるデータはエンコードされているためデコード処理が必要です。
Kinesis Data Firehose を Kinesis Data Streams に接続
TTL によって処理されたレコードの配信先 S3 バケット作成
S3 はデータを受け取るだけなので、サンプルにあるコードで作成します。
aws_s3_bucket | Resources | hashicorp/aws | Terraform Registry
resource "aws_s3_bucket" "bucket" { bucket = "${var.project_name}-bucket" } resource "aws_s3_bucket_acl" "bucket_acl" { bucket = aws_s3_bucket.bucket.id acl = "private" }
Kinesis Data Firehose に各リソース連携
ここでは今まで作成したリソースを以下3つで連携します。
kinesis_source_configuration
- データソースを定義します
- Kinesis Data Streams と紐づけます
extended_s3_configuration
- データアウトソース先を定義します
- S3 と紐づけます
processing_configuration
- データ加工先を定義します
- Lambda を紐づけます
resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" { name = "${var.project_name}-kinesis-firehose" destination = "extended_s3" kinesis_source_configuration { kinesis_stream_arn = aws_kinesis_stream.test_stream.arn role_arn = aws_iam_role.firehose_role.arn } extended_s3_configuration { role_arn = aws_iam_role.firehose_role.arn bucket_arn = aws_s3_bucket.bucket.arn processing_configuration { enabled = "true" processors { type = "Lambda" parameters { parameter_name = "LambdaArn" parameter_value = "${aws_lambda_function.test_lambda.arn}:$LATEST" } } } } }
合わせて、各リソースへの Role も作成します。
resource "aws_iam_role" "firehose_role" { name = "${var.project_name}-firehose-role" assume_role_policy = <<EOF { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Principal": { "Service": "firehose.amazonaws.com" }, "Effect": "Allow" } ] } EOF } data "aws_iam_policy_document" "firehose_s3_lambda" { statement { effect = "Allow" actions = [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ] resources = [ "arn:aws:s3:::${aws_s3_bucket.bucket.id}", "arn:aws:s3:::${aws_s3_bucket.bucket.id}/*" ] } statement { effect = "Allow" actions = [ "kinesis:*" ] resources = [ "*" ] } statement { effect = "Allow" actions = [ "lambda:*" ] resources = [ "*" ] } } resource "aws_iam_policy" "firehose_policy" { name = "${var.project_name}-firehose-policy" policy = data.aws_iam_policy_document.firehose_s3_lambda.json } resource "aws_iam_role_policy_attachment" "firehose_att" { role = aws_iam_role.firehose_role.name policy_arn = aws_iam_policy.firehose_policy.arn }
動作確認
TTL を設定したレコードが対象時刻をすぎると削除対象とされますが、厳密な時刻で削除されるわけではないので気長に待ちます。
TTL は通常、有効期限が切れてから 48 時間以内に期限切れのアイテムを削除します。
仕組み:DynamoDB の存続時間(TTL)-Amazon DynamoDB
DynamoDB のモニタリングでレコードが削除されたかどうかを確認できます。
レコードが削除されれば S3 にデータが格納されます。
以下は S3 に保存された JSON データです。
{"awsRegion":"us-east-1","eventID":"91e53bc6-5d30-4852-a13b-99faf9070272","eventName":"REMOVE","userIdentity":{"principalId":"dynamodb.amazonaws.com","type":"Service"},"recordFormat":"application/json","tableName":"cold-airflow-dynamodb-ttl-table","dynamodb":{"ApproximateCreationDateTime":1652145142697,"Keys":{"UserId":{"S":"test"},"GameTitle":{"S":"2"}},"OldImage":{"UserId":{"S":"test"},"ExpiryDate":{"N":"1652142987"},"GameTitle":{"S":"2"}},"SizeBytes":56},"eventSource":"aws:dynamodb"} {"awsRegion":"us-east-1","eventID":"0144f16f-17ea-493c-91f6-b088db238b69","eventName":"REMOVE","userIdentity":{"principalId":"dynamodb.amazonaws.com","type":"Service"},"recordFormat":"application/json","tableName":"cold-airflow-dynamodb-ttl-table","dynamodb":{"ApproximateCreationDateTime":1652145143698,"Keys":{"UserId":{"S":"test"},"GameTitle":{"S":"1"}},"OldImage":{"UserId":{"S":"test"},"ExpiryDate":{"N":"1652142987"},"GameTitle":{"S":"1"}},"SizeBytes":56},"eventSource":"aws:dynamodb"}
まとめ
このソリューションを活用することで DynamoDB のストレージに対するコスト最適化を行うことができます。
また、実際に作成してみると曖昧な理解をしていた Kinesis の違いについてしっかりと理解できました。
テックブログ新着情報のほか、AWSやGoogle Cloudに関するお役立ち情報を配信中!
Follow @twitter2021年新卒入社。インフラエンジニアです。RDBが三度の飯より好きです。 主にデータベースやAWSのサーバレスについて書く予定です。あと寒いのは苦手です。
Recommends
こちらもおすすめ
-
Amazon DynamoDB テーブルを別のアカウントに移行する
2022.9.2
Special Topics
注目記事はこちら
データ分析入門
これから始めるBigQuery基礎知識
2024.02.28
AWSの料金が 10 %割引になる!
『AWSの請求代行リセールサービス』
2024.07.16