業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【データ基盤】
データハブとは
Ajust
データの保守運用
AI

執筆者:Handbook編集部

DataHubでS3のデータをingestする方法

この記事では、オープンソースのモダンなデータカタログのDataHubでS3上のデータをUI, CLIを用いてIngestします。 本記事ではDataHubはEKSであることを前提として以降説明します。

UI編

Secretの登録

Ingestion > Secretsから登録を行います。

S3上のデータの登録

Ingestion > Sources > Create new sourceから登録を行います。

データソースのタイプにS3は存在しないため、Otherを選択し、以下のようにyamlを書いて設定します。 先ほど登録したsecretsを用い以下のように登録します。

source:
    type: s3
    config:
        path_specs:
            -
                include: 's3://xxxxxxxxx/yyyy/*.csv'
        aws_config:
            aws_access_key_id: '${aws_access_key_id}'
            aws_secret_access_key: '${aws_secret_access_key}'
            aws_region: ap-northeast-1
        env: sandbox
        profiling:
            enabled: false

CLI編

Python環境のセットアップ

現時点ではPython バージョン3.11の利用が推奨されています。

以下のコマンドを実施し操作に必要なPythonパッケージをインストールします。

python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub

バージョンを表示させて、datahubパッケージが正常にインストールされたかを確認します。

python3 -m datahub version

その他連携に必要なパッケージのインストールは公式ドキュメントのInstalling Pluginsを参照願います。 ここではREST API用のパッケージとs3用のパッケージを以下のコマンドでインストールします。

python3 -m pip install 'acryl-datahub[datahub-rest]'
python3 -m pip install 'acryl-datahub[s3]'

GMSのエンドポイント確認

以下のコマンドでGMSのエンドポイントを確認します。

kubectl get svc

以下のような出力が得られるので、datahub-datahub-gmsのEXTERNAL-IPを使ってアクセスします。

NAME                                  TYPE           CLUSTER-IP       EXTERNAL-IP                                                                   PORT(S)                         AGE
datahub-acryl-datahub-actions         ClusterIP      xxx.xx.xx.xx     <none>                                                                        9093/TCP,8000/TCP               28m
datahub-datahub-frontend              LoadBalancer   xxx.xx.xx.xx     <lb_name>.<region>.elb.amazonaws.com   9002:30837/TCP,4318:30426/TCP   28m
datahub-datahub-gms                   LoadBalancer   xxx.xx.xx.xx     <lb_name>.<region>.elb.amazonaws.com   8080:30386/TCP,4318:31400/TCP   28m
elasticsearch-master                  ClusterIP      xxx.xx.xx.xx     <none>                                                                        9200/TCP,9300/TCP               34m
elasticsearch-master-headless         ClusterIP      None             <none>                                                                        9200/TCP,9300/TCP               34m
kubernetes                            ClusterIP      xxx.xx.xx.xx     <none>                                                                        443/TCP                         41m
prerequisites-cp-schema-registry      ClusterIP      xxx.xx.xx.xx     <none>                                                                        8081/TCP,5556/TCP               34m
prerequisites-kafka                   ClusterIP      xxx.xx.xx.xx     <none>                                                                        9092/TCP                        34m
prerequisites-kafka-broker-headless   ClusterIP      None             <none>                                                                        9094/TCP,9092/TCP               34m
prerequisites-mysql                   ClusterIP      xxx.xx.xx.xx     <none>                                                                        3306/TCP                        34m
prerequisites-mysql-headless          ClusterIP      None             <none>                                                                        3306/TCP                        34m
prerequisites-zookeeper               ClusterIP      xxx.xx.xx.xx     <none>                                                                        2181/TCP,2888/TCP,3888/TCP      34m
prerequisites-zookeeper-headless      ClusterIP      None             <none>                                                                        2181/TCP,2888/TCP,3888/TCP      34m

アクセスTokenの生成

以下の画像のようにUI (設定 > Access Tokens > Create new token) からTokenを事前に生成しコピーしておきます。

Ingestの実行

UIの時と同様にレシピをyamlで作成します。 ここではUI編で作成したレシピに加えてREST APIへのsinkの設定が必要となります。 "Your API key"の部分は上で作成したtokenをコピーして使用してください。

source:
    type: s3
    config:
        path_specs:
            -
                include: 's3://xxxxxxxxx/yyyy/*.csv'
        aws_config:
            aws_access_key_id: <Yor aws_access_key_id>
            aws_secret_access_key: <Your aws_secret_access_key>
            aws_region: ap-northeast-1
        env: sandbox
        profiling:
            enabled: false

# Recipe sink configuration.
sink:
  type: "datahub-rest"
  config:
    server: "http://<lb_name>.<region>.elb.amazonaws.com:8080"
    token: <Your API key>

ingestの実行。

datahub ingest -c <path/to/recipe.yml>

SDK編

Python環境のセットアップ

パッケージとしてはCIL編と同様、acryl-datahub[datahub-rest], acryl-datahub[s3]が必要となります。 パッケージのインストールについてはCLI編を参照願います。

Ingestionの実行

以下のようなPythonコードを準備し、Python scriptを実行します。 CLI編で作成したyamlファイルとtokenを再利用します。

from datahub.ingestion.run.pipeline import Pipeline
import yaml


config_file_path = "<path/to/recipe.yml>"

def load_config(file_path):
    with open(file_path, 'r') as file:
        config = yaml.safe_load(file)
    return config


# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
    load_config(config_file_path)
)

# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

まとめ

この記事では、DataHubでS3のデータをingestする方法ついて紹介しました。

参考

info
備考

Hakky ではエンジニアを募集中です!まずは話してみたいなどでも構いませんので、ぜひお気軽に採用ページからお問い合わせくださいませ。

2025年07月06日に最終更新
読み込み中...