KFP の文脈でパイプラインとは有向非巡回グラフ(DAG)で定義された一連の機械学習ワークフローのことです。
KFP では Python を使って、パイプラインの実行内容、入力(パラメータ)を定義します。定義したパイプラインは Web UI を使ってアップロードし、組織内で共有することができます。
以下は Web UI から確認できる実際のパイプラインの例です。
Example
以下が公式で公開されているサンプルの 1 つです。ここではイメージを掴んでもらうことが目的なため一つ一つ詳細な説明はしませんが、このように Python を用いて Pipeline を定義することができます。
DAG で機械学習ワークフローを定義するものには XML などのファイルで静的に DAG を定義するものと、Kubeflow のように Python などのプログラミング言語を使って定義するもの 2 種類があります。前者のメリットは固有のプログラミング言語の知識が入らないため導入が比較的簡単なことです。一方で、後者は動的な処理や複雑なプログラミングの処理を定義できるというメリットがあります。
# Reference: https://github.com/kubeflow/pipelines/blob/master/samples/v2/hello_world.py
import os
from kfp import dsl
from kfp import compiler
# In tests, we install a KFP package from the PR under test. Users should not
# normally need to specify `kfp_package_path` in their component definitions.
_KFP_PACKAGE_PATH = os.getenv('KFP_PACKAGE_PATH')
@dsl.component(kfp_package_path=_KFP_PACKAGE_PATH)
def hello_world(text: str) -> str:
print(text)
return text
@dsl.pipeline(name='hello-world', description='A simple intro pipeline')
def pipeline_hello_world(text: str = 'hi there'):
"""Pipeline that passes small pipeline parameter string to consumer op."""
consume_task = hello_world(
text=text) # Passing pipeline parameter as argument to consumer op
if __name__ == "__main__":
# execute only if run as a script
compiler.Compiler().compile(
pipeline_func=pipeline_hello_world,
package_path='hello_world_pipeline.json')