RabbitMQとCeleryを使ってDjangoでジョブキューしてみる(1)

注意:今回使っているのは
RabbitMQ v2.1.0
Celery v2.1.1
Python v2.6.5
です。バージョンによっては挙動が違うかも知れません。

そもそもメッセージキューイングって何よ?って人はこちら→メッセージキューイングとは
さて、Djangoから使えるジョブキュー(タスクキューとも言う)のモジュールを探したらCeleryなるものを発見しました。Celery自体は単なるPythonモジュールで実際にキューイングを行うためのエンジン(message broker)は色々選べるようです。ドキュメントによればRabbitMQというのがオススメだと書いてあるので今回は素直にこれを採用することにしました。(他にも例えばSQLAlchemyを通してRDBでやったりできるみたい)
ちなみに、バックエンドで働くbrokerへのインタフェースを提供するCeleryのことをbrokerと対比してwokerと呼ぶ様です。

RabbitMQ

f:id:yuku_t:20101018095002p:image
ホームページによるとRabbitMQは

  • 頑健性を持ったメッセージングを提供するmassage broker
  • 使うのが簡単で、主要なOSプログラミング言語をサポート
  • Amazon EC2の上で動かすこともできる
  • HTTPやStompからも使える
  • サーバを再起動してもメッセージが残るから安心
  • Mozilla Public LicenseでライセンスされたオープンソースソフトウェアでErlangで書かれている

などなどの特徴を備えているらしい

インストール

参考:RabbitMQ - Downloading and Installing RabbitMQ

$ sudo apt-get install rabbitmq-server # Debian GNU/Linux and Ubuntu Linux

インストールが完了するとそのままrabbitmq-serverが起動された。サーバが初めて起動されたり、データベースが初期化されなかった場合

virtual host = /
username = guest
password = guest

がRabbitMQに設定されます。
サーバが起動されていない場合は

$ sudo rabbitmqctl start # とか
$ sudo service rabbitmq-server start # とか

で起動できます。

管理

参照:RabbitMQ - rabbitmqctl(1) manual page
rabbitmqctlというコマンドラインツールを使って管理することができます。例えばユーザを新規作成するには

$ rabbitmqctl add_user user_name password

webインタフェースやHTTP APIを提供するプラグインもあるみたい。

Celery

f:id:yuku_t:20101018173356p:image

  • Celeryオープンソースの非同期タスクキュー(ジョブキュー)
  • 当然中身はPython
  • リアルタイムでの処理ができる
  • スケジューリングもできる
  • すでに100万タスク/日の規模のプロダクトで使われている
  • Django、PyloneやFlaskなどで簡単に使える(もともとはDjango用プラグインとして誕生したらしい)
インストール
$ sudo pip install celery
ためしにCeleryを使ってみる

参考:First steps with Celery — Celery 2.6.0rc4 documentation
2つの数の和を計算する簡単な例が載っていました。タスクはPythonモジュールとして定義します。tasks.pyを以下の様に作成します

# -*- coding: utf-8 -*-
from celery.task import Task
from celery.decorators import task

class AddTask(Task):
    def run(self, x, y):
        logger = self.get_logger(task_name=u'クラス')
        logger.info("Adding %s + %s" % (x, y))
        return x + y

@task
def add(x, y):
    logger = Task.get_logger(task_name=u'デコレータ')
    logger.info("Adding %s + %s" % (x, y))
    return x + y

通常のタスクはTaskクラスを継承して定義します。関数から簡単にタスクを作るためのデコレータも用意されています。
CeleryのloggerはPythonのloggingモジュールの薄いラッパーになっているので、普段どおりの気分でログをとることができます。
次にceleryconfig.pyというファイルを作成します。

$ cat celeryconfig.py
# message brokerの設定.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

# バックエンドを指定。今回はRabbitMQに対してAMQPというプロトコルで接続する
CELERY_RESULT_BACKEND = "amqp"

# workerの設定
## 平行度 CPUの数に近づけるといいらしい。省略するとCPU/coreが使われる。
#CELERYD_CONCURRENCY
## ログの出力先。省略すると標準エラー出力が選ばれる
CELERYD_LOG_FILE = "celeryd.log"
## ログのレベル
CELERYD_LOG_LEVEL = "INFO" # DEBUG, INFO, WARNING, ERROR or CRITICAL

# 起動時に読み込むモジュール
CELERY_IMPORTS = ("tasks", ) # 上のtasks.pyのこと

Celeryを起動します。

$ ls
celeryconfig.py   tasks.py
$ celeryd # 詳しいオプションは --help で参照

タスクを実行してみる

>>> from tasks import AddTask, add                                                        
>>> r1 = AddTask().delay(3, 4) # タスクをworkerに渡して遅延評価させる
>>> r1
<AsyncResult: 140b315b-60c6-4846-81d5-a8cb4611d413>
>>> r1.ready() # 処理が終わってなかったらFalseが返ってくる
True                                                           
>>> r1.result # 処理が終わってなかったらNoneが返ってくる
7
>>> r1.get() # 処理が終わってなかったら処理が終わるまでひたすら待ってから返す
7
>>> r1.successful() # 処理がエラー終了していなければTrueを返す
True
>>> r2 = add.delay(5, 6)  # タスクをworkerに渡して遅延評価させる
>>> r2
<AsyncResult: 1d9d4a7f-45c7-47bc-974c-85e9c3363313> 

ここでceleryd.logを見てみると...

$ cat celeryd.log
[...]
[2010-10-18 17:00:21,800: INFO/PoolWorker -9] [クラス(-?-)] Adding 3 + 4
[2010-10-18 17:00:21,899: INFO/MainProcess] Task tasks.AddTask[140b315b-60c6-4846-81d5-a8 cb4611d413] processed: 7  
[...]
[2010-10-18 17:00:30,320: INFO/PoolWorker -5] [デコレータ(-?-)] Adding 5 + 6
[2010-10-18 17:00:30,419: INFO/MainProcess] Task tasks.add[1d9d4a7f-45c7-47bc-974c-85e9c3 363313] processed: 11  
[...]

今回は処理が一瞬で終わってしまうので遅延評価の様子を伺えませんでしたが、どうやらうまく動いているっぽいことが確認できました。

ちょっとした注意

今回はデフォルトのユーザ(guest)を使いましたが、本来ならば自分で定義したユーザを使ってアクセスします。ただ、この時にceleryconfig.pyでユーザの設定などを間違えると、当然ながらcelerydを実行してもworkerが起動されません。が、この時出力されるエラーがなんと

CarrotListener: Connection Error: Socket closed.

ユーザ情報が間違ってるとか、ポートが変とか、そういうのを全てひっくるめてSocket closedと表現されます。エラー文を見ただけだとどこに問題があるのか分かりにくいので注意が必要です。*1

長くなってきたので

ここまでのところでbrokerであるRabbitMQに対してworkerであるCeleryを使うことによりPythonからメッセージキューイングすることができるようになりました。次はいよいよDjangoから使ってみようと思いますが、長くなってきたのでまた次回。

つづき→RabbitMQとCeleryを使ってDjangoでジョブキューしてみる(2) - SELECT * FROM life;

*1:僕はRabbitMQが上手く動いていないのかと勘違いして、大いに時間を無駄にしました.....orz