ホームページ  >  記事  >  バックエンド開発  >  Python の Flask フレームワーク アプリケーションが Redis キュー データを呼び出す

Python の Flask フレームワーク アプリケーションが Redis キュー データを呼び出す

高洛峰
高洛峰オリジナル
2017-03-03 15:02:302290ブラウズ

タスクの非同期

ブラウザを開いてアドレスを入力し、Enterを押してページを開きます。したがって、HTTP リクエスト (リクエスト) がクライアントからサーバーに送信され、サーバーはリクエストを処理して応答 (レスポンス) コンテンツを返します。

私たちは毎日ウェブを閲覧し、大小さまざまなリクエストをサーバーに送信します。場合によっては、サーバーがリクエストを受信したときに、別のサーバーにもリクエストを送信する必要があることが判明したり、サーバーが別のことを行う必要があることが判明したりするため、最初に送信したリクエストがブロックされる、つまり待機する必要があることがわかります。サーバーが他の作業を完了するために使用します。

多くの場合、サーバーによって実行される追加の処理は、クライアントが待機する必要はありません。これらの追加の処理は非同期で実行できます。非同期タスクを実行するためのツールは数多くあります。主な原則は、通知メッセージを処理することです。通常、通知メッセージにはキュー構造が採用されます。コミュニケーションとビジネス実装のためのメッセージを生成および消費します。

生産、消費、キュー
上記の非同期タスクの実装は、プロデューサー消費モデルに抽象化できます。レストランと同じように、シェフが料理を作り、美食家が食事をします。たくさん作って当分売れない場合は、シェフは休憩しますが、お客様が多くてシェフが休みなく忙しい場合は、お客様はゆっくり待っていただく必要があります。プロデューサーとコンシューマーを実装するには多くの方法があります。Python 標準ライブラリ Queue を使用した小さな例を次に示します。

import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
  def run(self):
    while True:
      elem = random.randrange(9)
      queue.put(elem)
      print "厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

class Consumer(Thread):
  def run(self):
    while True:
      elem = queue.get()
      print "吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

def main():
  for i in range(3):
    p = Producer()
    p.start()
  for i in range(2):
    c = Consumer()
    c.start()

if __name__ == '__main__':
  main()

おおよその出力は次のとおりです。

厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完
厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完
吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃
吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃
吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃
厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完
吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃
厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完
吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃
吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃
厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完

Redis Queue
組み込み商品 使用されるキュー構造。 Redis を使用して同様の操作を実装することもできます。そして簡単な非同期タスクを実行します。

Redis では、メッセージ キューを実行する 2 つの方法が提供されます。 1 つはプロデューサー消費モデルを使用する方法、もう 1 つはパブリッシュ/サブスクライバー モデルを使用する方法です。前者では、1 つ以上のクライアントがメッセージ キューを監視できるようになり、メッセージが到着すると、最初にメッセージを取得した人がそのメッセージを消費し、キューにメッセージが存在しない場合は、コンシューマがリッスンし続けます。後者は、メッセージ チャネルにサブスクライブしている 1 つ以上のクライアントでもあり、パブリッシャがメッセージを公開している限り、すべてのサブスクライバはメッセージを受信でき、サブスクライバは ping されます。

生産および消費モード
主に、redis が提供する blpop を使用してキューデータを取得します。キューにデータがない場合は、ブロックして待機します。つまり、リッスンします。

import redis

class Task(object):
  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.queue = 'task:prodcons:queue'

  def listen_task(self):
    while True:
      task = self.rcon.blpop(self.queue, 0)[1]
      print "Task get", task

if __name__ == '__main__':
  print 'listen task queue'
  Task().listen_task()

パブリッシュとサブスクライブモデル
Redisのpubsub機能を使用して、サブスクライバーがチャネルにサブスクライブし、パブリッシャーがメッセージをチャネルにパブリッシュします。

import redis


class Task(object):

  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.ps = self.rcon.pubsub()
    self.ps.subscribe('task:pubsub:channel')

  def listen_task(self):
    for i in self.ps.listen():
      if i['type'] == 'message':
        print "Task get", i['data']

if __name__ == '__main__':
  print 'listen task channel'
  Task().listen_task()

Flask入口
非同期タスク用に2つのバックエンドサービスをそれぞれ実装しました。これらを直接起動することで、Redisキューまたはチャネルのメッセージを監視できます。簡単なテストは次のとおりです。

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

  html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生产消费者模式</a>
<br>
<br>
<a href="/pubsub">发布订阅者模式</a>
</center>
"""
  return html


@app.route(&#39;/prodcons&#39;)
def prodcons():
  elem = random.randrange(10)
  rcon.lpush(prodcons_queue, elem)
  logging.info("lpush {} -- {}".format(prodcons_queue, elem))
  return redirect(&#39;/&#39;)

@app.route(&#39;/pubsub&#39;)
def pubsub():
  ps = rcon.pubsub()
  ps.subscribe(pubsub_channel)
  elem = random.randrange(10)
  rcon.publish(pubsub_channel, elem)
  return redirect(&#39;/&#39;)

if __name__ == &#39;__main__&#39;:
  app.run(debug=True)

スクリプトを開始し、

siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub

を使用して、監視対象のスクリプト入力内の非同期メッセージをそれぞれ確認します。非同期タスクでは、時間のかかる操作を実行できますが、現時点ではこれらのメソッドでは非同期実行結果がわかりません。非同期実行結果を知る必要がある場合は、コルーチン タスクを設計するか、RQ などのツールを使用することを検討してください。セロリとか。

Redis キュー データを呼び出す Python の Flask フレームワーク アプリケーションに関連するその他の記事については、PHP 中国語 Web サイトに注目してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。