Google BigQueryからAmazon Redshiftにデータを移行してみる
更新履歴
– 2020/1/8 記事内容の修正を行いました。
はじめに
こんにちは。データサイエンスチームのmotchieです。
データウェアハウス(DWH)を使うことで、大規模なデータに対する高速なクエリ処理が実現し、BIを初めとした様々なデータ活用が可能になります。
クラウドDWHといえば、Amazon RedshiftやGoogle BigQueryなどが有名です。
re:Invent2019では、Redshiftの新機能 Amazon Redshift RA3 ノードが発表されました。
RA3ノードはAWS Nitro Systemベースの次世代コンピュートインスタンスで、頻繁にアクセスされるデータはノード上のSSD、それ以外はS3へ自動で配置されるマネージドストレージを備えています。
RA3ノードによって、以下のように、Redshiftは大きな進歩を遂げました。
・従来のDS2ノードに比べて、同じコストで2倍のパフォーマンスと2倍のストレージ容量
・他のクラウドDWHに比べて、最大3倍の処理速度
・処理能力(インスタンス数)とストレージ(S3/SSD)の柔軟なスケーリング
更に、Advanced Query Accelerator (AQUA) for Amazon Redshift(※執筆現在プレビュー期間)によって、他のクラウドDWHに比べて最大10倍もの処理速度が実現するとのことです。
クラウドDWHは日々進歩を続けています。
さて、今回の記事では、クラウドDWH間でのデータ移行について紹介したいと思います。
RedshiftからBigQueryに移行するケース(AWS->GCP)では、BigQuery Data Transfer Service APIを使う方法がありますが、
BigQueryからRedshiftに移行する場合(GCP->AWS)については、あまり情報が見当たらないように思います。
そこでこの記事では、BigQueryからRedshiftにデータを移行する手順を1つご紹介します。
具体的には、BigQueryのデータをGoogle Cloud Storage(GCS)経由でAmazon S3に出力し、AWS GlueでETLを行い、Redshiftにデータを格納してみたいと思います。
なお、この記事では、ゼロからRedshiftクラスターを立ち上げ、BigQueryで公開されているデータセットを使って移行を行うため、
事前に稼働中のRedshiftクラスターや自前のBigQueryデータがない場合でもお試しいただけます。
リソースの全体像
流れ
- CloudFormationテンプレートでRedshiftクラスターを作成する
- BigQueryの一般公開データセットを、GCS経由でS3に出力する
- S3のデータにGlueでETLを行い、Redshiftにデータを格納する
- Redshiftに接続して、クエリを投げてみる
- リソースの削除
前提条件として、AWS・GCP上でリソースにアクセスできる十分な権限が必要になります。
1. CloudFormationテンプレートでRedshiftクラスターを作成する
以下のAWSブログのAWS CloudFormationテンプレートをベースに、Redshiftクラスターを作成していきたいと思います。
Amazon Web Services ブログ:AWS CloudFormation を使用して Amazon Redshift クラスターの作成を自動化する
上記のテンプレートを活用することで、ベストプラクティスに沿って設計されたRedshiftクラスターが一発で作成できます。
設計とテンプレートの詳細については、上記ブログ記事をご確認ください。
1.1 テンプレートの取得
上記ブログ記事のCloudFormationテンプレートを取得します。
・ aws-vpc-blog.template
・ linux-bastion-blog.templat
・ redshift-blog.template
aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/aws-vpc-blog.template ./ aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/linux-bastion-blog.template ./ aws s3 cp s3://aws-bigdata-blog/artifacts/Automate_Redshift_Cluster_Creation_CloudFormation/redshift-blog.template ./
1.2 テンプレートにリソースを追加
今回、BigQueryのデータをRedshiftに移行する際に、追加のリソースが必要になります。
・GCSからS3へデータをコピーするための、IAMユーザー認証情報とS3バケット
・GlueからRedshiftへアクセスするためのネットワーク設定
テンプレートをカスタマイズして、リソースを追加してからデプロイしたいと思います。
1.2.1 linux-bastion-blog.template
まず、linux-bastion-blog.template
(Bastionスタック用テンプレート)を修正していきます。
修正後のテンプレートは以下で確認できます。
追加したリソースを赤色でハイライトしています。内容は以下の通りです。
・ データ格納用S3バケット
・ gsutilコマンド用のIAMユーザー・認証情報
・ 認証情報をセキュアに保存する用のAWS Secrets Manager
テンプレートの修正箇所をハイライトしています。
- Name: AutoScalingGroupName Value: !Ref BastionAutoScalingGroup # --- BigQueryDataS3Bucket: Type: 'AWS::S3::Bucket' DeletionPolicy: Delete Properties: AccessControl: Private BucketName: !Join [ "-", [ !Ref 'AWS::StackName', "bigquery-data-bucket", !Ref 'AWS::AccountId' ] ] S3UserForBoto: Type: AWS::IAM::User Properties: Policies: - PolicyDocument: Version: 2012-10-17 Statement: - Action: - 's3:PutObject' Effect: Allow Resource: !Join - "/" - - !GetAtt BigQueryDataS3Bucket.Arn - "*" PolicyName: s3-put-policy-from-gcs-for-boto UserName: !Join [ "-", [ !Ref 'AWS::StackName', "s3-user-for-boto", !Ref 'AWS::Region' ] ] AccessKeyForBoto: Type: AWS::IAM::AccessKey Properties: UserName: !Ref S3UserForBoto SecretsForBoto: Type: AWS::SecretsManager::Secret Properties: Name: !Sub "secrets-for-boto-user-${AWS::StackName}" SecretString: !Sub '{"aws_access_key_id": "${AccessKeyForBoto}", "aws_secret_access_key": "${AccessKeyForBoto.SecretAccessKey}"}' # --- Outputs:
Redshiftスタックでデータ格納用S3バケット名が必要なため、以下の修正によって値を出力しています。
BastionSecurityGroupID: Description: Use this Security Group to reference incoming traffic from the SSH bastion host/instance Value: !Ref BastionSecurityGroup Export: Name: !Sub '${AWS::StackName}-BastionSecurityGroupID' # --- BigQueryDataS3BucketName: Description: Bucket name will be imported in redshift cluter stack Value: !Ref BigQueryDataS3Bucket Export: Name: !Sub '${AWS::StackName}-BigQueryDataS3BucketName' # ---
1.2.2 redshift-blog.template
次に、redshift-blog.template
(Redshiftスタック用テンプレート) を修正していきます。
修正後のテンプレートは以下で確認できます。
追加したネットワーク設定が用いられる経路を赤色でハイライトしています。
GlueからVPC内のRedshiftにアクセスする際、ネットワークインターフェース(ENI)が必要になります。
AWS ドキュメント:VPC の JDBC データストアに接続する
Redshift・Glueが利用するセキュリティグループに対して、すべてのTCP ポートに対する自己参照のインバウンドルールを追加しています。なお、ENI本体は、GlueからRedshiftへの接続を作成し、ETLジョブを実行する際に作成されます。
テンプレートの修正箇所をハイライトしています。
- Key: Compliance Value: !Ref TagCompliance # --- RedshiftSecurityGroupIngress: Type: 'AWS::EC2::SecurityGroupIngress' Properties: IpProtocol: "-1" SourceSecurityGroupId: !Ref RedshiftSecurityGroup GroupId: !Ref RedshiftSecurityGroup MyGlueIAMRole: Type: 'AWS::IAM::Role' Properties: RoleName: !Sub "AWSGlueServiceRole-${AWS::StackName}" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-Glue-Read-S3-Policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - s3:GetBucketLocation - s3:GetObject - s3:ListMultipartUploadParts - s3:ListBucket - s3:ListBucketMultipartUploads Resource: - !Join - "" - - "arn:aws:s3:::" - {'Fn::ImportValue': !Sub '${ParentSSHBastionStack}-BigQueryDataS3BucketName'} - "/*" # --- RedshiftLoggingS3Bucket:
テンプレートの修正は以上です。
それでは、各スタックをデプロイしていきます。
1.3 スタックの作成
VPCスタック、Bastionスタック、Redshiftスタックの順で作成します。
まずパラメータをまとめて作成しておきます。
スタック名などは同じで問題ないですが、EMAIL_NOTIFICATION_LIST
と KEY_PAIR_NAME
(事前にEC2キーペアの作成が必要です)は自身の値に変更してください。
また、MASTER_USER_PASSWORD
の値は、Glueの接続設定とRedshiftへアクセスする際に必要になります。自身で設定した値を失念しないよう、気を付けてください。
# 自身の値に置き換えて実行してください EMAIL_NOTIFICATION_LIST="YOUR_EMAIL_ADDRESS" KEY_PAIR_NAME="YOUR_EC2_KEY_PAIR_NAME" # VPCスタック用パラメータ VPC_STACK_NAME=techblog-vpc-stack # Redshiftスタック用パラメータ REDSHIFT_STACK_NAME=techblog-redshift-stack MASTER_USER_PASSWORD="TestPass2019" GLUE_CATALOG_DB_NAME="bq-test-db" # Bastionスタック用パラメータ BASTION_STACK_NAME=techblog-bastion-stack # 踏み台サーバへのSSH接続元を制限するため、自身のIPAddressを取得、CIDR表記で指定 REMOTE_ACCESS_CIDR="$(dig +short myip.opendns.com @resolver1.opendns.com)/32"
VPCスタックを作成します。完了まで数分かかります。
aws cloudformation create-stack \ --stack-name $VPC_STACK_NAME \ --template-body file://aws-vpc-blog.template \ --parameters \ ParameterKey=ClassB,ParameterValue=0 \ --capabilities CAPABILITY_IAM
マネジメントコンソールでCloudFormationを開き、スタック作成の完了を確認してから、Bastionスタックの作成に移ってください。
Bastionスタックを作成します。以下のコマンドを実行します。
aws cloudformation create-stack\ --stack-name $BASTION_STACK_NAME\ --template-body file://linux-bastion-blog.template\ --parameters\ ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\ ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\ ParameterKey=KeyPairName,ParameterValue=$KEY_PAIR_NAME\ ParameterKey=RemoteAccessCIDR,ParameterValue=$REMOTE_ACCESS_CIDR\ ParameterKey=EnableX11Forwarding,ParameterValue=true\ --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM
スタックの作成まで数分かかります。
スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。
最後に、Redshiftスタックを作成します。以下のコマンドを実行します。
aws cloudformation create-stack\ --stack-name $REDSHIFT_STACK_NAME\ --template-body file://redshift-blog.template\ --parameters\ ParameterKey=ParentVPCStack,ParameterValue=$VPC_STACK_NAME\ ParameterKey=ParentSSHBastionStack,ParameterValue=$BASTION_STACK_NAME\ ParameterKey=MasterUserPassword,ParameterValue=$MASTER_USER_PASSWORD\ ParameterKey=NotificationList,ParameterValue=$EMAIL_NOTIFICATION_LIST\ ParameterKey=S3BucketForRedshiftIAMRole,ParameterValue=$S3_BUCKET_NAME\ ParameterKey=GlueCatalogDatabase,ParameterValue=$GLUE_CATALOG_DB_NAME\ ParameterKey=TagOwner,ParameterValue=$EMAIL_NOTIFICATION_LIST\ --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM
同様に、スタックの出力する以下の値は、以降の手順で必要になるため、マネジメントコンソールで確認・取得しておいてください。
以上でAWSのリソースが作成できました。作成したリソースを赤色でハイライトしています。Glue関連のリソースは後ほど手順3で作成します。
2. BigQueryの一般公開データセットを、GCS経由でS3に出力する
次はGCPに移ってバケットの作成、データの移行を行っていきます。下の構成図において、赤色でハイライトした部分が関連するリソースになります。
2.1 一般公開データセット
BigQueryでは、一般公開データセットと呼ばれる様々なデータがあらかじめ用意されており、
ユーザーは自身のデータと組み合わせて分析を行うことができます。
今回は、BigQueryの一般公開データセットの中から、Google Analytics Sampleのデータを使いたいと思います。
以下のようなクエリが実行できます。後ほど、Redshiftでも同様のクエリを投げてみたいと思います。
SELECT device.browser, SUM ( totals.transactions ) AS total_transactions FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801` GROUP BY device.browser ORDER BY total_transactions DESC
2.2 GCSバケットの作成
GCPのコンソールを開き、Cloud Shellを起動します。以下のコマンドはCloud Shellから実行します。
GCSのバケットを作成します。バケット名は適宜変更して実行してください。
GCS_BUCKET_NAME="techblog-bq-to-rs-test-1" gsutil mb -l us-east1 gs://$GCS_BUCKET_NAME/
Creating gs://techblog-bq-to-rs-test-1/…
2.3 BigQueryからGCSにエクスポート
データをBigQueryからGCSに出力します。今回は bq extract コマンドを使用します。
なお、出力データが1GBを超える場合は、ワイルドカード *
を使用して、データを分割する必要があることに注意してください。
テーブルデータのエクスポート:エクスポートの制限事項
bq --location=US extract --destination_format AVRO --compression SNAPPY 'bigquery-public-data:google_analytics_sample.ga_sessions_20170801' gs://$GCS_BUCKET_NAME/file*.avro
Waiting on bqjob_r53a25b3547d65fb_0000016f7a31fa56_1 … (2s) Current status: DONE
2.4 GCSからS3にデータをコピー
gsutilツールを利用して、GCSからS3にデータをコピーすることが出来ます。
そのためには、S3にアクセスできるIAMユーザー認証情報を.boto構成ファイル(~/.boto
)に追加する必要があります。詳細については、以下のドキュメントをご覧ください。
Cloud Storage の相互運用性
gsutil config: Additional Configuration-Controllable Features
AWSマネジメントコンソールでSecret Managersを開き、先ほどBastionスタック内で作成したIAMユーザー認証情報を取得します。
Cloud Shellに戻って、お好きなエディターで.boto構成ファイル(~/.boto
)を開き、IAMユーザーの認証情報を書き込みます。
なお、S3バケットは東京リージョン(ap-northeast-1
)での作成を想定し、s3_host
の値を設定しています。他のリージョンにおけるエンドポイントの値については、以下をご確認ください。
Amazon Simple Storage Service エンドポイントとクォータ
[Credentials] aws_access_key_id = XXXXXXXXXXXXXXXXXXXX aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXX s3_host = s3.ap-northeast-1.amazonaws.com
gsutilを使って、GCSからS3にファイルをコピーします。
変数 S3_BUCKET_NAME
には、Bastionスタック作成時 BigQueryDataS3BucketName
として出力されていたバケット名を入力してください。AWSマネジメントコンソール > CloudFormation > スタック > 自身のBastionスタック名 > 出力
のタブで確認できます。
S3_BUCKET_NAME="REPLACE_VALUE_WITH_YOUR_S3_BUCKET_NAME" gsutil cp gs://$GCS_BUCKET_NAME/*.avro s3://$S3_BUCKET_NAME/
Copying gs://techblog-bq-to-rs-test-1/file000000000000.avro [Content-Type=application/octet-stream]…
– [1 files][ 2.8 MiB/ 2.8 MiB]
Operation completed over 1 objects/2.8 MiB.
以上で、BigQueryのデータをS3に出力できました。
次は再びAWSのマネジメントコンソールに戻って、Glueの設定を行っていきます。
3. S3のデータにGlueでETLを行い、Redshiftにデータを格納する
AWS Glueは、データの抽出、変換、ロードという、ETL処理をスムーズに行うためのマネージドサービスです。
今回、Glueを使って、S3のデータからデータカタログを構築、Redshiftへの接続を設定、ETLジョブを作成、実行し、S3のデータを変換してRedshiftにロードしてみたいと思います。
以下の構成図において、関連するリソースを赤色でハイライトしています。
3.1 データカタログの構築
S3にコピーされたBigQuery一般公開データセットをGlueクローラでクロールし、データカタログを構築します。
AWSのマネジメントコンソールから AWS Glue > クローラ
を開きます。
以下の画像の手順で、クローラの設定を行っていきます。
クローラの実行は1分ほどで完了します。
構築されたデータカタログはテーブルとして確認できます。
3.2 Connectionの設定
次に、GlueからRedshiftへの接続(Connection)を設定します。
接続の設定が上手くいっているか、テストしてみます。
無事に接続の設定が完了しました。
3.3 ETLジョブの作成
最後にGlueジョブを作成し、S3からRedshiftへデータのETLを行います。
データソースの選択では、先ほどGlueクローラを使って構築したデータカタログのテーブルを選択します。
なお、RedshiftはArray・Structのデータ型に対応していないため、ETLで型を変換してからRedshiftに読み込む必要があります。
AWSドキュメント:Amazon Redshift:サポートされていない PostgreSQL データ型
下の画像の通り、デフォルトでArrayはstring型へ変換されているようです。
Arrayの中身を展開し、適切なデータ型を付与して格納したい場合、自身で変換を追加する必要があります。
今回、hits
の列は除外し、customdimmensions
のArrayの中身を展開し、long型の customdimensions.index
とstring型のcustomdimensions.value
としてRedshiftに格納してみたいと思います。
コード例は以下のようになります。GlueのDynamicFrameからSparkのDataFrameに一旦変換し、Sparkのexplode関数でArrayの中身を展開した後、GlueのDynamicFrameに戻しています。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [TempDir, JOB_NAME] args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "bq-test-db", table_name = "techblog_bastion_stack_bigquery_data_bucket_xxx", transformation_ctx = "datasource0") ## @type: ExplodeArray ## @args: [] ## @return: explodedframe0 ## @inputs: [frame = datasource0] import pyspark.sql.functions as F from awsglue.dynamicframe import DynamicFrame df = datasource0.toDF() df = df.withColumn("exploded_customdimensions", F.explode("customdimensions")) df = df.drop("customdimensions") df = df.withColumn("customdimensions.index", df.exploded_customdimensions.index) df = df.withColumn("customdimensions.value", df.exploded_customdimensions.value) explodedframe0 = DynamicFrame.fromDF(df, glueContext, "exploded") ## @type: ApplyMapping ## @args: [mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"), ("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = explodedframe0] applymapping1 = ApplyMapping.apply(frame = explodedframe0, mappings = [("visitorid", "long", "visitorid", "long"), ("visitnumber", "long", "visitnumber", "long"), ("visitid", "long", "visitid", "long"), ("visitstarttime", "long", "visitstarttime", "long"), ("date", "string", "date", "string"), ("totals.visits", "long", "`totals.visits`", "long"), ("totals.hits", "long", "`totals.hits`", "long"), ("totals.pageviews", "long", "`totals.pageviews`", "long"), ("totals.timeOnSite", "long", "`totals.timeOnSite`", "long"), ("totals.bounces", "long", "`totals.bounces`", "long"), ("totals.transactions", "long", "`totals.transactions`", "long"), ("totals.transactionRevenue", "long", "`totals.transactionRevenue`", "long"), ("totals.newVisits", "long", "`totals.newVisits`", "long"), ("totals.screenviews", "long", "`totals.screenviews`", "long"), ("totals.uniqueScreenviews", "long", "`totals.uniqueScreenviews`", "long"), ("totals.timeOnScreen", "long", "`totals.timeOnScreen`", "long"), ("totals.totalTransactionRevenue", "long", "`totals.totalTransactionRevenue`", "long"), ("totals.sessionQualityDim", "long", "`totals.sessionQualityDim`", "long"), ("trafficsource.referralPath", "string", "`trafficsource.referralPath`", "string"), ("trafficsource.campaign", "string", "`trafficsource.campaign`", "string"), ("trafficsource.source", "string", "`trafficsource.source`", "string"), ("trafficsource.medium", "string", "`trafficsource.medium`", "string"), ("trafficsource.keyword", "string", "`trafficsource.keyword`", "string"), ("trafficsource.adContent", "string", "`trafficsource.adContent`", "string"), ("trafficsource.adwordsClickInfo.campaignId", "long", "`trafficsource.adwordsClickInfo.campaignId`", "long"), ("trafficsource.adwordsClickInfo.adGroupId", "long", "`trafficsource.adwordsClickInfo.adGroupId`", "long"), ("trafficsource.adwordsClickInfo.creativeId", "long", "`trafficsource.adwordsClickInfo.creativeId`", "long"), ("trafficsource.adwordsClickInfo.criteriaId", "long", "`trafficsource.adwordsClickInfo.criteriaId`", "long"), ("trafficsource.adwordsClickInfo.page", "long", "`trafficsource.adwordsClickInfo.page`", "long"), ("trafficsource.adwordsClickInfo.slot", "string", "`trafficsource.adwordsClickInfo.slot`", "string"), ("trafficsource.adwordsClickInfo.criteriaParameters", "string", "`trafficsource.adwordsClickInfo.criteriaParameters`", "string"), ("trafficsource.adwordsClickInfo.gclId", "string", "`trafficsource.adwordsClickInfo.gclId`", "string"), ("trafficsource.adwordsClickInfo.customerId", "long", "`trafficsource.adwordsClickInfo.customerId`", "long"), ("trafficsource.adwordsClickInfo.adNetworkType", "string", "`trafficsource.adwordsClickInfo.adNetworkType`", "string"), ("trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId", "long", "`trafficsource.adwordsClickInfo.targetingCriteria.boomUserlistId`", "long"), ("trafficsource.adwordsClickInfo.isVideoAd", "boolean", "`trafficsource.adwordsClickInfo.isVideoAd`", "boolean"), ("trafficsource.isTrueDirect", "boolean", "`trafficsource.isTrueDirect`", "boolean"), ("trafficsource.campaignCode", "string", "`trafficsource.campaignCode`", "string"), ("device.browser", "string", "`device.browser`", "string"), ("device.browserVersion", "string", "`device.browserVersion`", "string"), ("device.browserSize", "string", "`device.browserSize`", "string"), ("device.operatingSystem", "string", "`device.operatingSystem`", "string"), ("device.operatingSystemVersion", "string", "`device.operatingSystemVersion`", "string"), ("device.isMobile", "boolean", "`device.isMobile`", "boolean"), ("device.mobileDeviceBranding", "string", "`device.mobileDeviceBranding`", "string"), ("device.mobileDeviceModel", "string", "`device.mobileDeviceModel`", "string"), ("device.mobileInputSelector", "string", "`device.mobileInputSelector`", "string"), ("device.mobileDeviceInfo", "string", "`device.mobileDeviceInfo`", "string"), ("device.mobileDeviceMarketingName", "string", "`device.mobileDeviceMarketingName`", "string"), ("device.flashVersion", "string", "`device.flashVersion`", "string"), ("device.javaEnabled", "boolean", "`device.javaEnabled`", "boolean"), ("device.language", "string", "`device.language`", "string"), ("device.screenColors", "string", "`device.screenColors`", "string"), ("device.screenResolution", "string", "`device.screenResolution`", "string"), ("device.deviceCategory", "string", "`device.deviceCategory`", "string"), ("geonetwork.continent", "string", "`geonetwork.continent`", "string"), ("geonetwork.subContinent", "string", "`geonetwork.subContinent`", "string"), ("geonetwork.country", "string", "`geonetwork.country`", "string"), ("geonetwork.region", "string", "`geonetwork.region`", "string"), ("geonetwork.metro", "string", "`geonetwork.metro`", "string"), ("geonetwork.city", "string", "`geonetwork.city`", "string"), ("geonetwork.cityId", "string", "`geonetwork.cityId`", "string"), ("geonetwork.networkDomain", "string", "`geonetwork.networkDomain`", "string"), ("geonetwork.latitude", "string", "`geonetwork.latitude`", "string"), ("geonetwork.longitude", "string", "`geonetwork.longitude`", "string"), ("geonetwork.networkLocation", "string", "`geonetwork.networkLocation`", "string"), ("fullvisitorid", "string", "fullvisitorid", "string"), ("userid", "string", "userid", "string"), ("clientid", "string", "clientid", "string"), ("channelgrouping", "string", "channelgrouping", "string"), ("socialengagementtype", "string", "socialengagementtype", "string"),("customdimensions.index", "long", "`customdimensions.index`", "long"), ("customdimensions.value", "string", "`customdimensions.value`", "string")], transformation_ctx = "applymapping1") ## @type: ResolveChoice ## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"] ## @return: resolvechoice2 ## @inputs: [frame = applymapping1] resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2") ## @type: DropNullFields ## @args: [transformation_ctx = "dropnullfields3"] ## @return: dropnullfields3 ## @inputs: [frame = resolvechoice2] dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") ## @type: DataSink ## @args: [catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"] ## @return: datasink4 ## @inputs: [frame = dropnullfields3] datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "techblog-redshift-connection", connection_options = {"dbtable": "techblog_bastion_stack_bigquery_data_bucket_xxx", "database": "rsdev01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4") job.commit()
修正箇所をハイライトしています。修正箇所以外の部分には、テータカタログのテーブル名、GlueのConnection名、RedshiftのDB名など、環境ごとに異なる値が含まれているため、修正箇所以外も含めてコピペするとエラーになる恐れがあります。
ハイライトされた行の内容が同じになるよう修正後、 保存
-> ダイヤグラムの生成
を実行すると、画面左側のタブで ExplodeArray
の変換ステップが追加されます。
ジョブの実行
より、ETLジョブを実行します。
ジョブ実行から数分ほどで実行ステータスがSucceededに変わり、ジョブが完了し、データがRedshiftに読み込まれます。
4. Redshiftに接続してクエリを投げてみる
Redshiftに接続し、クエリを投げてみたいと思います。
Bastionサーバにssh接続するコマンド、Redshiftの接続コマンド/DB名/ユーザー名は、それぞれBastionスタック・Redshiftスタックの出力で確認できます。
Bastionサーバにssh接続します。
ssh -i "your-keypair-name.pem" ec2-user@xxx.xxx.xxx.xxx
Redshiftに接続します。パスワードはスタック作成時に自身で設定したもの(MASTER_USER_PASSWORD
変数で指定した値)になります。
psql -h rsdev01-redshiftmain.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com -p 8200 -U rsadmin -d rsdev01
Redshiftにテーブルが作成されていることを確認します。
rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d List of relations schema | name | type | owner --------+----------------------------------------------------------+-------+--------- public | techblog_bastion_stack_bigquery_data_bucket_xxxx| table | rsadmin (1 row)
rsdev01-techblog-redshift-stack.cg35ndlm2xxz.ap-northeast-1.redshift.amazonaws.com:8200 rsadmin@rsdev01=# \d techblog_bastion_stack_bigquery_data_bucket_xxx Table "public.techblog_bastion_stack_bigquery_data_bucket_xx" Column | Type | Modifiers -----------------------------------------------------------------+--------------------------+----------- visitorid | bigint | visitnumber | bigint | visitid | bigint | visitstarttime | bigint | date | character varying(65535) | totals.visits | bigint | totals.hits | bigint | totals.pageviews | bigint | totals.timeonsite | bigint | totals.bounces | bigint | totals.transactions | bigint | totals.transactionrevenue | bigint | totals.newvisits | bigint | totals.screenviews | bigint | totals.uniquescreenviews | bigint | totals.timeonscreen | bigint | totals.totaltransactionrevenue | bigint | totals.sessionqualitydim | bigint | trafficsource.referralpath | character varying(65535) | trafficsource.campaign | character varying(65535) | trafficsource.source | character varying(65535) | trafficsource.medium | character varying(65535) | trafficsource.keyword | character varying(65535) | trafficsource.adcontent | character varying(65535) | trafficsource.adwordsclickinfo.campaignid | bigint | trafficsource.adwordsclickinfo.adgroupid | bigint | trafficsource.adwordsclickinfo.creativeid | bigint | trafficsource.adwordsclickinfo.criteriaid | bigint | trafficsource.adwordsclickinfo.page | bigint | trafficsource.adwordsclickinfo.slot | character varying(65535) | trafficsource.adwordsclickinfo.criteriaparameters | character varying(65535) | trafficsource.adwordsclickinfo.gclid | character varying(65535) | trafficsource.adwordsclickinfo.customerid | bigint | trafficsource.adwordsclickinfo.adnetworktype | character varying(65535) | trafficsource.adwordsclickinfo.targetingcriteria.boomuserlistid | bigint | trafficsource.adwordsclickinfo.isvideoad | boolean | trafficsource.istruedirect | boolean | trafficsource.campaigncode | character varying(65535) | device.browser | character varying(65535) | device.browserversion | character varying(65535) | device.browsersize | character varying(65535) | device.operatingsystem | character varying(65535) | device.operatingsystemversion | character varying(65535) | device.ismobile | boolean | device.mobiledevicebranding | character varying(65535) | device.mobiledevicemodel | character varying(65535) | device.mobileinputselector | character varying(65535) | device.mobiledeviceinfo | character varying(65535) | device.mobiledevicemarketingname | character varying(65535) | device.flashversion | character varying(65535) | device.javaenabled | boolean | device.language | character varying(65535) | device.screencolors | character varying(65535) | device.screenresolution | character varying(65535) | device.devicecategory | character varying(65535) | geonetwork.continent | character varying(65535) | geonetwork.subcontinent | character varying(65535) | geonetwork.country | character varying(65535) | geonetwork.region | character varying(65535) | geonetwork.metro | character varying(65535) | geonetwork.city | character varying(65535) | geonetwork.cityid | character varying(65535) | geonetwork.networkdomain | character varying(65535) | geonetwork.latitude | character varying(65535) | geonetwork.longitude | character varying(65535) | geonetwork.networklocation | character varying(65535) | fullvisitorid | character varying(65535) | userid | character varying(65535) | clientid | character varying(65535) | channelgrouping | character varying(65535) | socialengagementtype | character varying(65535) | customdimensions.index | bigint | customdimensions.value | character varying(65535) |
BigQueryと同様のクエリを投げてみます。
SELECT "device.browser", SUM ( "totals.transactions" ) AS total_transactions FROM techblog_bastion_stack_bigquery_data_bucket_xxx GROUP BY "device.browser" ORDER BY total_transactions DESC;
以下の通り、同じ実行結果が確認できました。
device.browser | total_transactions --------------------------+-------------------- Opera | Internet Explorer | YaBrowser | Safari (in-app) | Mozilla Compatible Agent | Opera Mini | Edge | UC Browser | Android Browser | Android Webview | Nokia Browser | Chrome | 40 Safari | 3 Firefox | 1 (14 rows)
以上でBigQueryからRedshiftへの移行手順は終了です。
5. リソースの削除
削除が必要なリソースは以下の通りです。
- GCSバケット (
GCS_BUCKET_NAME
) - Glueのジョブ/接続/クローラ
- CloudFormationのRedshiftスタック・Bastionスタック・VPCスタック
AWSリソースの削除に関して、いくつか注意点があります。
* Glueのジョブ/接続/クローラは、CloudFormationを使わず手動で追加したため、AWSマネジメントコンソール > Glue
を開き、各リソースを手動で削除してください。
* Redshiftスタック・Bastionスタック・VPCスタックを削除する際、スタック間に依存関係があるため、この順番で一つずつ削除してください
* Bastionスタックを削除する前に、S3バケット (S3_BUCKET_NAME
)の中身を空にしておいてください(AWSマネジメントコンソール > S3 > バケットを選択して「空にする」をクリック
)
* Redshiftスタックをを削除する前に、Glueジョブ時に作成されたENIを手動で削除してください(AWSマネジメントコンソール > EC2 > ネットワーク&セキュリティ > ネットワークインターフェース
)。
不要なコストを避けるため、検証後はAWS/GCPのマネジメントコンソールからリソースの削除をお願いします。
まとめ
この記事では、Google BigQueryからAmazon Redshiftにデータを移行する流れをご紹介しました。
AWSブログのCloudFormationテンプレートを使ってRedshift環境を構築し、
BigQueryの一般公開データセットのデータを、bq extract コマンドでGCSに出力、
.boto構成ファイルに認証情報を追加し、gsutilツールでGCSからS3にデータをコピー、
Glueを使って、S3からRedshiftへETLジョブを実行し、Redshiftにクエリを投げてデータを確認しました。
この記事の内容が少しでも参考になれば幸いです。
テックブログ新着情報のほか、AWSやGoogle Cloudに関するお役立ち情報を配信中!
Follow @twitter2017年4月、NHNテコラスに新卒入社。データサイエンスチームに所属し、AWSを活用したデータ分析サービスの設計開発を担当。
Recommends
こちらもおすすめ
-
【初心者向け】BigQueryって聞いたことあるけど、どんなサービス?
2023.12.22
-
CLI で覚える Google BigQuery
2020.1.30
Special Topics
注目記事はこちら
データ分析入門
これから始めるBigQuery基礎知識
2024.02.28
AWSの料金が 10 %割引になる!
『AWSの請求代行リセールサービス』
2024.07.16