Author: Handbook Editorial Team


How to Synchronize Objects Between GCS Buckets Using Pub/Sub and Cloud Functions

This article explains how to synchronize objects between GCS buckets using Pub/Sub and Cloud Functions.
All resources except Cloud Functions are deployed with Terraform; only the Cloud Functions function is deployed via the CLI.

Architecture

When an object is uploaded to the source bucket, a GCS storage notification publishes a message to a Pub/Sub topic. Cloud Scheduler invokes the Cloud Function every 5 minutes; the function pulls the queued messages from the subscription and copies the corresponding objects from the source bucket to the destination bucket.

Sample Source Directory Structure

The directory structure of the sources used in this article is as follows.

/
├── function/
│       ├── main.py
│       └── requirements.txt
├── storage/
│       ├── upload1.txt
│       ├── upload2.txt
│       └── upload3.txt
├── provider.tf
├── main.tf
└── variables.tf

Cloud Functions

The Python code and requirements.txt deployed to Cloud Functions are shown below.

Overview of the main.py processing flow

  1. Pull messages from Pub/Sub
  2. Use the object name contained in each message to copy the object between the GCS buckets
  3. Ack each message once its object has been copied

main.py

import json
import time
from typing import Dict, List

from google.cloud import pubsub
from google.cloud import storage

# Max number of Pub/Sub pull attempts per invocation
MAX_PUBSUB_PULL = 10
# Pub/Sub pull interval (seconds)
PUBSUB_PULL_INTERVAL_SEC = 5
# Approximate max number of Pub/Sub messages to collect
APPROXIMATE_MAX_MSGS = 300

# For Pub/Sub
PROJECT_ID = "hakky-sandbox-jin"
SUBSCRIPTION = "pubsub-demo-sub"
# For Storage
BUCKET_NAME = "pubsub-demo-src-bucket"
DESTINATION_BUCKET_NAME = "pubsub-demo-dst-bucket"

class PubSubClient:
    def __init__(self) -> None:
        self.subscriber = pubsub.SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)

    def ack(self, ack_ids: List[str]) -> None:
        self.subscriber.acknowledge(
            request={"subscription": self.subscription_path, "ack_ids": ack_ids}
        )

    def _get_msgs(self) -> List[Dict[str, str]]:
        response = self.subscriber.pull(
            request={"subscription": self.subscription_path, "max_messages": 100}
        )
        msgs = []
        for message in response.received_messages:
            ack_id = message.ack_id
            msg_str = message.message.data.decode("utf-8")
            try:
                msg = json.loads(msg_str)
                msg["ack_id"] = ack_id  # append ack_id
                # Copy the object named in the notification, then ack the
                # message so it is not redelivered and copied again
                StorageClient(msg["name"])
                self.ack([ack_id])
                msgs.append(msg)
                print(f"msg={msg}")
            except Exception as e:
                print(e)
                print(f"msg_str={msg_str}")
                # Ack unparsable messages so they do not accumulate
                self.ack([ack_id])
        return msgs

    def get_msgs(self, num_msgs: int = APPROXIMATE_MAX_MSGS) -> List[Dict[str, str]]:
        msgs = []
        for i in range(MAX_PUBSUB_PULL):
            print(f"pubsub pull: trial={i+1}")
            _msgs = self._get_msgs()
            msgs.extend(_msgs)
            if len(msgs) > num_msgs:
                print(f"exceed num_msgs: {len(msgs)}")
                break
            time.sleep(PUBSUB_PULL_INTERVAL_SEC)
        print(f"num_msgs: {len(msgs)}")
        return msgs

class StorageClient:
    # Copies one object from the source bucket to the destination bucket on construction
    def __init__(self, blob_name: str) -> None:
        self.storage_client = storage.Client()
        self.source_bucket = self.storage_client.bucket(BUCKET_NAME)
        self.source_blob = self.source_bucket.blob(blob_name)
        self.destination_bucket = self.storage_client.bucket(DESTINATION_BUCKET_NAME)
        self.copy_blob(blob_name, blob_name)

    def copy_blob(self, blob_name: str, destination_blob_name: str) -> None:
        self.blob_copy = self.source_bucket.copy_blob(
            self.source_blob, self.destination_bucket, destination_blob_name
        )
        print("Blob {} copied {} to {}.".format(blob_name, BUCKET_NAME, DESTINATION_BUCKET_NAME))

def main(request):
    client = PubSubClient()  # avoid shadowing the imported pubsub module
    msgs = client.get_msgs()
    print(msgs)

    # Return a JSON string so the HTTP function has a valid response body
    return json.dumps(msgs)
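
For reference, the message body published by the storage notification (payload format JSON_API_V1) is the GCS object resource; main.py reads only its name field. An abridged sketch of one decoded message, with illustrative field values:

# Abridged example of one decoded notification message (values are illustrative).
example_msg = {
    "kind": "storage#object",
    "bucket": "pubsub-demo-src-bucket",
    "name": "upload1.txt",  # the object name main.py passes to StorageClient
    "contentType": "text/plain",
    "size": "25",
}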

requirements.txt

google-cloud-pubsub
google-cloud-storage
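
Before deploying, the handler can be smoke-tested locally. A minimal sketch, assuming Application Default Credentials are configured (for example via gcloud auth application-default login) and the Pub/Sub and GCS resources created by the Terraform code below already exist:

# local_test.py (hypothetical helper): run the HTTP handler once outside Cloud Functions.
from main import main

if __name__ == "__main__":
    # The handler never touches the request object, so None is sufficient here.
    print(main(request=None))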

Cloud Functions デプロイ

Deploy the Cloud Function with the CLI.
Deploying from the CLI makes it easier to build a CI/CD pipeline, which removes the manual effort of rolling out new versions.

$ gcloud functions deploy demo-function \
	--trigger-http \
	--region=asia-northeast1 \
	--runtime=python38 \
	--source=function/ \
	--entry-point=main
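
Once deployed, the function can also be invoked manually instead of waiting for Cloud Scheduler. A minimal sketch, assuming the function allows unauthenticated invocations and using the standard first-generation trigger URL format (the exact URL is printed when the deploy completes):

# Manually trigger the function by POSTing an empty body, mirroring the Cloud Scheduler job.
import urllib.request

url = "https://asia-northeast1-hakky-sandbox-jin.cloudfunctions.net/demo-function"
with urllib.request.urlopen(url, data=b"") as resp:
    print(resp.status, resp.read().decode("utf-8"))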

Terraform

Resources other than Cloud Functions are deployed with the Terraform code below. Every resource is keyed off the var.pubsub map, so adding another entry to that map creates a second, parallel sync pipeline.

provider.tf

provider "google" {
  project = var.project
  region  = var.region
  zone    = var.zone
}

main.tf

data "google_storage_project_service_account" "gcs_service_account" {
  project = var.project
}

data "google_cloudfunctions_function" "pubsub_client_function" {
  name   = var.function
  region = var.region
}

resource "google_storage_bucket" "src_bucket" {
  for_each      = var.pubsub
  name          = each.value.src_bucket
  location      = "ASIA"
  force_destroy = true
}

resource "google_storage_bucket" "dst_bucket" {
  for_each      = var.pubsub
  name          = each.value.dst_bucket
  location      = "ASIA"
  force_destroy = true
}

resource "google_pubsub_topic" "topic" {
  for_each = var.pubsub
  name     = each.value.name
}

resource "google_pubsub_subscription" "subscription" {
  for_each = var.pubsub
  name     = "${each.value.name}-sub"
  topic    = google_pubsub_topic.topic[each.key].name

  # 7 days
  message_retention_duration = "604800s"

  # 10 minutes
  ack_deadline_seconds = 600
}

resource "google_storage_notification" "notification" {
  for_each       = var.pubsub
  bucket         = each.value.src_bucket
  payload_format = "JSON_API_V1"
  topic          = google_pubsub_topic.topic[each.key].id
  event_types    = ["OBJECT_FINALIZE"]
  depends_on = [
    google_pubsub_topic_iam_binding.binding_pubsub,
    google_storage_bucket.src_bucket
  ]
}

resource "google_pubsub_topic_iam_binding" "binding_pubsub" {
  for_each = var.pubsub
  members  = ["serviceAccount:${data.google_storage_project_service_account.gcs_service_account.email_address}"]
  role     = "roles/pubsub.publisher"
  topic    = google_pubsub_topic.topic[each.key].id
}

resource "google_cloud_scheduler_job" "scheduler" {
  for_each  = var.pubsub
  name      = each.value.scheduler
  schedule  = "*/5 * * * *"
  time_zone = "Asia/Tokyo"

  http_target {
    http_method = "POST"
    uri         = data.google_cloudfunctions_function.pubsub_client_function.https_trigger_url
  }
}

output "gcs_service_account" {
  value = data.google_storage_project_service_account.gcs_service_account.project
}

output "pubsub_client_function_name" {
  value = data.google_cloudfunctions_function.pubsub_client_function.name
}

variables.tf

variable "project" {
  description = "A name of a GCP Project"
  type    = string
  default = "hakky-sandbox-jin"
}

variable "region" {
  description = "A region"
  type    = string
  default = "asia-northeast1"
}

variable "zone" {
  description = "A zone"
  type    = string
  default = "asia-northeast1-a"
}

variable "function" {
  description = "A name of cloud functions"
  type    = string
  default = "demo-function"
}

variable "pubsub" {
  description = "Pubsub variables"
  type = map(any)
  default = {
    pubsub = {
      name       = "pubsub-demo"
      src_bucket = "pubsub-demo-src-bucket"
      dst_bucket = "pubsub-demo-dst-bucket"
      scheduler  = "pubsub-demo-scheduler"
    }
  }
}

Terraform apply

Run the following command to apply the Terraform configuration above to your environment.

$ terraform apply

Overview of the Created Resources

| address                                                          | mode    | type                                    |
| :--------------------------------------------------------------- | :------ | :-------------------------------------- |
| data.google_cloudfunctions_function.pubsub_client_function      | data    | google_cloudfunctions_function          |
| data.google_compute_default_service_account.gce_service_account | data    | google_compute_default_service_account  |
| data.google_storage_project_service_account.gcs_service_account | data    | google_storage_project_service_account  |
| google_cloud_scheduler_job.scheduler["pubsub"]                  | managed | google_cloud_scheduler_job              |
| google_pubsub_subscription.subscription["pubsub"]               | managed | google_pubsub_subscription              |
| google_pubsub_topic.topic["pubsub"]                             | managed | google_pubsub_topic                     |
| google_pubsub_topic_iam_binding.binding_pubsub["pubsub"]        | managed | google_pubsub_topic_iam_binding         |
| google_storage_bucket.dst_bucket["pubsub"]                      | managed | google_storage_bucket                   |
| google_storage_bucket.src_bucket["pubsub"]                      | managed | google_storage_bucket                   |
| google_storage_notification.notification["pubsub"]              | managed | google_storage_notification             |

Verification

Upload Sample Files to GCS (src)

Run the following command to upload upload1.txt, upload2.txt, and upload3.txt in the storage directory to GCS.

$ gsutil cp storage/upload1.txt storage/upload2.txt storage/upload3.txt gs://pubsub-demo-src-bucket
Copying file://storage/upload1.txt [Content-Type=text/plain]...
Copying file://storage/upload2.txt [Content-Type=text/plain]...
Copying file://storage/upload3.txt [Content-Type=text/plain]...
\ [3 files][   75.0 B/   75.0 B]
Operation completed over 3 objects/75.0 B.

The files have been uploaded to GCS.

Cloud Functions Logs

The logs show that the Cloud Function is successfully invoked by Cloud Scheduler every 5 minutes.
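
Representative log lines, reconstructed from the print statements in main.py (actual timestamps and message bodies will differ):

pubsub pull: trial=1
msg={'kind': 'storage#object', 'bucket': 'pubsub-demo-src-bucket', 'name': 'upload1.txt', 'ack_id': '...'}
Blob upload1.txt copied from pubsub-demo-src-bucket to pubsub-demo-dst-bucket.
num_msgs: 3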

Check GCS (dst)

Check that the Cloud Function transferred the files to GCS. Since upload1.txt, upload2.txt, and upload3.txt exist in pubsub-demo-dst-bucket, we can confirm the transfer succeeded.
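
To verify from code instead of the console, a minimal sketch using the same google-cloud-storage library (assumes Application Default Credentials):

# List the destination bucket to confirm the copies landed.
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs("pubsub-demo-dst-bucket"):
    print(blob.name)  # expect upload1.txt, upload2.txt, upload3.txt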


Last updated: July 6, 2025