Cloud Functionsを使ってCloud Storageにアップロードされたオブジェクトを解凍する

Google Cloud

2023.12.2

Topics

はじめに

お久しぶりです。第三SAチーム所属のreonoです。
この記事はNHN テコラス Advent Calendar 2023の2日目の記事です。

今回はGoogle CloudのCloud Storageにアップロードしたファイルをトリガーに解凍する方法についてご紹介します。
クラウドを利用する際、データ転送量を削減するためにファイルを圧縮してアップロードすることが一般的です。
圧縮ファイルをバッチ的に処理する方法はDataflowのBulk Decompress Cloud Storage Filesテンプレートがありましたが、ファイルのアップロードごとに解凍を行いたかったためCloud Functionsを利用しました。

フロー図

処理の流れとしては以下の流れとなります。

  1. ローカルからCloud Storageに対して圧縮ファイルをアップロードする。
  2. Cloud StorageにアップロードされたことをCloud Pub/Subに対して通知を行う。
  3. Cloud Pub/SubがCloud Functionsをトリガーし、解凍を行う。
  4. 解凍されたオブジェクトをアップロードする。

コード

以下がサンプルコードです。

from google.cloud import storage
import os
import gzip
import logging
import json
import re
import sys

class JsonFormatter:
    def format(self, record):
        return json.dumps(vars(record))

def Unzipper(event, context):
    logging.basicConfig()
    logging.getLogger().handlers[0].setFormatter(JsonFormatter())

    logger = logging.getLogger(__name__)
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
    try:
        logger.info(f"event={event}")
        # Download
        # イベントからバケットとオブジェクト名を取得
        bucket_name = event["bucket"]

        # pathname = 'folder/file.log.gz'
        pathname = event["name"]

        # 拡張子が.gzでないものは処理を終了
        if bool(re.match(r".*\.gz$", pathname)) != True:
       sys.exit()

        # ファイル名を取得
        # filename = 'file.log.gz'
        filename = (re.search(r"([^/]*)(\.log)(\.gz)", pathname)).group(0)
    except Exception as e:
        logger.error(f"error={e}")

    try:
        # tmp配下にファイルをダウンロード
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        blob = bucket.blob(pathname)
        blob.download_to_filename("/tmp/" + filename)
    except Exception as e:
        logger.error(f"error={e}")

    try:
        # Unzip
        with gzip.open("/tmp/" + filename, "rb") as f:
            file_content = f.read()
            logger.info(f"file_content = {file_content}")
            # newpathでpathから.gzの拡張子を削除。
            # newnameでファイル名から.gzの拡張子を削除。
            newpath = pathname.replace(".gz", "")
            newname = filename.replace(".gz", "")
            with open("/tmp/" + newname, mode="w") as z:
                # 解凍したgzipをutf-8でデコードしてtmp配下の新しいファイルに書き込む
                z.write(file_content.decode("utf-8"))
    except Exception as e:
        logger.error(f"error={e}")

    try:
        # Upload
        # 環境変数よりUNZIP_BUCKET_NAMEを取得する
        unzip_bucket_name = os.environ.get("UNZIP_BUCKET_NAME")
        unzip_bucket = storage_client.get_bucket(unzip_bucket_name)
        logger.info(f"unzip_bucket={unzip_bucket}")
        # アップロードする先のフォルダパスを設定
        unzip_blob = unzip_bucket.blob(newpath)
        # ローカルのオブジェクトをアップロード
        unzip_blob.upload_from_filename("/tmp/" + newname)
        os.remove("/tmp/" + newname)
    except Exception as e:
        logger.error(f"error={e}")

Cloud StorageのCloud Pub/Subへの通知にSuffixでフィルターすることができないため、正規表現で拡張子を取得してgzの場合のみ処理を行うようにしました。

try:
    logger.info(f"event={event}")
    # Download
    # イベントからバケットとオブジェクト名を取得
    bucket_name = event["bucket"]

    # pathname = 'folder/file.log.gz'
    pathname = event["name"]

    # 拡張子が.gzでないものは処理を終了
    if bool(re.match(r".*\.gz$", pathname)) != True:
       sys.exit()

    # ファイル名を取得
    # filename = 'file.log.gz'
    filename = (re.search(r"([^/]*)(\.log)(\.gz)", pathname)).group(0)
except Exception as e:
    logger.error(f"error={e}")

設定

以下のリソースを設定する必要があります。

  • Cloud Functions作成
  • Cloud Pub/Sub作成
  • Cloud Storage通知

Cloud Functions作成

以下の設定で作成しました。

  • ランタイム
  • トリガー(Cloud Pub/Sub)
    Cloud Functionsの作成画面でCloud Pub/Subも作成できます。

    • イベントタイプ
      google.cloud.storage.object.v1.finalized
    • イベントデータのコンテンツ タイプ
      application/json
    • 任意のバケット名

Cloud Storage通知

以下のように設定することでCloud Pub/Subに対して通知されるようになります。
Cloud Shellから実行できます。
詳細については公式のリファレンスを参照ください。

cloud storage buckets notifications create gs://{バケット名} --topic={作成したCloud Pub/Sub名} --event-types=OBJECT_FINALIZE --object-prefix={圧縮ファイルを格納するディレクトリパス}

Cloud SDK Reference -gcloud storage buckets notifications create

実行結果

実行時間

実行時間の例:

  • 解凍後のファイルサイズ54.8KB:3秒
  • 解凍後のファイルサイズ30.7MB:6秒

ファイルサイズの制約

圧縮前のサイズが約190MBの場合、メモリが上限に達して解凍できませんでした。
Cloud Functionsのメモリサイズよりも十分に小さいサイズのファイルを解凍する必要があります。

まとめ

Cloud FunctionsでCloud Storageにアップロードされたファイルの解凍方法をご紹介しました。
こちらの方法では大きいファイルの解凍には適していないため、大きめのファイルを解凍される際は
DataflowのBulk Decompress Cloud Storage Filesテンプレートなど他の方法を検討することをお勧めします。

Cloud FunctionsでPythonを動かすことを初めて試しましたが、使えるとできることの幅が増えると感じました。
他のチームメンバーのようにスクリプトを作成・活用できるように、これからも学習を進めていきたいと考えています!

reono

2022年4月、NHNテコラスに新卒入社。大学時代は山に登ったり、インドに行ったりしてました。

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら