首頁  >  文章  >  後端開發  >  Python的Flask框架應用程式呼叫Redis隊列數據

Python的Flask框架應用程式呼叫Redis隊列數據

高洛峰
高洛峰原創
2017-03-03 15:02:302348瀏覽

任務非同步化

開啟瀏覽器,輸入位址,按下回車,開啟了頁面。於是一個HTTP請求(request)就由客戶端傳送到伺服器,伺服器處理請求,回傳回應(response)內容。

我們每天都在瀏覽網頁,發送大大小小的請求給伺服器。有時候,伺服器接到了請求,會發現他也需要給另外的伺服器發送請求,或者伺服器也需要做另外一些事情,於是最初們發送的請求就被阻塞了,也就是要等待伺服器完成其他的事情。

更多的時候,伺服器做的額外事情,並不需要客戶端等待,這時候就可以把這些額外的事情異步去做。從事非同步任務的工具有很多。主要原理還是處理通知訊息,針對通知訊息通常採取是佇列結構。生產和消費訊息進行通訊和業務實現。

生產消費與佇列
上述非同步任務的實現,可以抽象化為生產者消費模型。如同一個餐館,廚師在煮飯,吃貨在吃飯。如果廚師做了很多,暫時賣不完,廚師就會休息;如果客戶很多,廚師馬不停蹄的忙碌,客戶則需要慢慢等待。實作生產者與消費者的方式用很多,下面使用Python標準函式庫Queue寫個小例子:

#
import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
  def run(self):
    while True:
      elem = random.randrange(9)
      queue.put(elem)
      print "厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

class Consumer(Thread):
  def run(self):
    while True:
      elem = queue.get()
      print "吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

def main():
  for i in range(3):
    p = Producer()
    p.start()
  for i in range(2):
    c = Consumer()
    c.start()

if __name__ == '__main__':
  main()

##大概輸出如下:

厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完
厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完
吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃
吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃
吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃
厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完
吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃
厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完
吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃
吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃
厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完

Redis 佇列
Python內建了一個好用的佇列結構。我們也可以是用redis實現類似的操作。並做一個簡單的非同步任務。

Redis提供了兩種方式來做訊息佇列。一個是使用生產者消費模式模式,另一個方法就是發布訂閱者模式。前者會讓一個或多個客戶端監聽訊息佇列,一旦訊息到達,消費者馬上消費,誰先搶到算誰的,如果佇列裡沒有訊息,則消費者繼續監聽。後者也是一個或多個客戶端訂閱訊息頻道,只要發布者發布訊息,所有訂閱者都能收到訊息,訂閱者都是ping的。

生產消費模式
主要使用了redis提供的blpop獲取隊列數據,如果隊列沒有數據則阻塞等待,也就是監聽。

import redis

class Task(object):
  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.queue = 'task:prodcons:queue'

  def listen_task(self):
    while True:
      task = self.rcon.blpop(self.queue, 0)[1]
      print "Task get", task

if __name__ == '__main__':
  print 'listen task queue'
  Task().listen_task()

#發布訂閱模式
#使用redis的pubsub功能,訂閱者訂閱頻道,發布者發布訊息到頻道了,頻道就是一個訊息隊列。

import redis


class Task(object):

  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.ps = self.rcon.pubsub()
    self.ps.subscribe('task:pubsub:channel')

  def listen_task(self):
    for i in self.ps.listen():
      if i['type'] == 'message':
        print "Task get", i['data']

if __name__ == '__main__':
  print 'listen task channel'
  Task().listen_task()

Flask 入口
我們分別實現了兩種非同步任務的後端服務,直接啟動他們,就能監聽redis隊列或頻道的消息了。簡單的測試如下:

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

  html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生产消费者模式</a>
<br>
<br>
<a href="/pubsub">发布订阅者模式</a>
</center>
"""
  return html


@app.route(&#39;/prodcons&#39;)
def prodcons():
  elem = random.randrange(10)
  rcon.lpush(prodcons_queue, elem)
  logging.info("lpush {} -- {}".format(prodcons_queue, elem))
  return redirect(&#39;/&#39;)

@app.route(&#39;/pubsub&#39;)
def pubsub():
  ps = rcon.pubsub()
  ps.subscribe(pubsub_channel)
  elem = random.randrange(10)
  rcon.publish(pubsub_channel, elem)
  return redirect(&#39;/&#39;)

if __name__ == &#39;__main__&#39;:
  app.run(debug=True)

啟動腳本,使用

##
siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub

可以分別在監聽的腳本輸入中看到非同步訊息。在非同步的任務中,可以執行一些耗時的操作,當然目前這些做法並不知道非同步的執行結果,如果需要知道非同步的執行結果,可以考慮設計協程任務或使用一些工具如RQ或者celery等。

更多Python的Flask框架應用呼叫Redis隊列資料相關文章請關注PHP中文網!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn