RabbitMQとCeleryを使ってDjangoでジョブキューしてみる(2)

RabbitMQとCeleryを使ってDjangoでジョブキューしてみる(1)
前回でPythonからCeleryを利用してジョブキューイングをすることができるようになりました。今回はDjangoでジョブキューイングします。

django-celeryのインストール

$ sudo pip install django-celery

django-celeryはDjangoからCeleryを利用するためのモジュールです。続いてceleryを使いたいDjangoプロジェクトのsettings.pyのINSTALLED_APPSにdjceleryを追加します。

# settings.py
INSTALL_APPS = (
    'django.contrib.auth',
    ...
    'myapp',
    'djcelery', # 追加
)

また、前回のceleryconfig.pyに書いた内容もsettings.pyに追加します。その時djceleryをインポートします。

# settings.py

import djcelery
djcelery.setup_loader()

# message brokerの設定.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

# バックエンドを指定。今回はRabbitMQに対してAMQPというプロトコルで接続する
CELERY_RESULT_BACKEND = "amqp"

# workerの設定
## 平行度 CPUの数に近づけるといいらしい。省略するとCPU/coreが使われる。
#CELERYD_CONCURRENCY
## ログの出力先。省略すると標準エラー出力が選ばれる
CELERYD_LOG_FILE = "celeryd.log"
## ログのレベル
CELERYD_LOG_LEVEL = "INFO" # DEBUG, INFO, WARNING, ERROR or CRITICAL

tasks.pyの作成

django-celeryではINSTALL_APPSでインストールされているアプリケーションの中にあるtasks.pyを自動的にロードします。myappアプリケーションにtasks.pyを作成してみます。

from celery.task import Task
from celery.decorators import task

@task
def add(x, y):
    logger = Task.get_logger()
    logger.info("Adding %s + %s" % (x, y))

celerydの起動

celeryのコマンドは全てpython manage.py ***の形で呼び出すことができます。

$ python manage.py celeryd

これでcelerydが起動されました。端末から応答が返ってきませんが気にせずに新しい端末を立ち上げましょう。

ジョブキューイングしてみる

これでdjangoプロジェクトの中の好きな場所で

from myapp.tasks import add
add.delay(1, 1)

を呼ぶことでceleryにジョブを投げることができるようになりました。

$ python manage.py shell
Python 2.6.5
>>> from myapp.tasks import add
>>> r = add.delay(5, 6)
>>> r.ready()
True
>>> r.successful()
True
>>> r.result
11

ログを見てみる

Djangoプロジェクトのディレクトリにceleryd.logが出来ているはずです

$ cat celeryd.log
[2010-11-12 07:40:30,402: WARNING/MainProcess] Configuration ->
    . broker -> amqp://guest@localhost:5672/
    . queues ->
        . celery -> exchange:celery (direct) binding:celery
    . concurrency -> 2
    . loader -> djcelery.loaders.DjangoLoader
    . logfile -> celeryd.log@INFO
    . events -> OFF
    . beat -> OFF
    . tasks ->
	. myapp.tasks.add
[2010-11-12 07:40:30,421: INFO/PoolWorker-1] child process calling self.run()
[2010-11-12 07:40:30,425: WARNING/MainProcess] celery@yuku-laptop has started.
[2010-11-12 07:40:30,424: INFO/PoolWorker-2] child process calling self.run()
[2010-11-12 07:40:46,562: INFO/MainProcess] Got task from broker: myapp.tasks.add[1598d1b5-bf82-47e4-a9ee-75e950eab637]
[2010-11-12 07:40:46,601: WARNING/PoolWorker-1] [2010-11-12 07:40:46,601: INFO/PoolWorker-1] [-?-(-?-)] Adding 5 + 6
[2010-11-12 07:40:46,601: INFO/PoolWorker-1] Adding 5 + 6
[2010-11-12 07:40:46,693: INFO/MainProcess] Task myapp.tasks.add[1598d1b5-bf82-47e4-a9ee-75e950eab637] processed: 11