ホームページ >バックエンド開発 >Python チュートリアル >Python の Flask フレームワーク アプリケーションが Redis キュー データを呼び出す
タスクの非同期
ブラウザを開いてアドレスを入力し、Enterを押してページを開きます。したがって、HTTP リクエスト (リクエスト) がクライアントからサーバーに送信され、サーバーはリクエストを処理して応答 (レスポンス) コンテンツを返します。
私たちは毎日ウェブを閲覧し、大小さまざまなリクエストをサーバーに送信します。場合によっては、サーバーがリクエストを受信したときに、別のサーバーにもリクエストを送信する必要があることが判明したり、サーバーが別のことを行う必要があることが判明したりするため、最初に送信したリクエストがブロックされる、つまり待機する必要があることがわかります。サーバーが他の作業を完了するために使用します。
多くの場合、サーバーによって実行される追加の処理は、クライアントが待機する必要はありません。これらの追加の処理は非同期で実行できます。非同期タスクを実行するためのツールは数多くあります。主な原則は、通知メッセージを処理することです。通常、通知メッセージにはキュー構造が採用されます。コミュニケーションとビジネス実装のためのメッセージを生成および消費します。
生産、消費、キュー
上記の非同期タスクの実装は、プロデューサー消費モデルに抽象化できます。レストランと同じように、シェフが料理を作り、美食家が食事をします。たくさん作って当分売れない場合は、シェフは休憩しますが、お客様が多くてシェフが休みなく忙しい場合は、お客様はゆっくり待っていただく必要があります。プロデューサーとコンシューマーを実装するには多くの方法があります。Python 標準ライブラリ Queue を使用した小さな例を次に示します。
import random import time from Queue import Queue from threading import Thread queue = Queue(10) class Producer(Thread): def run(self): while True: elem = random.randrange(9) queue.put(elem) print "厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queue.qsize()) time.sleep(random.random()) class Consumer(Thread): def run(self): while True: elem = queue.get() print "吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queue.qsize()) time.sleep(random.random()) def main(): for i in range(3): p = Producer() p.start() for i in range(2): c = Consumer() c.start() if __name__ == '__main__': main()
おおよその出力は次のとおりです。
厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完 厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完 厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完 吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃 吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃 吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃 厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完 厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完 厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完 厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完 吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃 厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完 吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃 吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃 厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完 厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完
Redis Queue
組み込み商品 使用されるキュー構造。 Redis を使用して同様の操作を実装することもできます。そして簡単な非同期タスクを実行します。
Redis では、メッセージ キューを実行する 2 つの方法が提供されます。 1 つはプロデューサー消費モデルを使用する方法、もう 1 つはパブリッシュ/サブスクライバー モデルを使用する方法です。前者では、1 つ以上のクライアントがメッセージ キューを監視できるようになり、メッセージが到着すると、最初にメッセージを取得した人がそのメッセージを消費し、キューにメッセージが存在しない場合は、コンシューマがリッスンし続けます。後者は、メッセージ チャネルにサブスクライブしている 1 つ以上のクライアントでもあり、パブリッシャがメッセージを公開している限り、すべてのサブスクライバはメッセージを受信でき、サブスクライバは ping されます。
生産および消費モード
主に、redis が提供する blpop を使用してキューデータを取得します。キューにデータがない場合は、ブロックして待機します。つまり、リッスンします。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.queue = 'task:prodcons:queue' def listen_task(self): while True: task = self.rcon.blpop(self.queue, 0)[1] print "Task get", task if __name__ == '__main__': print 'listen task queue' Task().listen_task()
パブリッシュとサブスクライブモデル
Redisのpubsub機能を使用して、サブスクライバーがチャネルにサブスクライブし、パブリッシャーがメッセージをチャネルにパブリッシュします。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.ps = self.rcon.pubsub() self.ps.subscribe('task:pubsub:channel') def listen_task(self): for i in self.ps.listen(): if i['type'] == 'message': print "Task get", i['data'] if __name__ == '__main__': print 'listen task channel' Task().listen_task()
Flask入口
非同期タスク用に2つのバックエンドサービスをそれぞれ実装しました。これらを直接起動することで、Redisキューまたはチャネルのメッセージを監視できます。簡単なテストは次のとおりです。
import redis import random import logging from flask import Flask, redirect app = Flask(__name__) rcon = redis.StrictRedis(host='localhost', db=5) prodcons_queue = 'task:prodcons:queue' pubsub_channel = 'task:pubsub:channel' @app.route('/') def index(): html = """ <br> <center><h3>Redis Message Queue</h3> <br> <a href="/prodcons">生产消费者模式</a> <br> <br> <a href="/pubsub">发布订阅者模式</a> </center> """ return html @app.route('/prodcons') def prodcons(): elem = random.randrange(10) rcon.lpush(prodcons_queue, elem) logging.info("lpush {} -- {}".format(prodcons_queue, elem)) return redirect('/') @app.route('/pubsub') def pubsub(): ps = rcon.pubsub() ps.subscribe(pubsub_channel) elem = random.randrange(10) rcon.publish(pubsub_channel, elem) return redirect('/') if __name__ == '__main__': app.run(debug=True)
スクリプトを開始し、
siege -c10 -r 5 http://127.0.0.1:5000/prodcons siege -c10 -r 5 http://127.0.0.1:5000/pubsub
を使用して、監視対象のスクリプト入力内の非同期メッセージをそれぞれ確認します。非同期タスクでは、時間のかかる操作を実行できますが、現時点ではこれらのメソッドでは非同期実行結果がわかりません。非同期実行結果を知る必要がある場合は、コルーチン タスクを設計するか、RQ などのツールを使用することを検討してください。セロリとか。
Redis キュー データを呼び出す Python の Flask フレームワーク アプリケーションに関連するその他の記事については、PHP 中国語 Web サイトに注目してください。