I tried django-celery-beat. Adding tasks in the admin background can achieve dynamic addition of tasks.
But it will take effect only after restarting celery beat. Is there any other way to try?
为情所困2017-05-18 10:58:10
Cannot be added dynamically, beat must be restarted.
ask answered the reason #3493
高洛峰2017-05-18 10:58:10
There is an idea that you can consider. I am currently trying this method and am in the stage of trying to cross the river by feeling the stones. Celery supports scheduled tasks, but it does not meet my needs. I need to dynamically add scheduled tasks like crontab under Linux. I also looked at django-celery-beat. Because I use Flask, I found that it is not worth reference for implementation, so I have been After reading the documentation and searching for information, I finally found a way. Celery's apply_async function is very useful. It has an eta parameter. Its simplified use is countdown, but the power of eta is huge because it only accepts datetime objects. , for example, if you give a task to be executed at 2017-05-02 20:0:0, you can use it like this:
job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))
Is it rarely used? If I have a task that needs to be executed at eight o'clock every night, I can use this eta parameter to achieve it. The pseudo code is as follows:
时间规则 = '每天晚上八点执行'
第一次调用任务,先计算最近的执行时间,作为eta的参数,调用apply_async函数,
然后第一次任务执行成功,得到上次任务的eta参数值,在天的值上加一,然后把新的执行时间作为eta的参数再次调用apply_async函数,这里省略了很多判断,自行脑补。
循环往复,是不是一直按每天晚上八点执行。
A very important point here is how to calculate the next execution time when the task is successfully executed. The method is as follows
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=MyTask)
def add(x, y):
return x + y
It provides functions for success and failure of task execution. We only need to rewrite it on this basis. I am only talking about the core part. There are many ways to do it specifically,