業界・業務から探す
導入目的・課題から探す
データ・AIについて学ぶ
News
Hakkyについて
ウェビナーコラム
◆トップ【Hakkyの社内Wiki】Strapi
AI

執筆者:Handbook編集部

celeryの基本的な使用方法

Celery について

プロジェクトウェブサイト によると、以下のように説明がありました。

Celery は、膨大な量のメッセージを処理するためのシンプルで柔軟かつ信頼性の高い分散システムであり、そのようなシステムを維持するために必要なツールを運用者に提供するものである。

リアルタイム処理に特化したタスクキューであり、タスクのスケジューリングもサポートする。

インストール

poetry を使う場合には、 pyproject.tomlceleryflower を追記して poetry install します。例としては次のような形です。

[tool.poetry.dependencies]
celery = "5.2.6"
redis = "3.5.3"
flower = "1.0.0"

Celery の構成

Celery を実行する worker に加えて、Broker と Backend が必要です。さらに、モニタ用の flower も用意することもできます。

Broker と Backend

Celery を使用する際には、メッセージキューであるブローカーと、処理結果を保持するバックエンドを用意する必要があります。

  • メッセージブローカー:メッセージを格納、引き出しできるデータストアです
    • Redis や RabbitMQ などを利用可能です
  • バックエンド:処理結果の保持を担当します
    • ブローカーと同様に Redis などを用いることもできるほか、PostgreSQL や MySQL などを採用して永続性を強化することもできます

Broker と backend の例

本記事ではブローカーとバックエンドの両方で Redis を使用します。例えば、docker-compose.yml で以下のようなサービスを定義して起動しておけばよいです。

version: "3"
services:
  redis:
    image: redis:6-alpine
    hostname: redis
    ports:
      - 6379:6379
    expose:
      - "6379"

worker

Celery のワーカー用のコンテナを用意します。

workerの例

  • Dockerfile

    FROM python:3.9-buster
    
    WORKDIR /root
    
    ENV POETRY_HOME="/opt/poetry" \
        POETRY_VERSION=1.1.14
    ENV PATH="$POETRY_HOME/bin::$PATH"
    
    COPY poetry.lock pyproject.toml ./
    RUN apt-get clean && apt-get update && \
        apt-get install -y sudo && \
        pip install -U pip && \
        curl -sSL https://install.python-poetry.org | python3 - && \
        poetry config virtualenvs.create false && \
        poetry install --no-dev
    
    COPY . /root/
    
    RUN rm -rf /var/cache/apk/*
    EXPOSE 5000
    
  • dokcer-compose.yml

    version: "3"
    services:
      # ...redis...
    
      worker:
        build:
          context: .
          dockerfile: Dockerfile
        hostname: worker
        command: celery --app app.app.celery worker --loglevel=info
        environment:
          - CELERY_BROKER_URL=redis://redis:6379/0
          - CELERY_RESULT_BACKEND=redis://redis:6379/0
        depends_on:
          - redis
    

Flower によるモニタリング

Flower は Celery の処理状況をモニタリングするためのライブラリです。ワーカーの稼働状況やタスクの処理状況をダッシュボードとしてブラウザから確認でき、ワーカーの再起動やオートスケーリングの設定などを行うことも可能です。

Flower 用コンテナの例

Dockerfileworker の例と同じものを使用します)

version: "3"
services:
  # ...redis...

  # ...worker...

  dashboard:
    build:
      context: .
      dockerfile: Dockerfile
    command: celery --app app.app.celery flower --port=5555 --broker=redis://redis:6379/0
    ports:
      - 5556:5555
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
      - worker

使い方

基本的には以下のような使い方をします

  1. タスクの定義
  2. タスクの実行開始
  3. タスクの結果取得

タスクの定義

Celery の import

from celery import Celery

celery = Celery(__name__)

backend と broker URL の指定

例:ここでは backend と broker の設定は broker と backend の例に従うものとします

celery.conf.broker_url = os.environ.get('CELERY_BROKER_URL',
                                       'redis://redis:6379')
celery.conf.result_backend = os.environ.get('CELERY_BACKEND_URL',
                                           'redis://redis:6379')

タスク本体の定義

@celery.task(name='tasks.hoge')
def hoge(arg_1, arg_2, ..., arg_n):
    """
    ここに何らかの計算処理を書いてください
      - args を引数に取り、result を返す処理を記述します
    """

    return result

タスクの実行開始

  1. タスク本体の定義 で関数として定義したタスク (hoge) に対して delay メソッドを呼び出すと、タスクの処理が開始されます

  2. delay メソッドには、hoge が受け取る引数を渡しましょう

  3. delay メソッドの返り値の id 要素を参照することで、実行開始されたタスクの ID が分かります(この ID はタスクの結果の取得で使います)

    • たとえば、POST メソッドの処理内部で delay メソッドを呼び出して取得したタスク ID をクライアント側に返すことで、タスクの進行状況や計算結果の確認に利用させることができます
    import hoge
    
    task = hoge.delay(arg_1, arg_2, ..., arg_n)
    
    response = {
      "task_id": task.id
    }
    

タスクの結果の取得

  1. celery の import で定義した celery オブジェクトの AsyncResult メソッドを呼び出すことで、タスクの進行状況や計算結果を取得できます

  2. AsyncResult メソッドの返り値に格納されている各値を参照しましょう

    1. task_id 要素にはタスク ID が格納されます
    2. status 要素にはタスクの進行状況が格納されます
    3. result 要素には計算結果が格納されます
  3. 例えば、以下のように使用します

    result = celery.AsyncResult(task_id)
    
    status = {
      "predict_id": result.task_id,
      "status": result.status,
      "predict_result": result.status,
    }
    

参考資料

info
備考

Hakky ではエンジニアを募集中です!まずは話してみたいなどでも構いませんので、ぜひお気軽に採用ページからお問い合わせくださいませ。

2025年06月02日に最終更新
読み込み中...