ホームページ  >  記事  >  バックエンド開発  >  Python 環境で非同期タスク キュー パッケージ Celery をインストールして使用する

Python 環境で非同期タスク キュー パッケージ Celery をインストールして使用する

高洛峰
高洛峰オリジナル
2017-03-02 17:10:591707ブラウズ

1. はじめに

celery (セロリ) は、分散メッセージングに基づいた非同期タスク キュー/ジョブ キューです。リアルタイム操作に重点を置いていますが、優れたスケジューリング サポートも備えています。
celery は、毎日何百万ものタスクを処理するために実稼働システムで使用されています。
celery は Python で書かれていますが、プロトコルはどの言語でも実装できます。 Webhook を介して他の言語で実装することもできます。
推奨されるメッセージ ブローカーは RabbitMQ ですが、Redis、Beanstalk、MongoDB、CouchDB、およびデータベース (SQLAlchemy または Django ORM を使用) のサポートは限定的です。
Celery は、Django、Pylons、Flask を簡単に統合できます。django-celery、celery-pylons、および Flask-Celery アドオン パッケージを使用するだけです。

2. インストール
上記の概念に従って、次のものをインストールする必要があります: RabbitMQ、SQLAlchemy、Celery
インストール方法も非常に簡単です: RabbitMQ:
mac:

brew install rabbitmq

linux:

sudo apt-get install rabbitmq-server

残りの 2 つはすべて Python のものです。MySQL ドライバーをインストールしたことがない学生は、MySQL-python をインストールする必要があるかもしれません。
インストールが完了したら、サービスを開始します:

$ rabbitmq-server[回车]

開始後はウィンドウを閉じずに、下の新しいウィンドウ(タブ)を操作してください

3. 簡単なケース
以前のRabbitMQがインストールされていることを確認してください。始められました。
引き続き公式 Web サイトの例ですが、次の内容の task.py ファイルを任意のディレクトリに作成します:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
  return x + y

同じレベルのディレクトリで実行します:

$ celery -A tasks worker --loglevel=info

このコマンドは、次のことを意味します。ワーカーを開始し、タスクをタスク (add(x,y)) に入れて、タスクをキューに入れます。
ウィンドウを開いたままにして、新しいウィンドウを開いて、Python または ipython の対話モードに入ります。

>>> from tasks import add
>>> add.delay(4, 4)

この時点までで、既に celery を使用してタスクを実行できます。追加タスクは単に で呼び出されます。上記の Python インタラクティブ モードでは、4,4 パラメータを渡します。
しかし、この時点で問題が発生しました。このタスクの実行結果とステータス、完了したかどうかを突然知りたいとします。したがって、バックエンドを設定する必要があります。
前のtasks.pyのコードを次のように変更します:

# coding:utf-8
import subprocess
from time import sleep

from celery import Celery

backend = 'db+mysql://root:@192.168.0.102/celery'
broker = 'amqp://guest@192.168.0.102:5672'

app = Celery('tasks', backend=backend, broker=broker)


@app.task
def add(x, y):
  sleep(10)
  return x + y


@app.task
def hostname():
  return subprocess.check_output(['hostname'])

バックエンドの追加に加えて、マルチサーバー操作をテストするために上記のwhoメソッドも追加されます。変更が完了した後も、以前と同様に起動されます。
Python インタラクション モデルも入力します:

>>> from tasks import add, hostname
>>> r = add.delay(4, 4)
>>> r.ready() # 10s内执行,会输出False,因为add中sleep了10s
>>>
>>> r = hostname.delay()
>>> r.result # 输出你的hostname

4. 複数のサーバーのテスト
上記のテストが完了すると、Celery は分散タスク管理と呼ばれるので、その分散はどこに反映されるのかという疑問が生じました。そのタスクはどのように実行されるのでしょうか?どのマシンで実行されましたか?
現在のサーバー上の celery サービスをシャットダウンせずに、同じ方法で別のサーバーに Celery をインストールして開始します。

$ celery -A tasks worker --loglevel=info

開始した前のサーバー上の Celery サービスの出力を確認します。 ホスト名サーバーが Rabbitmq に接続されている場合。
次に、Python 対話モードに入ります:

>>> from tasks import hostname
>>>
>>> for i in range(10):
...   r = hostname.delay()
...   print r.result # 输出你的hostname
>>>

入力内容を確認し、2 つのサーバーで celery サービスを開始したときの出力を観察します。

5. RabbitMQ のリモート接続の問題
最初のテスト中に、リモート サーバーはローカルの RabbitMQ サービスに接続できませんでした。その後、ファイル /usr/local/etc/ にアクセス許可を設定する必要があることが判明しました。 Rabbitmq/rabbitmq-env.conf、NODE_IP_ADDRESS=127.0.0.1 の IP を 0.0.0.0 に変更します。

6. 要約
この記事では、引き続き分散使用に焦点を当てながら、Celery の使用方法を簡単に紹介します。不快な点は、拡張するときにタスクを直接共有するのではなく、コード (tasks.py) を再デプロイする必要があることです。おそらく、Celery はさまざまなワーカーを照合するためにタスクを使用します。まだよくわかっていないので、詳しく使ってみてからお話します。


Python 環境での非同期タスク キュー パッケージ Celery のインストールと使用に関するその他の関連記事については、PHP 中国語 Web サイトに注目してください。


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。