Home  >  Article  >  Backend Development  >  Python's Flask framework application calls Redis queue data

Python's Flask framework application calls Redis queue data

高洛峰
高洛峰Original
2017-03-03 15:02:302356browse

Task asynchronousization

Open the browser, enter the address, press Enter, and open the page. So an HTTP request (request) is sent from the client to the server, and the server processes the request and returns the response (response) content.

We browse the web every day and send large and small requests to the server. Sometimes, when the server receives a request, it will find that it also needs to send a request to another server, or the server also needs to do something else, so the request they originally sent is blocked, that is, it has to wait for the server to complete other things.

More often, the additional things done by the server do not require the client to wait. At this time, these additional things can be done asynchronously. There are many tools for doing asynchronous tasks. The main principle is still to process notification messages. For notification messages, a queue structure is usually adopted. Produce and consume messages for communication and business implementation.

Production, consumption and queue
The implementation of the above asynchronous tasks can be abstracted into the producer consumption model. Just like a restaurant, the chef is cooking and the foodies are eating. If the chef cooks a lot and cannot sell all the items for the time being, the chef will take a break; if there are many customers and the chef is busy non-stop, the customers need to wait slowly. There are many ways to implement producers and consumers. Here is a small example using the Python standard library Queue:

import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
  def run(self):
    while True:
      elem = random.randrange(9)
      queue.put(elem)
      print "厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

class Consumer(Thread):
  def run(self):
    while True:
      elem = queue.get()
      print "吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queue.qsize())
      time.sleep(random.random())

def main():
  for i in range(3):
    p = Producer()
    p.start()
  for i in range(2):
    c = Consumer()
    c.start()

if __name__ == '__main__':
  main()

The approximate output is as follows:

厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完
厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完
吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃
吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃
吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃
厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完
吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃
厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完
吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃
吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃
厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完

Redis Queue
Python has a built-in useful queue structure. We can also use redis to implement similar operations. and do a simple asynchronous task.

Redis provides two ways to do message queues. One is to use the producer consumption model, and the other is the publish-subscriber model. The former will let one or more clients monitor the message queue. Once the message arrives, the consumer will consume it immediately. Whoever grabs it first will be the winner. If there is no message in the queue, the consumer will continue to listen. The latter is also one or more clients subscribing to the message channel. As long as the publisher publishes the message, all subscribers can receive the message and the subscribers are pinged.

Production and consumption mode
Mainly uses blpop provided by redis to obtain queue data. If there is no data in the queue, it will block and wait, that is, listening.

import redis

class Task(object):
  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.queue = 'task:prodcons:queue'

  def listen_task(self):
    while True:
      task = self.rcon.blpop(self.queue, 0)[1]
      print "Task get", task

if __name__ == '__main__':
  print 'listen task queue'
  Task().listen_task()

Publish and subscribe mode
Using the pubsub function of redis, the subscriber subscribes to the channel, and the publisher publishes the message to the channel. A channel is a message queue.

import redis


class Task(object):

  def __init__(self):
    self.rcon = redis.StrictRedis(host='localhost', db=5)
    self.ps = self.rcon.pubsub()
    self.ps.subscribe('task:pubsub:channel')

  def listen_task(self):
    for i in self.ps.listen():
      if i['type'] == 'message':
        print "Task get", i['data']

if __name__ == '__main__':
  print 'listen task channel'
  Task().listen_task()

Flask entrance
We have implemented two back-end services for asynchronous tasks respectively. Start them directly and you can monitor them. Messages from the redis queue or channel. The simple test is as follows:

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

  html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生产消费者模式</a>
<br>
<br>
<a href="/pubsub">发布订阅者模式</a>
</center>
"""
  return html


@app.route(&#39;/prodcons&#39;)
def prodcons():
  elem = random.randrange(10)
  rcon.lpush(prodcons_queue, elem)
  logging.info("lpush {} -- {}".format(prodcons_queue, elem))
  return redirect(&#39;/&#39;)

@app.route(&#39;/pubsub&#39;)
def pubsub():
  ps = rcon.pubsub()
  ps.subscribe(pubsub_channel)
  elem = random.randrange(10)
  rcon.publish(pubsub_channel, elem)
  return redirect(&#39;/&#39;)

if __name__ == &#39;__main__&#39;:
  app.run(debug=True)

Start the script and use

siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub

to start the script respectively. Asynchronous messages are seen in the listened script input. In asynchronous tasks, you can perform some time-consuming operations. Of course, these current methods do not know the asynchronous execution results. If you need to know the asynchronous execution results, you can consider designing coroutine tasks or using some tools such as RQ or celery.

For more articles related to Python’s Flask framework application calling Redis queue data, please pay attention to the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn