Home >Backend Development >Python Tutorial >Analyze Python's implementation of MQ message queue and the advantages of message queue

Analyze Python's implementation of MQ message queue and the advantages of message queue

巴扎黑
巴扎黑Original
2017-08-16 13:37:046247browse

The preservation role of message queue (MQ, Message Queue) in message data transmission provides guarantee and real-time processing convenience for data communication. Here we will take a look at the MQ message queue implementation of threads in Python and the message queue Advantage analysis

"Message queue" is a container that saves messages during the transmission of messages. The message queue manager acts as a middleman when relaying messages from its source to its destination. The main purpose of a queue is to provide routing and guarantee the delivery of messages; if the recipient is unavailable when the message is sent, Message Queue retains the message until it can be successfully delivered. I believe that message queue is a crucial component for any architecture or application. Here are ten reasons:

Python message queue example:

1.threading+Queue implementation Thread queue

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消费者端,从队列中获取num
   num = self.queue.get()
   print "i'm num %s"%(num)
   time.sleep(1)
   #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
   self.queue.task_done()
 
start = time.time()
def main():
 #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往队列中填错数据 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

Running results:

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

Interpretation:

The specific working steps are described as follows:

1, create an instance of Queue.Queue() , and then populate it with data.

2, pass the filled data instance to the thread class, which is created by inheriting threading.Thread.

3, generate a daemon thread pool.

4, take out one item from the queue each time, and use the data and run method in the thread to perform the corresponding work.

5. After completing this work, use the queue.task_done() function to send a signal to the queue that the task has been completed.

6. Performing a join operation on the queue actually means waiting until the queue is empty before exiting the main program.

One thing to note when using this mode: by setting the daemon thread to true, the program will automatically exit after running. The advantage is that you can perform a join operation on the queue or wait until the queue is empty before exiting.

2. Multiple queues

The so-called multiple queues, the output of one queue can be used as the input of another queue

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中取消息
      num = self.queue.get()
      bkeep = num
      
      #将bkeep放入队列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中获取消息并赋值给bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

Running results:

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

Interpretation:

ThreadNum class workflow

Define queue--->Inherit threading---->Initialize queue----> Define the run function--->get the data in the queue---->process the data---->put the data to another queue-->send a signal to tell the queue that the item has been processed

Main function workflow:

---> Throwing data into the custom queue

--->The for loop determines the number of threads to start-- -->Instantiate the ThreadNum class---->Start the thread and set up the daemon

--->The for loop determines the number of started threads---->Instantiate the PrintLove class--- >Start the thread and set it as a guard

--->Execute join after waiting for the messages in the queue to be processed. That is, exit the main program.

After understanding the general implementation of MQ, let’s summarize the advantages of message queues:

1. Decoupling

Predict future project problems at the beginning of the project It is extremely difficult to find what is needed. Message queue inserts an implicit, data-based interface layer in the middle of the processing process, and the processing processes on both sides must implement this interface. This allows you to extend or modify the processes on both sides independently, as long as they adhere to the same interface constraints.

2. Redundancy

Sometimes the process fails when processing data. Unless the data is persisted, it is lost forever. Message Queuing avoids the risk of data loss by persisting data until they have been completely processed. In the "insert-get-delete" paradigm used by many message queues, before deleting a message from the queue, your processing process needs to clearly indicate that the message has been processed to ensure that your data is safe. Save it until you're done using it.

3. Scalability

Because the message queue decouples your processing, it is easy to increase the frequency of message enqueuing and processing; just add additional processing. There is no need to change code or adjust parameters. Expanding is as easy as turning up the power button.

4. Flexibility & Peak Processing Capacity

When your application is on the homepage of Hacker News, you will find that the traffic increases to an unusual level. Your application still needs to continue to function when the number of visits increases dramatically, but such bursts of traffic are uncommon; it would be a huge waste to invest resources on standby based on the standard of being able to handle such peak visits. Using message queues allows critical components to withstand increased access pressure without completely collapsing due to overloaded requests. Check out our blog post on peak processing capabilities for more information on this.

5. Recoverability

When some components of the system fail, it will not affect the entire system. The message queue reduces the coupling between processes, so even if a process processing messages hangs up, the messages added to the queue can still be processed after the system recovers. The ability to allow requests to be retried or deferred is often the difference between a slightly inconvenienced user and a frustrated user.

6. Delivery Guarantee

The redundancy mechanism provided by the message queue ensures that the message can be actually processed, as long as one process reads the queue. On this basis, IronMQ provides a "delivery only once" guarantee. No matter how many processes are receiving data from the queue, each message can only be processed once. This is possible because getting a message simply "subscribes" to the message, temporarily removing it from the queue. Unless the client explicitly indicates that it has finished processing the message, the message will be put back into the queue and can be processed again after a configurable period of time.

7. Sorting Guarantee

In many cases, the order in which data is processed is important. The message queue is inherently sorted and can guarantee that data will be processed in a specific order. IronMO ensures that messages are processed in FIFO (first in, first out) order, so the position of messages in the queue is the position from which they were retrieved.

8. Buffering

In any significant system, there will be elements that require different processing times. For example, loading an image takes less time than applying a filter. Message queues use a buffering layer to help tasks execute most efficiently - writes to the queue are processed as quickly as possible, without being constrained by preparatory processing for reads from the queue. This buffering helps control and optimize the speed at which data flows through the system.

9. Understand data flow

In a distributed system, it is a huge challenge to get an overall impression of how long user operations will take and why. Message series can help identify underperforming processes or areas based on the frequency with which messages are processed, where the data flow is not optimized enough.

10. Asynchronous communication

Many times, you don’t want or need to process messages immediately. Message queues provide an asynchronous processing mechanism that allows you to put a message into the queue but not process it immediately. You can put as many messages as you want into the queue and process them when you feel like it.

The above is the detailed content of Analyze Python's implementation of MQ message queue and the advantages of message queue. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn