Cloud Functionsを使ってCloud Storageにアップロードされたオブジェクトを解凍する
2023.12.2
はじめに
お久しぶりです。第三SAチーム所属のreonoです。
この記事はNHN テコラス Advent Calendar 2023の2日目の記事です。
今回はGoogle CloudのCloud Storageにアップロードしたファイルをトリガーに解凍する方法についてご紹介します。
クラウドを利用する際、データ転送量を削減するためにファイルを圧縮してアップロードすることが一般的です。
圧縮ファイルをバッチ的に処理する方法はDataflowのBulk Decompress Cloud Storage Filesテンプレートがありましたが、ファイルのアップロードごとに解凍を行いたかったためCloud Functionsを利用しました。
フロー図
処理の流れとしては以下の流れとなります。
- ローカルからCloud Storageに対して圧縮ファイルをアップロードする。
- Cloud StorageにアップロードされたことをCloud Pub/Subに対して通知を行う。
- Cloud Pub/SubがCloud Functionsをトリガーし、解凍を行う。
- 解凍されたオブジェクトをアップロードする。
コード
以下がサンプルコードです。
from google.cloud import storage import os import gzip import logging import json import re import sys class JsonFormatter: def format(self, record): return json.dumps(vars(record)) def Unzipper(event, context): logging.basicConfig() logging.getLogger().handlers[0].setFormatter(JsonFormatter()) logger = logging.getLogger(__name__) logger.setLevel(os.environ.get("LOG_LEVEL", "INFO")) try: logger.info(f"event={event}") # Download # イベントからバケットとオブジェクト名を取得 bucket_name = event["bucket"] # pathname = 'folder/file.log.gz' pathname = event["name"] # 拡張子が.gzでないものは処理を終了 if bool(re.match(r".*\.gz$", pathname)) != True: sys.exit() # ファイル名を取得 # filename = 'file.log.gz' filename = (re.search(r"([^/]*)(\.log)(\.gz)", pathname)).group(0) except Exception as e: logger.error(f"error={e}") try: # tmp配下にファイルをダウンロード storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(pathname) blob.download_to_filename("/tmp/" + filename) except Exception as e: logger.error(f"error={e}") try: # Unzip with gzip.open("/tmp/" + filename, "rb") as f: file_content = f.read() logger.info(f"file_content = {file_content}") # newpathでpathから.gzの拡張子を削除。 # newnameでファイル名から.gzの拡張子を削除。 newpath = pathname.replace(".gz", "") newname = filename.replace(".gz", "") with open("/tmp/" + newname, mode="w") as z: # 解凍したgzipをutf-8でデコードしてtmp配下の新しいファイルに書き込む z.write(file_content.decode("utf-8")) except Exception as e: logger.error(f"error={e}") try: # Upload # 環境変数よりUNZIP_BUCKET_NAMEを取得する unzip_bucket_name = os.environ.get("UNZIP_BUCKET_NAME") unzip_bucket = storage_client.get_bucket(unzip_bucket_name) logger.info(f"unzip_bucket={unzip_bucket}") # アップロードする先のフォルダパスを設定 unzip_blob = unzip_bucket.blob(newpath) # ローカルのオブジェクトをアップロード unzip_blob.upload_from_filename("/tmp/" + newname) os.remove("/tmp/" + newname) except Exception as e: logger.error(f"error={e}")
Cloud StorageのCloud Pub/Subへの通知にSuffixでフィルターすることができないため、正規表現で拡張子を取得してgzの場合のみ処理を行うようにしました。
try: logger.info(f"event={event}") # Download # イベントからバケットとオブジェクト名を取得 bucket_name = event["bucket"] # pathname = 'folder/file.log.gz' pathname = event["name"] # 拡張子が.gzでないものは処理を終了 if bool(re.match(r".*\.gz$", pathname)) != True: sys.exit() # ファイル名を取得 # filename = 'file.log.gz' filename = (re.search(r"([^/]*)(\.log)(\.gz)", pathname)).group(0) except Exception as e: logger.error(f"error={e}")
設定
以下のリソースを設定する必要があります。
- Cloud Functions作成
- Cloud Pub/Sub作成
- Cloud Storage通知
Cloud Functions作成
以下の設定で作成しました。
- ランタイム
- トリガー(Cloud Pub/Sub)
Cloud Functionsの作成画面でCloud Pub/Subも作成できます。- イベントタイプ
google.cloud.storage.object.v1.finalized - イベントデータのコンテンツ タイプ
application/json - 任意のバケット名
- イベントタイプ
Cloud Storage通知
以下のように設定することでCloud Pub/Subに対して通知されるようになります。
Cloud Shellから実行できます。
詳細については公式のリファレンスを参照ください。
cloud storage buckets notifications create gs://{バケット名} --topic={作成したCloud Pub/Sub名} --event-types=OBJECT_FINALIZE --object-prefix={圧縮ファイルを格納するディレクトリパス}
Cloud SDK Reference -gcloud storage buckets notifications create
実行結果
実行時間
実行時間の例:
- 解凍後のファイルサイズ54.8KB:3秒
- 解凍後のファイルサイズ30.7MB:6秒
ファイルサイズの制約
圧縮前のサイズが約190MBの場合、メモリが上限に達して解凍できませんでした。
Cloud Functionsのメモリサイズよりも十分に小さいサイズのファイルを解凍する必要があります。
まとめ
Cloud FunctionsでCloud Storageにアップロードされたファイルの解凍方法をご紹介しました。
こちらの方法では大きいファイルの解凍には適していないため、大きめのファイルを解凍される際は
DataflowのBulk Decompress Cloud Storage Filesテンプレートなど他の方法を検討することをお勧めします。
Cloud FunctionsでPythonを動かすことを初めて試しましたが、使えるとできることの幅が増えると感じました。
他のチームメンバーのようにスクリプトを作成・活用できるように、これからも学習を進めていきたいと考えています!
テックブログ新着情報のほか、AWSやGoogle Cloudに関するお役立ち情報を配信中!
Follow @twitter2022年4月、NHNテコラスに新卒入社。大学時代は山に登ったり、インドに行ったりしてました。
Recommends
こちらもおすすめ
-
インナーブランディングで100点をとった話
2023.12.1
-
Mountpoint for Amazon S3のパフォーマンス検証
2023.12.13
-
Amazon Inspectorを利用したAmazon ECRの脆弱性情報の検出
2023.12.23
-
ひとりぼっちのPostgreSQLカンファレンスの感想
2023.12.24
-
エンジニアとして働く上で大切にしているマインド
2023.12.21
Special Topics
注目記事はこちら
データ分析入門
これから始めるBigQuery基礎知識
2024.02.28
AWSの料金が 10 %割引になる!
『AWSの請求代行リセールサービス』
2024.07.16