Home >php教程 >php手册 >python的分布式任务huey如何实现异步化任务讲解

python的分布式任务huey如何实现异步化任务讲解

WBOY
WBOYOriginal
2016-06-13 09:12:111443browse

python的分布式任务huey如何实现异步化任务讲解

 本文我们来分享一个python的轻型的任务队列程序,他可以让python的分布式任务huey实现异步化任务,感兴趣的朋友可以看看。

 

 

一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单。 


关于huey的介绍:  (比celery轻型,比mrq、rq要好用 !)

a lightweight alternative.

    written in python

    no deps outside stdlib, except redis (or roll your own backend)

    support for django

supports:

    multi-threaded task execution

    scheduled execution at a given time

    periodic execution, like a crontab

    retrying tasks that fail

    task result storage


安装:

 代码如下  
Installing
huey can be installed very easily using pip.
 
pip install huey
huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.
 
pip install redis
Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.
 
git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install
You can run the tests using the test-runner:
 
python setup.py test




关于huey的api,下面有详细的介绍及参数介绍的。

 代码如下  
from huey import RedisHuey, crontab
 
huey = RedisHuey('my-app', host='redis.myapp.com')
 
@huey.task()
def add_numbers(a, b):
    return a + b
 
@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
    sync_all_data()




juey作为woker的时候,一些cli参数。 


常用的是:  

-l                  关于日志文件的执行 。

-w                 workers的数目,-w的数值大了,肯定是增加任务的处理能力

-p --periodic     启动huey worker的时候,他会从tasks.py里面找到 需要crontab的任务,会派出几个线程专门处理这些事情。 

-n                  不启动关于crontab里面的预周期执行,只有你触发的时候,才会执行周期星期的任务。 

--threads   意思你懂的。
1

 代码如下  
# 原文:     
The following table lists the options available for the consumer as well as their default values.
 
-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.
Examples
 
Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:
 
huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0





任务队列huey 是靠着redis来实现queue的任务存储,所以需要咱们提前先把redis-server和redis-py都装好。 安装的方法就不说了,自己搜搜吧。 


我们首先创建下huey的链接实例 :

 代码如下  
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
 
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue)


然后就是关于任务的,也就是你想让谁到任务队列这个圈子里面,和celey、rq,mrq一样,都是用tasks.py表示的。

 代码如下  
from config import huey # import the huey we instantiated in config.py
 
 
@huey.task()
def count_beans(num):
    print '-- counted %s beans --' % num




再来一个真正去执行的 。  main.py 相当于生产者,tasks.py相当于消费者的关系。  main.py负责喂数据。

 代码如下  
main.py
from config import huey  # import our "huey" object
from tasks import count_beans  # import our task
 
 
if __name__ == '__main__':
    beans = raw_input('How many beans? ')
    count_beans(int(beans))
    print 'Enqueued job to count %s beans' % beans


Ensure you have Redis running locally

Ensure you have installed huey

Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).

Run the main program: python main.py




和celery、rq一样,他的结果获取是需要在你的config.py或者主代码里面指明他的存储的方式,现在huey还仅仅是支持redis,但相对他的特点和体积,这已经很足够了 !


只是那几句话而已,导入RedisDataStore库,申明下存储的地址。

 代码如下  
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore  # ADD THIS LINE
 
 
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379)  # ADDED
 
huey = Huey(queue, result_store=result_store) # ADDED result store




这个时候,我们在ipython再次去尝试的时候,会发现可以获取到tasks.py里面的return值了 其实你在main.py里面获取的时候,他还是通过uuid从redis里面取出来的。

 代码如下  
>>> from main import count_beans
>>> res = count_beans(100)
>>> res  # what is "res" ?

>>> res.get()  # get the result of this task
'Counted 100 beans'




huey也是支持celey的延迟执行和crontab的功能 。  这些功能很是重要,可以自定义的优先级或者不用再借助linux本身的crontab。


用法很简单,多加一个delay的时间就行了,看了下huey的源码,他默认是立马执行的。当然还是要看你的线程是否都是待执行的状态了。

 代码如下  
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res

>>> res.get()  # this returns None, no data is ready
>>> res.get()  # still no data...
>>> res.get(blocking=True)  # ok, let's just block until its ready
'Counted 100 beans'





再来一个重试retry的介绍,huey也是有retry,这个很是实用的东西。 如果大家有看到我的上面文章关于celery重试机制的介绍,应该也能明白huey是个怎么个回事了。  是的,他其实也是在tasks里具体函数的前面做了装饰器,装饰器里面有个func try 异常重试的逻辑 。 大家懂的。

 代码如下  
# tasks.py
from datetime import datetime
 
from config import huey
 
@huey.task(retries=3, retry_delay=10)
def try_thrice():
    print 'trying....%s' % datetime.now()
    raise Exception('nope')





huey是给你反悔的机会饿 ~  也就是说,你做了deley的计划任务后,如果你又想取消,那好看,直接revoke就可以了。

 代码如下  
# count some beans
res = count_beans(10000000)
 
res.revoke()
The same applies to tasks that are scheduled in the future:
 
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
 
@huey.task(crontab(minute='*'))
def print_time():
    print datetime.now()


task() - 透明的装饰器,让你的函数变得优美点。 

periodic_task() - 这个是周期性的任务

crontab() - 启动worker的时候,附带的crontab的周期任务。 

BaseQueue - 任务队列

BaseDataStore - 任务执行后,可以把 结果塞入进去。  BAseDataStore可以自己重写。

 


官方的huey的git库里面是提供了相关的测试代码的: 


main.py

 代码如下  
from config import huey
from tasks import count_beans
 
 
if __name__ == '__main__':
    beans = raw_input('How many beans? ')
    count_beans(int(beans))
    print('Enqueued job to count %s beans' % beans)




tasks.py

 代码如下  
import random
import time
from huey import crontab
 
from config import huey
 
 
@huey.task()
def count_beans(num):
    print "start..."
    print('-- counted %s beans --' % num)
    time.sleep(3)
    print "end..."
    return 'Counted %s beans' % num
 
@huey.periodic_task(crontab(minute='*/5'))
def every_five_mins():
    print('Consumer prints this every 5 mins')
 
@huey.task(retries=3, retry_delay=10)
def try_thrice():
    if random.randint(1, 3) == 1:
        print('OK')
    else:
        print('About to fail, will retry in 10 seconds')
        raise Exception('Crap something went wrong')
 
@huey.task()
def slow(n):
    time.sleep(n)
    print('slept %s' % n)




run.sh

 代码如下  
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2

=>



咱们可以先clone下huey的代码库。 里面有个examples例子目录,可以看到他是支持django的,但是这不是重点 !

 代码如下  
[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.git
Cloning into 'huey'...
remote: Counting objects: 1423, done.
remote: Compressing objects: 100% (9/9), done.
Receiving objects:  34% (497/1423), 388.00 KiB | 29.00 KiB/s   KiB/s
 
Receiving objects:  34% (498/1423), 628.00 KiB | 22.00 KiB/s
 
 
remote: Total 1423 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.
Resolving deltas: 100% (729/729), done.
Checking connectivity... done.
[xiaorui@devops /tmp ]$cd huey/examples/simple
[xiaorui@devops simple (master)]$ ll
total 40
-rw-r--r--  1 xiaorui  wheel    79B  9  8 08:49 README
-rw-r--r--  1 xiaorui  wheel     0B  9  8 08:49 __init__.py
-rw-r--r--  1 xiaorui  wheel    56B  9  8 08:49 config.py
-rwxr-xr-x  1 xiaorui  wheel   227B  9  8 08:49 cons.sh
-rw-r--r--  1 xiaorui  wheel   205B  9  8 08:49 main.py
-rw-r--r--  1 xiaorui  wheel   607B  9  8 08:49 tasks.py
[xiaorui@devops simple (master)]$



Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn