本文展示了如何使用redis构建一个简单的多生产者,多消费者队列并且提供类似python标准库queue一样的接口。你可以使用这个队列方便的从多个进程或者耗时的计算到多个消费者进程之间共享数据。 我们使用redis列表来保存数据。redis列表按照字符串插入的顺序保
本文展示了如何使用redis构建一个简单的多生产者,多消费者队列并且提供类似python标准库queue一样的接口。你可以使用这个队列方便的从多个进程或者耗时的计算到多个消费者进程之间共享数据。
我们使用redis列表来保存数据。redis列表按照字符串插入的顺序保存数据。
下面的redis命令会被用到:
实现过程使用了redis-py库和服务器进行交互:
RedisQueue.py
<code class="python">import redis class RedisQueue(object): """Simple Queue with Redis Backend""" def __init__(self, name, namespace='queue', **redis_kwargs): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db= redis.Redis(**redis_kwargs) self.key = '%s:%s' %(namespace, name) def qsize(self): """Return the approximate size of the queue.""" return self.__db.llen(self.key) def empty(self): """Return True if the queue is empty, False otherwise.""" return self.qsize() == 0 def put(self, item): """Put item into the queue.""" self.__db.rpush(self.key, item) def get(self, block=True, timeout=None): """Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available.""" if block: item = self.__db.blpop(self.key, timeout=timeout) else: item = self.__db.lpop(self.key) if item: item = item[1] return item def get_nowait(self): """Equivalent to get(False).""" return self.get(False) </code>
使用:
<code>>>> from RedisQueue import RedisQueue >>> q = RedisQueue('test') >>> q.put('hello world')</code>
现在我们使用redis-cli客户端查看数据库,期望的结果如下:
<code>redis 127.0.0.1:6379> keys * 1) "queue:test" redis 127.0.0.1:6379> type queue:test list redis 127.0.0.1:6379> llen queue:test (integer) 1 redis 127.0.0.1:6379> lrange queue:test 0 1 1) "hello world"</code>
我们可以使用一个不同的脚本来获取数据:
<code>>>> from RedisQueue import RedisQueue >>> q = RedisQueue('test') >>> q.get() 'hello world'</code>
随后的q.get()调用会一直阻塞直到某人重新向队列发送一个新的数据。
接下来的工作将是到队列的 编码/解码(例如python-json),这样你就可以不受限制的发送任何字符串。
现在已经存在漂亮而又简单的hotqueue库,它具有像上面例子中的接口别且提供编码/解码功能。
其他值得提到的使用redis做后端的有:
原文中的一个小bug,如下:
生产者:
<code>In [1]: from RedisQueue import RedisQueue In [2]: q = RedisQueue('test') In [3]: q.put('1') In [4]: q.put('2')</code>
消费者:
<code>In [10]: from RedisQueue import RedisQueue In [11]: q = RedisQueue('test') In [12]: q.get() Out[12]: '1' In [13]: q.get_nowait() IndexError Traceback (most recent call last) <ipython-input-13-4f5e3b567b9e> in <module>() ----> 1 q.get_nowait() /data9/RedisQueue.py in get_nowait(self) 36 def get_nowait(self): 37 """Equivalent to get(False).""" ---> 38 return self.get(False) 39 /data9/RedisQueue.py in get(self, block, timeout) 31 32 if item: ---> 33 item = item[1] 34 return item 35 IndexError: string index out of range</module></ipython-input-13-4f5e3b567b9e></code>
使用redis-py库连接redis数据库,获取下数据,很容易就看到问题出在哪里。
<code>In [55]: r = redis.Redis() In [56]: r.blpop('queue:test') Out[56]: ('queue:test', '1') In [57]: r.lpop('queue:test') Out[57]: '2'</code>
当使用非阻塞方式获取数据时,redis客户端返回的是一个string;当使用阻塞方式获取数据时,redis返回的数据是一个tuple。因此,修改下get函数中对item的判断条件就可以了:
<code>if isinstance(item,tuple): item = item[1]</code>
自言自语:
翻译文章是个劳心的活,记得2、3年前有一次想翻译一篇关于nginx的文章,当时是越翻译越别扭,感觉整个人都不好了,后来就断了翻译文档的心。就在翻译这篇文章之前我还在纠结到底译不译,看当我咬咬牙,不也是顺利完活了。学技术贵在坚持,搞IT只要不是底层内核、算法层面的东西,多练练都是没问题的。
原文地址:http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
原文地址:python下使用redis构造一个简单的队列(翻译), 感谢原作者分享。