Google BigQueryからAmazon Redshiftにデータを移行してみる

AWS

2019.11.29

Topics

こんにちは、データサイエンスチームのmotchieです。

クラウド上のデータウェアハウス(DWH)といえば、Google BigQueryAmazon Redshiftなどが有名です。
DWH間でデータ移行の方法については、
RedshiftからBigQueryに移行する場合、BigQuery Data Transfer Service APIを使う方法等がありますが、BigQueryからRedshiftに移行する場合については、あまり情報が見当たらないように思います。

そこで本記事では、BigQueryからRedshiftへデータを移行する方法を一つご紹介したいと思います。

リソースの全体像

流れ

以下の流れでリソースを作成していきたいと思います。

  • CloudFormationでRedshiftクラスターを作成する
  • GCS経由でBigQueryのデータでS3に出力する
  • Glueを使ってデータをRedshiftに読み込ませる
  • Redshiftに接続してクエリを投げてみる
  • リソースの削除

前提条件として、AWS・GCP上でリソースにアクセスできる十分な権限が必要になります。

CloudFormationでRedshiftクラスターを作成する

まず、AWS上にRedshiftクラスターを作成していきます。
以下のAWSブログのCloudFormationテンプレートを活用したいと思います。

Amazon Web Services ブログ:AWS CloudFormation を使用して Amazon Redshift クラスターの作成を自動化する

上記のブログのテンプレートを活用することで、
AWSのベストプラクティスに沿って設計されたRedshiftクラスターを一発で作成することができます。
設計とテンプレートの詳細については、上記のブログをご覧ください。

各テンプレートのパラメーターについては、基本的にデフォルトの値を使用したいと思います。
ただし、今回の用途では、

  • GCSからS3へデータをコピーするための認証情報とバケット
  • GlueからRedshiftへアクセスするためのネットワーク設定
    を追加する必要があるため、上記のテンプレートをカスタマイズして、リソースを追加したいと思います。

テンプレートの取得

以下の3つのCloudFormationテンプレートを取得します。

  • aws-vpc-blog.template
  • linux-bastion-blog.templat
  • redshift-blog.template
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/aws-vpc-blog.template ./
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/linux-bastion-blog.template ./
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/redshift-blog.template ./

テンプレートの修正

まず、linux-bastion-blog.template (Bastionスタック用テンプレート)を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
linux-bastion-blog.template

      - Name: AutoScalingGroupName
        Value: !Ref BastionAutoScalingGroup

  BigQueryDataS3Bucket:
    Type: 'AWS::S3::Bucket'
    DeletionPolicy: Delete
    Properties: 
      AccessControl: Private
      BucketName: !Join [ "-", [ !Ref 'AWS::StackName', "bigquery-data-bucket", !Ref 'AWS::AccountId' ] ]

  S3UserForBoto:
    Type: AWS::IAM::User
    Properties: 
      Policies: 
        - PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Action:
                  - 's3:PutObject'
                Effect: Allow
                Resource: !Join
                  - "/"
                  - - !GetAtt BigQueryDataS3Bucket.Arn
                    - "*" 
          PolicyName: s3-put-policy-from-gcs-for-boto
      UserName: !Join [ "-", [ !Ref 'AWS::StackName', "s3-user-for-boto", !Ref 'AWS::Region' ] ] 

  AccessKeyForBoto:
    Type: AWS::IAM::AccessKey
    Properties: 
      UserName: !Ref S3UserForBoto

  SecretsForBoto:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub "secrets-for-boto-user-${AWS::StackName}"
      SecretString: !Sub '{"aws_access_key_id": "${AccessKeyForBoto}", "aws_secret_access_key": "${AccessKeyForBoto.SecretAccessKey}"}'

Outputs:

上記の修正によって、以下のリソースが追加されます。

  • データ格納用S3バケット
  • gsutilコマンド用のIAMユーザー・認証情報
  • 認証情報格納用のSecretsManager

また、Redshiftスタックでデータ格納用S3バケット名が必要なため、以下の修正によって、値を出力するようにします。

  BastionSecurityGroupID:
    Description: Use this Security Group to reference incoming traffic from the SSH bastion host/instance
    Value: !Ref BastionSecurityGroup
    Export:
      Name: !Sub '${AWS::StackName}-BastionSecurityGroupID'
   
  BigQueryDataS3BucketName:
    Description: Bucket name will be imported in redshift cluter stack 
    Value: !Ref BigQueryDataS3Bucket
    Export:
      Name: !Sub '${AWS::StackName}-BigQueryDataS3BucketName'

次に、redshift-blog.template (Redshiftスタック用テンプレート) を修正していきます。
なお、修正後のテンプレートの内容は以下で確認できます。
redshift-blog.template

        -
          Key: Compliance
          Value: !Ref TagCompliance

  RedshiftSecurityGroupIngress:
    Type: 'AWS::EC2::SecurityGroupIngress'
    Properties:
      IpProtocol: "-1"
      SourceSecurityGroupId: !Ref RedshiftSecurityGroup
      GroupId: !Ref RedshiftSecurityGroup
 
  RedshiftLoggingS3Bucket:

GlueからVPC内のRedshiftにアクセスする際、ENIを利用します。
AWS ドキュメント:VPC の JDBC データストアに接続する
Redshift・Glueが利用するセキュリティグループに対して、すべてのTCP ポートに対する自己参照のインバウンドルールを追加します。

テンプレートの修正は以上です。
それでは、各スタックをデプロイしていきます。

スタックの作成

VPCスタック、Bastionスタック、Redshiftスタックの順で作成します。

まずパラメータをまとめて作成しておきます。
スタック名などは同じで問題ないですが、EMAIL_NOTIFICATION_LISTKEY_PAIR_NAME (事前にEC2キーペアの作成が必要です)は自身の値に変更してください。

# VPC stack parameters
VPC_STACK_NAME=techblog-vpc-stack
# Redshift stack parameters
REDSHIFT_STACK_NAME=techblog-redshift-stack
MASTER_USER_PASSWORD="TestPass2019"
GLUE_CATALOG_DB_NAME="bq-test-db"
#Bastion server stack parameters
BASTION_STACK_NAME=techblog-bastion-stack
EMAIL_NOTIFICATION_LIST="YOUR_EMAIL_ADDRESS"
KEY_PAIR_NAME="YOUR_EC2_KEY_PAIR_NAME"
# your ipaddress with CIDR
REMOTE_ACCESS_CIDR="$(dig +short myip.opendns.com @resolver1.opendns.com)/32"

VPCスタックを作成します。完了まで数分かかります。

aws cloudformation create-stack \
  --stack-name $VPC_STACK_NAME \
  --template-body file://aws-vpc-blog.template \
  --parameters \
    ParameterKey=ClassB,ParameterValue=0 \
  --capabilities CAPABILITY_IAM

マネジメントコンソールでCloudFormationを開き、スタック作成の完了を確認してから、Bastionスタックの作成に移ってください。
Bastionスタックを作成します。以下のコマンドを実行します。

aws cloudformation create-stack\
  --stack-name $BASTION_STACK_NAME\
  --template-body file://linux-bastion-blog.template\
  --parameters\
    ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\
    ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\
    ParameterKey=KeyPairName,ParameterValue=$KEY_PAIR_NAME\
    ParameterKey=RemoteAccessCIDR,ParameterValue=$REMOTE_ACCESS_CIDR\
    ParameterKey=EnableX11Forwarding,ParameterValue=true\
  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM

スタックの作成まで数分かかります。
スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。

最後に、Redshiftスタックを作成します。以下のコマンドを実行します。

aws cloudformation create-stack\
  --stack-name $REDSHIFT_STACK_NAME\
  --template-body file://redshift-blog.template\
  --parameters\
    ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\
    ParameterKey=ParentSSHBastionStack,ParameterValue=$BASTION_STACK_NAME\
    ParameterKey=MasterUserPassword,ParameterValue=$MASTER_USER_PASSWORD\
    ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\
    ParameterKey=S3BucketForRedshiftIAMRole,ParameterValue=$S3_BUCKET_NAME\
    ParameterKey=GlueCatalogDatabase,ParameterValue=$GLUE_CATALOG_DB_NAME\
    ParameterKey=TagOwner,ParameterValue=$EMAIL_NOTIFICATION_LIST\
  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM 

同様に、スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。

以上でAWSのリソースが作成できました。次はGCPに移ります。

GCS経由でBigQueryのデータでS3に出力する

一般公開データセット

BigQueryでは、一般公開データセットと呼ばれる様々なデータがホストされており、
ユーザーは自身のデータと組み合わせて分析を行うことができます。

今回は、BigQueryの一般公開データセットの中から、Google Analytics Sampleのデータを使いたいと思います。

以下のようなクエリが実行できます。後ほど、Redshiftでも同様のクエリを投げてみたいと思います。

GCSバケットの作成

GCPのコンソールを開き、Cloud Shellを起動します。以下のコマンドはCloud Shellから実行します。
GCSのバケットを作成します。バケット名は適宜変更して実行してください。

GCS_BUCKET_NAME="techblog-bq-to-rs-test-1"
gsutil mb -l us-east1 gs://$GCS_BUCKET_NAME/

BigQueryからGCSにエクスポート

データをBigQueryからGCSに出力します。今回は bq extract CLIコマンドを使用します。
なお、出力データが1GBを超える場合は、ワイルドカード * を使用して、データを分割する必要があります。
Google Cloud ドキュメント:テーブルデータのエクスポート

bq --location=US extract --destination_format AVRO --compression SNAPPY 'bigquery-public-data:google_analytics_sample.ga_sessions_20170801' gs://$GCS_BUCKET_NAME/file*.avro

GCSからS3にデータをコピー

gsutilツールを利用して、GCSからS3にデータをコピーします。
Google Cloud ドキュメント:Cloud Storage の相互運用性
AWSマネジメントコンソールでSecret Managersを開き、Bastionスタックで作成した認証情報を取得します。

IAMユーザーの認証情報を ~/.boto に書き込みます。

[Credentials]
aws_access_key_id = XXXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX

GCSからS3にファイルをコピーします。
変数**S3_BUCKET_NAME**には、Bastionスタック作成時 BigQueryDataS3BucketName として出力されていたバケット名を入力してください。

S3_BUCKET_NAME="REPLACE_VALUE_WITH_YOUR_S3_BUCKET_NAME"
gsutil cp gs://$GCS_BUCKET_NAME/*.avro s3://$S3_BUCKET_NAME/

以上で、BigQueryのデータをS3に出力できました。
次にAWSに移って、Glueの設定を行っていきます。

Glueを使ってデータをRedshiftに読み込ませる

AWS Glueを使うことで、様々なデータソースと連携するETLジョブが作成できます。

まず、Glueのクローラを使い、S3データからデータカタログを作成します。

データのクロール

Connectionの設定

GlueからRedshiftへの接続を設定します。

ETLジョブの作成

最後にETLジョブを作成します。

なお、RedshiftはArray・Structのデータ型に対応していないため、ETLで型を変換してからRedshiftに読み込む必要があります。
Arrayはstring型で読み込まれているようです。
Arrayの中身をそれぞれ展開し、適切なデータ型を付与して格納したい場合、自身で変換を追加する必要があります。

今回、hits の列は除外し、customdimmensions のArrayの中身を展開し、long型の customdimensions.index とstring型のcustomdimensions.value としてRedshiftに格納したいと思います。
コード例は以下のようになります。修正箇所をハイライトしています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0")
## @type: ExplodeArray
## @args: []
## @return: explodedframe0
## @inputs: [frame = datasource0]
import pyspark.sql.functions as F
from awsglue.dynamicframe import DynamicFrame
df = datasource0.toDF()
df = df.withColumn("exploded_customdimensions", F.explode("customdimensions"))
df = df.drop("customdimensions")
df = df.withColumn("customdimensions.index", df.exploded_customdimensions.index)
df = df.withColumn("customdimensions.value", df.exploded_customdimensions.value)
explodedframe0 = DynamicFrame.fromDF(df, glueContext, "exploded")
## @type: ApplyMapping
## @args: [mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"), ("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = explodedframe0]
applymapping1 = ApplyMapping.apply(frame = explodedframe0, mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"),("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

ETLジョブが完了し、データがRedshiftに読み込まれました。

Redshiftに接続してクエリを投げてみる

Redshiftに接続し、クエリを投げてみたいと思います。
Bastionサーバにssh接続のコマンド、Redshiftへの接続コマンドは、それぞれBastionスタック・Redshiftスタックの出力で確認できます。

Bastionサーバにssh接続します。

ssh -i "your-keypair-name.pem" ec2-user@xxx.xxx.xxx.xxx

Redshiftに接続します。パスワードはスタック作成時に自身で設定したもの(MASTER_USER_PASSWORD)になります。

psql -h rsdev01-redshiftmain.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com -p 8200 -U rsadmin -d rsdev01

Redshiftにテーブルが作成されていることを確認します。

rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d
                                  List of relations
 schema |                           name                           | type  |  owner
--------+----------------------------------------------------------+-------+---------
 public | techblog_bastion_stack_bigquery_data_bucket_xxxx| table | rsadmin
(1 row)
rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d techblog_bastion_stack_bigquery_data_bucket_xxx
                Table "public.techblog_bastion_stack_bigquery_data_bucket_xx"
                             Column                              |           Type           | Modifiers
-----------------------------------------------------------------+--------------------------+-----------
 visitorid                                                       | bigint                   |
 visitnumber                                                     | bigint                   |
 visitid                                                         | bigint                   |
 visitstarttime                                                  | bigint                   |
 date                                                            | character varying(65535) |
 totals.visits                                                   | bigint                   |
 totals.hits                                                     | bigint                   |
 totals.pageviews                                                | bigint                   |
 totals.timeonsite                                               | bigint                   |
 totals.bounces                                                  | bigint                   |
 totals.transactions                                             | bigint                   |
 totals.transactionrevenue                                       | bigint                   |
 totals.newvisits                                                | bigint                   |
 totals.screenviews                                              | bigint                   |
 totals.uniquescreenviews                                        | bigint                   |
 totals.timeonscreen                                             | bigint                   |
 totals.totaltransactionrevenue                                  | bigint                   |
 totals.sessionqualitydim                                        | bigint                   |
 trafficsource.referralpath                                      | character varying(65535) |
 trafficsource.campaign                                          | character varying(65535) |
 trafficsource.source                                            | character varying(65535) |
 trafficsource.medium                                            | character varying(65535) |
 trafficsource.keyword                                           | character varying(65535) |
 trafficsource.adcontent                                         | character varying(65535) |
 trafficsource.adwordsclickinfo.campaignid                       | bigint                   |
 trafficsource.adwordsclickinfo.adgroupid                        | bigint                   |
 trafficsource.adwordsclickinfo.creativeid                       | bigint                   |
 trafficsource.adwordsclickinfo.criteriaid                       | bigint                   |
 trafficsource.adwordsclickinfo.page                             | bigint                   |
 trafficsource.adwordsclickinfo.slot                             | character varying(65535) |
 trafficsource.adwordsclickinfo.criteriaparameters               | character varying(65535) |
 trafficsource.adwordsclickinfo.gclid                            | character varying(65535) |
 trafficsource.adwordsclickinfo.customerid                       | bigint                   |
 trafficsource.adwordsclickinfo.adnetworktype                    | character varying(65535) |
 trafficsource.adwordsclickinfo.targetingcriteria.boomuserlistid | bigint                   |
 trafficsource.adwordsclickinfo.isvideoad                        | boolean                  |
 trafficsource.istruedirect                                      | boolean                  |
 trafficsource.campaigncode                                      | character varying(65535) |
 device.browser                                                  | character varying(65535) |
 device.browserversion                                           | character varying(65535) |
 device.browsersize                                              | character varying(65535) |
 device.operatingsystem                                          | character varying(65535) |
 device.operatingsystemversion                                   | character varying(65535) |
 device.ismobile                                                 | boolean                  |
 device.mobiledevicebranding                                     | character varying(65535) |
 device.mobiledevicemodel                                        | character varying(65535) |
 device.mobileinputselector                                      | character varying(65535) |
 device.mobiledeviceinfo                                         | character varying(65535) |
 device.mobiledevicemarketingname                                | character varying(65535) |
 device.flashversion                                             | character varying(65535) |
 device.javaenabled                                              | boolean                  |
 device.language                                                 | character varying(65535) |
 device.screencolors                                             | character varying(65535) |
 device.screenresolution                                         | character varying(65535) |
 device.devicecategory                                           | character varying(65535) |
 geonetwork.continent                                            | character varying(65535) |
 geonetwork.subcontinent                                         | character varying(65535) |
 geonetwork.country                                              | character varying(65535) |
 geonetwork.region                                               | character varying(65535) |
 geonetwork.metro                                                | character varying(65535) |
 geonetwork.city                                                 | character varying(65535) |
 geonetwork.cityid                                               | character varying(65535) |
 geonetwork.networkdomain                                        | character varying(65535) |
 geonetwork.latitude                                             | character varying(65535) |
 geonetwork.longitude                                            | character varying(65535) |
 geonetwork.networklocation                                      | character varying(65535) |
 fullvisitorid                                                   | character varying(65535) |
 userid                                                          | character varying(65535) |
 clientid                                                        | character varying(65535) |
 channelgrouping                                                 | character varying(65535) |
 socialengagementtype                                            | character varying(65535) |
 customdimensions.index                                          | bigint                   |
 customdimensions.value                                          | character varying(65535) |

BigQueryと同様のクエリを投げてみます。

SELECT
  "device.browser",
  SUM ( "totals.transactions" ) AS total_transactions
FROM
  techblog_bastion_stack_bigquery_data_bucket_xxx
GROUP BY
  "device.browser"
ORDER BY
  total_transactions DESC;

以下の通り、同じ実行結果が確認できました。

      device.browser      | total_transactions
--------------------------+--------------------
 Opera                    |
 Internet Explorer        |
 YaBrowser                |
 Safari (in-app)          |
 Mozilla Compatible Agent |
 Opera Mini               |
 Edge                     |
 UC Browser               |
 Android Browser          |
 Android Webview          |
 Nokia Browser            |
 Chrome                   |                 40
 Safari                   |                  3
 Firefox                  |                  1
(14 rows)

以上でBigQueryからRedshiftへの移行手順は終了です。

リソースの削除

削除が必要なリソースは以下の通りです。

  • GCSバケット (GCS_BUCKET_NAME)
  • S3バケット (S3_BUCKET_NAME )の中身(スタック削除時に中身が空である必要があります)
  • Redshiftスタック・Bastionスタック・VPCスタック(スタック間に依存関係があるため、この順番で一つずつ削除してください)

不要なコストを避けるため、検証後はAWS/GCPのマネジメントコンソールからリソースの削除をお願いします。

まとめ

この記事では、BigQueryからRedshiftにデータを移行する流れをご紹介しました。
少しでも参考になれば幸いです。

motchie

2017年4月、NHNテコラスに新卒入社。データサイエンスチームに所属し、AWSを活用したデータ分析サービスの設計開発を担当。 注力分野は自然言語処理、好きなAWSサービスはEMR、SageMaker。お洒落な音楽が好き。

Recommends

こちらもおすすめ

Special Topics

注目記事はこちら