業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【データ基盤】
データハブとは
Ajust
データの保守運用
AI

執筆者:Handbook編集部

dbtのpythonモデルについて

はじめに

この記事では、BigQuery 上のデータを dbt の python モデルを使用して ELT 処理を行う方法を紹介します。python モデルを使用することで、python を使用して ELT 処理を行うことが可能になります。

dbt python モデルとは

dbt python モデルは、dbt が python を直接実行できるようにする拡張機能です。SQL の代わりに python でモデルを作成することはもちろんのこと、外部 API の実行結果をモデルに含めたり、ELT 処理の中に動的な処理を組み込むことが簡単にできるようになります。

Dataproc or Fal

dbt python モデルを Bigquery で使用する際の接続方法には、Dataproc と Fal の 2 通りがあります。 この記事では簡単に実装可能な Fal を使用しますが、以下に両方の特徴について記載しておきます。

Google DataprocFAL
概要BigQuery と組み合わせて使用する場合、dbt はデフォルトで Dataproc を使用して python コードを実行しようとする。Dataproc は Google Cloud の Hadoop と Spark のフルマネージドクラスターサービスで、一般的にビッグデータ処理ワークロードの実行に適している。FAL を使用すると、python モデルはローカルで実行されるため簡単に Bigquery と連携することができる。
利点・ビッグデータ処理
・クラスター内の複数のノード間で分散データ処理可能
・簡単に dbt python モデルと BQ を連携できる
注意点Dataproc の設計やコスト等考慮すべきことが多いsql モデルは BQ クエリジョブとして実行されるが、python モデルの内部処理ではローカルで実行された後、PARQUET を LOAD させる動きとなるため、モデルによっては dbt 実行環境への負荷が大きい
互換性BigQuerydbt がサポートする一般的なデータウェアハウス(Postgres, Redshift, BigQuery, Snowflake など)

使ってみる

それでは、ここからは実際に dbt python モデルを使用して、Bigquery の ELT 処理を実装してみます。

構成

今回の構成は以下のようなシンプルなものです。
Cloud Functions で簡易的な API を用意して、python モデルで単純なデータ処理と API リクエスト処理を実装してみます。 Bigquery のサンプルデータは、dbt 入門の「bigquery にデータを用意」で紹介しているデータを使用します。

コード

Cloud Functions

まずは API となる Cloud Functions です。 リクエストのデータをそのまま返します。

import pandas as pd
from pandas.io.json import json_normalize
import json

def return_data(request):
  request_json = request.get_json()

  df = json_normalize(request_json)
  print(df)

  return json.dumps(request_json)

dbt

profiles.yml
poc:
  target: poc_with_fal
  outputs:
    poc_with_fal:
      type: fal
      db_profile: poc
    poc:
      type: bigquery
      method: oauth
      project: <project>
      dataset: <dataset>

customers.sql

python モデルの参照先となる customers モデルです。

{{
  config(
    materialized = 'table'
  )
}}

with customers as (
    select
        id as customer_id,
        first_name,
        last_name
    from dbt_python.raw_customers
),
orders as (
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status
    from dbt_python.raw_orders
),
customer_orders as (
    select
        customer_id,
        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders
    from orders
    group by 1

),
final as (
    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders
    from customers
    left join customer_orders using (customer_id)
)

select * from final

customers_py.py

こちらが python モデルです。 customers モデルの内容を少し変更したものを Cloud Functions へポストし、そのレスポンス内容をそのまま実行結果としています。

import pandas as pd
import requests
import json

url = '<cloud functins endpoint url>'
headers = {'Content-Type': 'application/json'}

def model(dbt, fal):

    customers_df = dbt.ref('customers')

    customers_df['name'] = customers_df['first_name'] + " " + customers_df['last_name']
    customers_df = customers_df.drop(columns=['first_name', 'last_name'])
    customers_df = customers_df[['customer_id', 'name', 'first_order_date', 'most_recent_order_date', 'number_of_orders']]

    json_payload = customers_df.to_json(orient='records')
    response = requests.post(url, headers=headers, data=json_payload)
    print(response.status_code)

    response_data = response.json()
    df = pd.DataFrame(response_data)

    print(df)

    return df

実行結果

実行してみると、SQL モデルのcustomersと、python モデルのcustomers_pyが Bigquery に反映されます。

API 用の Cloud Functions も実行されたログがあります。

まとめ

この記事では、BigQuery 上のデータを dbt の python モデルを使用して ELT 処理を行う方法を紹介しました。 dbt python モデルを使用することで、複雑なアーキテクチャや処理フローを構築することなく単純なクエリ処理以外の処理を ELT に導入することが可能かと思います。

参考

info
備考

Hakky ではエンジニアを募集中です!まずは話してみたいなどでも構いませんので、ぜひお気軽に採用ページからお問い合わせくださいませ。

2025年07月06日に最終更新
読み込み中...