Home  >  Article  >  Backend Development  >  Queue module and source code analysis

Queue module and source code analysis

高洛峰
高洛峰Original
2016-11-03 17:31:181682browse

The Queue module is a module that provides queue operations. Queue is the most commonly used form of exchanging data between threads. This module provides three queues:

Queue.Queue(maxsize): first in, first out, maxsize is the size of the queue, and when its value is a non-positive number, it is a wireless circular queue

Queue.LifoQueue(maxsize): last in, first out , equivalent to stack

Queue.PriorityQueue(maxsize): priority queue.

Among them LifoQueue and PriorityQueue are subclasses of Queue. The three have the following common methods:

qsize(): Returns the approximate queue size. Why add the word "approximately"? Because when the value is greater than 0, it does not guarantee that the get() method will not be blocked during concurrent execution. Similarly, it is valid for the put() method.

empty(): Returns a Boolean value. When the queue is empty, it returns True, otherwise it returns False.

full(): When the queue size is set, if the queue is full, it returns True, otherwise it returns False.

put(item[,block[,timeout]]): Add element item to the queue. When block is set to False, a Full exception will be thrown if the queue is full. If block is set to True and timeout is set to None, it will wait until there is space before adding it to the queue; otherwise, a Full exception will be thrown based on the timeout value set by timeout.

put_nowwait(item): Equivalent to put(item,False). When block is set to False, if the queue is empty, an Empty exception will be thrown. If block is set to True and timeout is set to None, it will wait until there is space before adding it to the queue; otherwise, an Empty exception will be thrown based on the timeout value set by timeout.

get([block[,timeout]]): Removes an element from the queue and returns the value of the element. If timeout is a positive number, it will block for up to timeout seconds and if there are no items available within that time , an Empty exception is thrown.

get_nowwait(): Equivalent to get(False)

task_done(): Sends a signal to indicate that the enqueuing task has been completed, often used in consumer threads.

join(): Block until all elements of the queue are processed, and then process other operations.

(1) Source code analysis

The Queue module is very simple to use, but I think it is necessary to post the relevant source code of the module and analyze it. You will learn a lot and see how good the code written by the masters is. Beautiful, how structured and modular, it makes me cry when I think about the code I wrote, come and learn. In order to reduce the length, the comment part of the source code has been deleted.

from time import time as _time
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
from collections import deque
import heapq
 
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
 
class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass
 
class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass
 
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = _threading.Lock()
        self.not_empty = _threading.Condition(self.mutex)
        self.not_full = _threading.Condition(self.mutex)
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 
       
    def get_nowait(self):
        return self.get(False)
    def _init(self, maxsize):
        self.queue = deque()
    def _qsize(self, len=len):
        return len(self.queue)
    def _put(self, item):
        self.queue.append(item)
    def _get(self):
        return self.queue.popleft()

Through the analysis of the following functions, we know that the Queue object is based on the queue of the collections module (for the collections module, refer to Python: Using Counter for counting statistics and collections module), plus the threading module mutex lock and condition variable encapsulation of.

deque is a double-ended queue, very suitable for queues and stacks. The above Queue object is a first-in, first-out queue, so first the _init() function defines a double-ended queue, and then it defines the _put() and _get() functions, which are from the right side of the double-ended queue. Adding elements and deleting elements on the left constitutes a first-in-first-out queue. In the same way, it is easy to think of the implementation of LifoQueue (last-in-first-out queue). Just ensure that the right side of the queue is added and the right is deleted. You can post the source code for a look.

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item):
        self.queue.append(item)
 
    def _get(self):
        return self.queue.pop()

Although its "queue" does not use queue(), it is the same with a list, because the list append() and pop() operations are to add elements to the rightmost element and delete the rightmost element.

Let’s take a look at PriorityQueue. It is a priority queue. The heappush() and heappop() functions of the heapq module are used here. The heapq module modularizes the data structure of the heap and can build this data structure. At the same time, the heapq module also provides corresponding methods to operate the heap. Among them, self.queue=[] in the _init() function can be regarded as creating an empty heap. heappush() inserts a new value into the heap, and heappop() pops the minimum value from the heap, which can achieve priority (here is a brief introduction to the heapq module). The source code is as follows:

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).
 
    Entries are typically tuples of the form:  (priority number, data).
    '''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)
 
    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)

The basic data structure has been analyzed, and then the other parts are analyzed.

mutex is a threading.Lock() object, a mutex lock; not_empty, not_full, and all_tasks_done are all threading.Condition() objects and condition variables, and they maintain the same lock object mutex (about the threading module For Lock objects and Condition objects, please refer to the previous blog post Python: Threads, Processes and Coroutines (2) - threading module).

Among them:

self.mutex mutex lock: Any operation that obtains the status of the queue (empty(), qsize(), etc.) or modifies the content of the queue (get, put, etc.) must hold this mutex lock . acquire() acquires the lock and release() releases the lock. At the same time, the mutex lock is jointly maintained by three condition variables.

self.not_empty condition variable: After the thread adds data to the queue, it will call self.not_empty.notify() to notify other threads, and then wake up a thread that removes elements.

self.not_full condition variable: When an element is removed from the queue, a thread that adds elements will be awakened.

self.all_tasks_done condition variable: When the number of unfinished tasks is deleted to 0, notify all tasks to complete

self.unfinished_tasks : Define the number of unfinished tasks


Let’s look at the main method again:

(1 )put()

源代码如下:

def put(self, item, block=True, timeout=None):
        self.not_full.acquire()                  #not_full获得锁
        try:
            if self.maxsize > 0:                 #如果队列长度有限制
                if not block:                    #如果没阻塞
                    if self._qsize() == self.maxsize:   #如果队列满了抛异常
                        raise Full
                elif timeout is None:           #有阻塞且超时为空,等待
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("&#39;timeout&#39; must be a non-negative number")
                else:        #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:       #到时后,抛异常
                            raise Full
                            #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出
                        self.not_full.wait(remaining)
            self._put(item)                    #调用_put方法,添加元素
            self.unfinished_tasks += 1         #未完成任务+1
            self.not_empty.notify()             #通知非空,唤醒非空挂起的任务
        finally:
            self.not_full.release()            #not_full释放锁

    默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。

    如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。

(2)get()

源码如下:

def get(self, block=True, timeout=None):
         
        self.not_empty.acquire()                #not_empty获得锁
        try:
            if not block:                       #不阻塞时
                if not self._qsize():           #队列为空时抛异常
                    raise Empty
            elif timeout is None:               #不限时时,队列为空则会等待
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("&#39;timeout&#39; must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()                  #调用_get方法,移除并获得项目
            self.not_full.notify()              #通知非满
            return item                        #返回项目
        finally:
            self.not_empty.release()            #释放锁

   逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。

        不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。

    

(3)task_done()

源码如下:

def task_done(self):
    
        self.all_tasks_done.acquire()       #获得锁
        try:
            unfinished = self.unfinished_tasks - 1  #判断队列中一个线程的任务是否全部完成
            if unfinished <= 0:                     #是则进行通知,或在过量调用时报异常
                if unfinished < 0:
                    raise ValueError(&#39;task_done() called too many times&#39;)
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished      #否则未完成任务数量-1
        finally:
            self.all_tasks_done.release()           #最后释放锁

    这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。


(4)join()

源码如下:

def join(self):
 
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:        #如果有未完成的任务,将调用wait()方法等待
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()

阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。


其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。

(二)简单例子

实现一个线程不断生成一个随机数到一个队列中

实现一个线程从上面的队列里面不断的取出奇数

实现另外一个线程从上面的队列里面不断取出偶数

import random,threading,time
from Queue import Queue
is_product = True
class Producer(threading.Thread):
    """生产数据"""
    def __init__(self, t_name, queue):
       threading.Thread.__init__(self,name=t_name)
       self.data=queue
    def run(self):
        while 1:
 
            if self.data.full():
                global is_product
                is_product = False
            else:
                if self.data.qsize() <= 7:#队列长度小于等于7时添加元素
                    is_product = True
                    for i in range(2): #每次向队列里添加两个元素
 
                        randomnum=random.randint(1,99)
                        print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                        self.data.put(randomnum,False) #将数据依次存入队列
                        time.sleep(1)
                        print "deque length is %s"%self.data.qsize()
                else:
                    if is_product:
                        for i in range(2):  #
 
                            randomnum = random.randint(1, 99)
                            print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                            self.data.put(randomnum,False)  # 将数据依次存入队列
                            time.sleep(1)
                            print "deque length is %s" % self.data.qsize()
                    else:
                        pass
 
        print "%s: %s finished!" %(time.ctime(), self.getName())
 
#Consumer thread
class Consumer_even(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:#队列长度大于7时开始取元素
                val_even = self.data.get(False)
                if val_even%2==0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                    time.sleep(2)
                else:
                    self.data.put(val_even)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
 
class Consumer_odd(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:
                val_odd = self.data.get(False)
                if val_odd%2!=0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                    time.sleep(2)
                else:
                    self.data.put(val_odd)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
#Main thread
def main():
    queue = Queue(20)
    producer = Producer(&#39;Pro.&#39;, queue)
    consumer_even = Consumer_even(&#39;Con_even.&#39;, queue)
    consumer_odd = Consumer_odd(&#39;Con_odd.&#39;,queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
 
if __name__ == &#39;__main__&#39;:
    main()


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:pyqt5Next article:pyqt5