業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【Hakkyの社内Wiki】Strapi
クラウドとオンプレの違いAIシステム導入時におすすめのクラウドシステムスクレイピングのためのプロキシサーバのAPI
TerraformでGCPからAWSのリソースにアクセスするGoogle Cloudとは?
AI

執筆者:Handbook編集部

BigQuery から、Cloud SQL へのデータの連携

この記事では、BigQuery から Cloud SQL へのデータ連携の方法について解説します。

全体像

以下のような手順で BigQuery から Cloud SQL にデータを渡すことができます[^1]

  1. BigQuery でデータを CSV ファイルにエクスポートし、そのファイルを Googl Cloud Storage (GCS) に保存します[^2]
  2. GCS から Cloud SQL に CSV をインポートします[^3]

この手順は単純に見えますが、Workflows でプロセスを順序づけることが望ましいでしょう。 なぜならば、BigQuery は複数のファイルに大量のデータをエクスポートする一方で、Cloud Storage には一度に 1 つのファイルしかインポートできないという制約があるからです。

次の節から、このパイプラインの内容について記述します。

BigQuery データのエクスポート

BigQuery は最大 1 GB のデータを 1 つのファイルにエクスポートできます[^4]。 1 GB を超えるデータをエクスポートする場合は、データが複数のファイルにエクスポートされ、さまざまなサイズのファイルになります。

この際、単一のエクスポート ジョブで複数のテーブルからデータをエクスポートすることはできません。

その他、データのエクスポート時の制限事項を確認したい方は、エクスポートの制限事項をご参照ください。

EXPORT DATA ステートメントを用いたデータのエクスポート

この記事では、BigQuery からデータを GCS にエクスポートするために、EXPORT DATA ステートメントを使用します。

さらに、クエリを実行するためのワークフローコネクタとして、googleapis.bigquery.v2.jobs.query が用意されているので、これも利用します。

よって、BigQuery からデータを GCS にエクスポートするステップは、ワークフローコード上で以下のように記述されます。

main:
  steps:
    - assignStep:
        assign:
          - bucket: "エクスポート先のバケット名を記入してください。"
          - projectid: "プロジェクトIDを記入してください。"
          - prefix: "GCS 上にエクスポートされるファイルの名前につけたいプレフィックスを指定してください。"
          - query: "ここに、BigQuery で実行したいクエリを記述してください。そのクエリの結果が CSV としてエクスポートされます。"
          # ... その他のパラメータ ...
    - export-query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectid}
          body:
            query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS " + query}
            useLegacySql: false
    # ... 以降のステップ ...

なお、このワークフローのこのステップを実行するためには、以下のロールが必要です。

  • bigquery.dataViewer: BigQuery 上のデータにアクセスするために必要です
  • bigquery.jobUser: ジョブを作成するために必要です
  • storage.objectAdmin: GCS にファイルをエクスポートするために必要です

GCS から Cloud SQL への CSV のインポート

BigQuery から GCS にエクスポートしたデータを Cloud SQL にインポートするには、GCS 上の複数の CSV を Cloud SQL にインポートする必要があります。

単一 CSV ファイルのインポート

単一の CSV ファイルを Cloud SQL にインポートするには、Cloud SQL Admin API のインポート用 API を呼び出せばよいです。これは、以下のようなワークフローコードで実行できます。

- callImport:
    call: http.post
    args:
      url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/" + instance + "/import"}
      auth:
        type: OAuth2 # WorkflowがAPIコールに認証ヘッダーを追加できるようにするために必要
      body:
        importContext:
          uri: ${file}
          database: ${databaseschema}
          fileType: CSV
          csvImportOptions:
            table: ${importtable}

ただし、このワークフローを実行するサービスアカウントは、Cloud SQL の管理者でなければなりません。

また、この操作は、非同期であり、かつ、非同期処理用のコネクタが用意されていないため、以下のような対処が必要になります。

  • インポート操作の状態を定期的に確認することで、インポートの終了を確認します。
  • ジョブが終了したら、他のファイルを反復処理するか、終了するかのどちらかの操作を行います。

以上を踏まえると、 CSV ファイルを Cloud SQL にインポートするためのワークフローコードは以下のように記述できます。

import_file:
  params:
    - projectid
    - instance
    - databaseschema
    - importtable
    - file
  steps:
    - callImport:
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/" + instance + "/import"}
          auth:
            type: OAuth2
          body:
            importContext:
              uri: ${file}
              database: ${databaseschema}
              fileType: CSV
              csvImportOptions:
                table: ${importtable}
        result: operation
    - chekoperation:
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 1
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: chekoperation

複数 CSV ファイルをインポートする

BigQuery から GCS にデータをエクスポートすると複数のファイルに分割される可能性があるので、Cloud SQL にデータをインポートするには、それらのファイルを走査するようにワークフローを定義する必要があります。

そこで、ここではバケット内のオブジェクトをリストするためのワークフローコネクタ googleapis.storage.v1.objects.list を用います。

このワークフローコネクタを利用するとき、対象のバケット内に大量のファイルがある場合、レスポンスが数ページに及ぶことがあります。この場合、ファイルだけでなく、ページも含めて繰り返し処理する必要があることに注意してください。

以下のワークフローコードでは、pagetoken で指定されたある1ページのみに対してのデータのインポート処理を定義します。

list_file_to_import:
  params:
    - pagetoken
    - bucket
    - prefix
    - projectid
    - instance
    - databaseschema
    - importtable
  steps:
    - list-files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          pageToken: ${pagetoken}
          prefix: ${prefix}
        result: listResult
    - process-files:
        for:
          value: file
          in: ${listResult.items}
          steps:
            - wait-import:
                call: import_file
                args:
                  projectid: ${projectid}
                  instance: ${instance}
                  databaseschema: ${databaseschema}
                  importtable: ${importtable}
                  file: ${"gs://" + bucket + "/" + file.name}
    - return-step:
        return: ${listResult}

全 CSV ファイルをインポートする

単一 CSV ファイルのインポート, 複数 CSV ファイルをインポートする を踏まえて、対象の CSV ファイルをすべてインポートするワークフローコードは以下のようになります。

main:
  steps:
    - assignStep:
        assign:
          - bucket: "エクスポート先のバケット名を記入してください。"
          - projectid: "プロジェクトIDを記入してください。"
          - prefix: "GCS 上にエクスポートされるファイルの名前につけたいプレフィックスを指定してください。"
          - query: "ここに、BigQuery で実行したいクエリを記述してください。そのクエリの結果が CSV としてエクスポートされます。"
          - instance: "Cloud SQL のインスタンス名>"
          - databaseschema: "<Cloud SQL のインポート先データベース名>"
          - importtable: "<Cloud SQL のインポート先テーブル名>"
          - listResult:
              nextPageToken: ""
    - export-query:
        # ...略...
    - importfiles:
        call: list_file_to_import
        args:
          pagetoken: ${listResult.nextPageToken}
          bucket: ${bucket}
          prefix: ${prefix}
          projectid: ${projectid}
          instance: ${instance}
          databaseschema: ${databaseschema}
          importtable: ${importtable}
        result: listResult
    - missing-files:
        switch:
          - condition: ${"nextPageToken" in listResult}
            next: importfiles

ワークフローコード全体を確認したい方は、https://github.com/guillaumeblaquiere/workflow-bq-to-cloudsql/blob/main/import.yaml を参照してください。

参考

[^1]: Replicate data from BigQuery to Cloud SQL with Cloud Workflow [^2]: テーブルデータのエクスポート [^3]: CSV ファイルからのデータのインポート [^4]: テーブルデータのエクスポート

info
備考

Hakky ではエンジニアを募集中です!まずは話してみたいなどでも構いませんので、ぜひお気軽に採用ページからお問い合わせくださいませ。

2025年06月15日に最終更新
読み込み中...