How huey, a Python distributed task queue, implements asynchronous tasks
This article introduces huey, a lightweight Python task queue that can be used to run distributed, asynchronous tasks. Interested readers can take a look.
huey is a lightweight task queue. Its features and supported brokers are not as extensive as celery's; it focuses on being lightweight, and its code is relatively easy to read.
Introduction to huey (lighter than celery, easier to use than mrq and rq!):
A lightweight alternative:
Written in python
No deps outside stdlib, except redis (or roll your own backend)
Support for django
Supports:
Multi-threaded task execution
Scheduled execution at a given time
Periodic execution, like a crontab
Retrying tasks that fail
Task result storage
Installation:
Installing
huey can be installed very easily using pip.
pip install huey
huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.
pip install redis
Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.
git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install
You can run the tests using the test-runner:
python setup.py test
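A detailed introduction to huey's API and its parameters follows later. First, a quick look at basic usage.
The code is as follows:
from huey import RedisHuey, crontab

huey = RedisHuey('my-app', host='redis.myapp.com')

# An ordinary task: calling add_numbers() enqueues it for the consumer.
@huey.task()
def add_numbers(a, b):
    return a + b

# A periodic task that runs every day at 03:00.
@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
    sync_all_data()  # placeholder for your own backup routine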
When huey runs as a worker (the consumer), it accepts a number of CLI parameters.
Commonly used ones are:
-l Path of the log file.
-w, --workers Number of worker threads. A larger value increases the capacity to process tasks.
-p, --periodic When the huey worker starts, it finds the tasks in tasks.py that carry a crontab and starts a dedicated thread to enqueue them.
-n Do not enqueue the periodic (crontab) tasks. Periodic tasks then only run when you trigger them yourself.
--threads Number of threads, as the name suggests.
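To make the effect of the worker count concrete, here is a minimal sketch using only the Python 3 standard library. It is not huey's consumer code; `run_workers` and its arguments are hypothetical names chosen for illustration. It only shows the general idea of several threads draining one shared queue.

```python
import queue
import threading


def run_workers(jobs, num_workers):
    """Process every (func, args) pair in jobs using num_workers threads."""
    q = queue.Queue()
    for job in jobs:
        q.put(job)
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                func, args = q.get_nowait()
            except queue.Empty:
                return  # nothing left to do, let the thread exit
            value = func(*args)
            with lock:  # protect the shared results list
                results.append(value)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def count_beans(num):
    return 'Counted %s beans' % num


results = run_workers([(count_beans, (n,)) for n in range(5)], num_workers=2)
print(sorted(results))
```

With I/O-bound tasks, raising `num_workers` lets more tasks wait on I/O at the same time, which is the reason the consumer exposes this setting.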
Original text:
The following table lists the options available for the consumer as well as their default values.
-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.
Examples
Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:
huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0
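The delay, backoff, and max-delay options are easier to read with numbers. The sketch below assumes one plausible interpretation: each empty poll multiplies the wait by the backoff factor until the maximum is reached. That interpretation, and the function name `polling_delays`, are assumptions made for illustration and are not taken from huey's source.

```python
def polling_delays(initial=0.1, backoff=1.15, max_delay=10.0, polls=5):
    """Return the waits (in seconds) before each of `polls` empty polls."""
    delays = []
    delay = initial
    for _ in range(polls):
        delays.append(delay)
        # Grow the wait geometrically, but never beyond max_delay.
        delay = min(delay * backoff, max_delay)
    return delays


# Defaults from the option list: -d 0.1, -b 1.15, -m 10
default_delays = polling_delays(polls=100)

# Values from the example command line: -b 1.1 -m 1.0
example_delays = polling_delays(backoff=1.1, max_delay=1.0, polls=100)

print(default_delays[:3], default_delays[-1])
print(example_delays[:3], example_delays[-1])
```

Under this assumption, a smaller backoff and a smaller maximum keep the consumer polling frequently even when the queue is idle, which matches the "very short polling interval" described for the example.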
huey relies on redis to store the queued tasks, so redis-server and redis-py need to be installed beforehand. The installation is not covered here; it is easy to look up.
We first create a huey instance connected to redis:
The code is as follows:
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue

# The queue lives in the local redis server under the name 'test-queue'.
queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue)
Next come the tasks, that is, the functions you want to put on the task queue. As with celery, rq, and mrq, they are conventionally defined in tasks.py.
The code is as follows:
# tasks.py
from config import huey  # import the huey we instantiated in config.py

@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)
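What a task decorator does can be sketched without redis. The toy class below is an in-memory stand-in written for this article, not huey's implementation; `ToyHuey`, `run_next`, and the use of a uuid as the task id are all illustrative assumptions. The point is that calling the decorated function only enqueues the call, and a separate consumer step runs it later.

```python
import functools
import queue
import uuid


class ToyHuey:
    """A toy in-memory task queue; NOT huey's real implementation."""

    def __init__(self):
        self.queue = queue.Queue()

    def task(self):
        def decorator(func):
            @functools.wraps(func)
            def enqueue(*args, **kwargs):
                # Calling the task stores the call instead of running it.
                task_id = str(uuid.uuid4())
                self.queue.put((task_id, func, args, kwargs))
                return task_id
            return enqueue
        return decorator

    def run_next(self):
        """What a consumer would do: take one call off the queue and run it."""
        task_id, func, args, kwargs = self.queue.get_nowait()
        return task_id, func(*args, **kwargs)


toy = ToyHuey()
counted = []


@toy.task()
def count_beans(num):
    counted.append(num)
    return 'Counted %s beans' % num


task_id = count_beans(100)        # only enqueues the call
counted_before = list(counted)    # still empty: nothing has run yet
ran_id, value = toy.run_next()    # the "consumer" executes the task
print(counted_before, counted, value)
```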
Now for one that actually gets executed. main.py acts as the producer and tasks.py as the consumer; main.py is responsible for feeding in the data.
The code is as follows:
# main.py
from config import huey  # import our "huey" object
from tasks import count_beans  # import our task

if __name__ == '__main__':
    beans = raw_input('How many beans? ')  # Python 2; use input() on Python 3
    count_beans(int(beans))
    print('Enqueued job to count %s beans' % beans)

Ensure you have Redis running locally
Ensure you have installed huey
Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).
Run the main program: python main.py
As with celery and rq, to retrieve a task's result you need to specify a result store in config.py or in your main code. Currently huey only supports redis for this, but given its focus and size, that is enough!
It only takes a few lines: import RedisDataStore and declare where the results are stored.
The code is as follows:
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore  # ADD THIS LINE

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379)  # ADDED
huey = Huey(queue, result_store=result_store)  # ADDED result store
Now, trying again in ipython, we can get the return value of the task defined in tasks.py. Under the hood, the result fetched in main.py is still looked up in redis by the task's uuid.
The code is as follows:
>>> from main import count_beans
>>> res = count_beans(100)
>>> res  # what is "res" ?
>>> res.get()  # get the result of this task
'Counted 100 beans'
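The lookup by uuid can be illustrated with a small in-memory sketch. `ToyResultStore` and `ToyResult` are hypothetical classes written for this example, with a dict playing the role of redis; they are not huey's classes. The handle returned to the caller only remembers the task id and asks the store for the value when `get()` is called.

```python
import uuid


class ToyResultStore:
    """A dict-backed stand-in for a redis result store (illustration only)."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)  # None when no result has been stored yet


class ToyResult:
    """The handle the caller holds: a task id plus a reference to the store."""

    def __init__(self, store, task_id):
        self.store = store
        self.task_id = task_id

    def get(self):
        return self.store.get(self.task_id)


store = ToyResultStore()
task_id = str(uuid.uuid4())
res = ToyResult(store, task_id)

before = res.get()                        # the consumer has not finished yet
store.put(task_id, 'Counted 100 beans')   # the consumer saves the return value
after = res.get()                         # now the caller can read it
print(before, after)
```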
huey also supports delayed execution and crontab-style scheduling, as celery does. These features are important: you can define the schedule yourself without relying on Linux's own crontab.
Usage is simple: just pass an extra delay. Judging from huey's source code, a task is executed immediately by default, though of course that still depends on whether a worker thread is free to pick it up.
The code is as follows:
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res
>>> res.get()  # this returns None, no data is ready
>>> res.get()  # still no data...
>>> res.get(blocking=True)  # ok, let's just block until its ready
'Counted 100 beans'
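Delayed execution boils down to remembering an execution time and running a task only once that time has passed. The following sketch is a standard-library illustration of that idea, not huey's scheduler; `ToySchedule`, `add`, and `run_due` are hypothetical names, and the current time is passed in explicitly so the example does not need to sleep.

```python
import datetime
import heapq
import itertools


class ToySchedule:
    """Keeps tasks ordered by their ETA (illustration only)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal ETAs

    def add(self, func, args=(), delay=0, now=None):
        now = now or datetime.datetime.now()
        eta = now + datetime.timedelta(seconds=delay)
        heapq.heappush(self._heap, (eta, next(self._counter), func, args))
        return eta

    def run_due(self, now):
        """Run every task whose ETA is not later than `now`."""
        results = []
        while self._heap and self._heap[0][0] <= now:
            _, _, func, args = heapq.heappop(self._heap)
            results.append(func(*args))
        return results


def count_beans(num):
    return 'Counted %s beans' % num


start = datetime.datetime(2024, 1, 1, 12, 0, 0)
schedule = ToySchedule()
schedule.add(count_beans, args=(100,), delay=60, now=start)

too_early = schedule.run_due(start + datetime.timedelta(seconds=30))
on_time = schedule.run_due(start + datetime.timedelta(seconds=60))
print(too_early, on_time)
```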
Next, retries. huey also supports retrying, which is very practical. If you have read my earlier article on celery's retry mechanism, you will recognize the idea: huey wraps the task function in tasks.py with a decorator, and that decorator contains the try/except retry logic around the function.
The code is as follows:
# tasks.py
from datetime import datetime
from config import huey

# Retry up to 3 times, waiting 10 seconds between attempts.
@huey.task(retries=3, retry_delay=10)
def try_thrice():
    print('trying....%s' % datetime.now())
    raise Exception('nope')
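The try/except retry logic described above can be sketched as a plain function. This is an illustration of the pattern, not huey's code; `run_with_retries` is a hypothetical helper, and it assumes that `retries` counts the extra attempts allowed after the first failure.

```python
import time


def run_with_retries(func, retries=3, retry_delay=0, sleep=time.sleep):
    """Call func(); on failure, retry up to `retries` more times.

    Returns (value, attempts). Re-raises the last exception when all
    attempts have failed.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except Exception:
            if attempts > retries:
                raise  # out of retries, give up
            sleep(retry_delay)  # wait before the next attempt


calls = []


def flaky():
    """Fails on the first two calls, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise Exception('nope')
    return 'OK'


value, attempts = run_with_retries(flaky, retries=3, retry_delay=0)
print(value, attempts)
```

The `sleep` argument exists only so the waiting can be replaced in tests; a real consumer would typically re-enqueue the task with a delay rather than block a worker thread.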
huey gives you a chance to change your mind: after you have enqueued a delayed or scheduled task, you can cancel it simply by calling revoke.
The code is as follows:
# count some beans
res = count_beans(10000000)
res.revoke()

# The same applies to tasks that are scheduled in the future
# (in_the_future is a datetime object):
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()

# A periodic task that prints the time every minute.
@huey.periodic_task(crontab(minute='*'))
def print_time():
    print(datetime.now())
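One simple way to think about revoking is a set of cancelled task ids that the consumer checks before running anything. The sketch below illustrates that idea with an in-memory class; `ToyConsumer` and its methods are hypothetical, and huey's actual bookkeeping may differ.

```python
import uuid


class ToyConsumer:
    """Runs pending tasks unless their id has been revoked (illustration only)."""

    def __init__(self):
        self.pending = []
        self.revoked = set()

    def enqueue(self, func, *args):
        task_id = str(uuid.uuid4())
        self.pending.append((task_id, func, args))
        return task_id

    def revoke(self, task_id):
        self.revoked.add(task_id)

    def run_all(self):
        results = {}
        for task_id, func, args in self.pending:
            if task_id in self.revoked:
                continue  # skip tasks that were cancelled before they ran
            results[task_id] = func(*args)
        self.pending = []
        return results


def count_beans(num):
    return 'Counted %s beans' % num


consumer = ToyConsumer()
kept = consumer.enqueue(count_beans, 100)
cancelled = consumer.enqueue(count_beans, 10000000)
consumer.revoke(cancelled)

results = consumer.run_all()
print(results)
```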
task() - a transparent decorator that turns an ordinary function into a queued task.
periodic_task() - declares a periodic task.
crontab() - the crontab schedule attached to a periodic task; it is picked up when the worker starts.
BaseQueue - the task queue.
BaseDataStore - where results can be stored after a task has run. You can subclass BaseDataStore to write your own.
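To show what a crontab-style schedule has to decide, here is a minimal matcher for the minute and hour fields. It is a sketch written for this article and supports only `*`, `*/n`, and comma-separated numbers; `toy_crontab` and `field_matches` are hypothetical names and do not reproduce huey's crontab().

```python
import datetime


def field_matches(spec, value):
    """Check one crontab field ('*', '*/n', or 'a,b,c') against a value."""
    if spec == '*':
        return True
    if spec.startswith('*/'):
        return value % int(spec[2:]) == 0
    return value in {int(part) for part in spec.split(',')}


def toy_crontab(minute='*', hour='*'):
    """Return a function that tells whether a datetime matches the schedule."""
    def is_due(dt):
        return field_matches(minute, dt.minute) and field_matches(hour, dt.hour)
    return is_due


nightly = toy_crontab(minute='0', hour='3')
every_five = toy_crontab(minute='*/5')

print(nightly(datetime.datetime(2024, 1, 1, 3, 0)))
print(every_five(datetime.datetime(2024, 1, 1, 12, 11)))
```

A periodic-task thread can then call such a function once per minute and enqueue the task whenever it returns True.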
The official huey git repository provides related example code:
main.py
The code is as follows:
from config import huey
from tasks import count_beans

if __name__ == '__main__':
    beans = raw_input('How many beans? ')  # Python 2; use input() on Python 3
    count_beans(int(beans))
    print('Enqueued job to count %s beans' % beans)
tasks.py
The code is as follows:
import random
import time
from huey import crontab
from config import huey

@huey.task()
def count_beans(num):
    print("start...")
    print('-- counted %s beans --' % num)
    time.sleep(3)
    print("end...")
    return 'Counted %s beans' % num

@huey.periodic_task(crontab(minute='*/5'))
def every_five_mins():
    print('Consumer prints this every 5 mins')

@huey.task(retries=3, retry_delay=10)
def try_thrice():
    # Succeeds roughly one time in three; otherwise fails and is retried.
    if random.randint(1, 3) == 1:
        print('OK')
    else:
        print('About to fail, will retry in 10 seconds')
        raise Exception('Crap something went wrong')

@huey.task()
def slow(n):
    time.sleep(n)
    print('slept %s' % n)
run.sh
The code is as follows:
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2
We can first clone huey's code repository. It contains an examples directory, where you can see that django is supported, but that is not the point here!
The code is as follows:
[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.git
Cloning into 'huey'...
remote: Counting objects: 1423, done.
remote: Compressing objects: 100% (9/9), done.
Receiving objects: 34% (497/1423), 388.00 KiB | 29.00 KiB/s
Receiving objects: 34% (498/1423), 628.00 KiB | 22.00 KiB/s
remote: Total 1423 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.
Resolving deltas: 100% (729/729), done.
Checking connectivity... done.
[xiaorui@devops /tmp ]$ cd huey/examples/simple
[xiaorui@devops simple (master)]$ ll
total 40
-rw-r--r-- 1 xiaorui wheel 79B 9 8 08:49 README
-rw-r--r-- 1 xiaorui wheel 0B 9 8 08:49 __init__.py
-rw-r--r-- 1 xiaorui wheel 56B 9 8 08:49 config.py
-rwxr-xr-x 1 xiaorui wheel 227B 9 8 08:49 cons.sh
-rw-r--r-- 1 xiaorui wheel 205B 9 8 08:49 main.py
-rw-r--r-- 1 xiaorui wheel 607B 9 8 08:49 tasks.py
[xiaorui@devops simple (master)]$