Home  >  Q&A  >  body text

python - How to dynamically add tasks during the run of celery beat scheduler?

I tried django-celery-beat. Adding tasks in the admin background can achieve dynamic addition of tasks.
But it will take effect only after restarting celery beat. Is there any other way to try?

某草草某草草2712 days ago2152

reply all(2)I'll reply

  • 为情所困

    为情所困2017-05-18 10:58:10

    Cannot be added dynamically, beat must be restarted.

    ask answered the reason #3493

    reply
    0
  • 高洛峰

    高洛峰2017-05-18 10:58:10

    There is an idea that you can consider. I am currently trying this method and am in the stage of trying to cross the river by feeling the stones. Celery supports scheduled tasks, but it does not meet my needs. I need to dynamically add scheduled tasks like crontab under Linux. I also looked at django-celery-beat. Because I use Flask, I found that it is not worth reference for implementation, so I have been After reading the documentation and searching for information, I finally found a way. Celery's apply_async function is very useful. It has an eta parameter. Its simplified use is countdown, but the power of eta is huge because it only accepts datetime objects. , for example, if you give a task to be executed at 2017-05-02 20:0:0, you can use it like this:

    job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))

    Is it rarely used? If I have a task that needs to be executed at eight o'clock every night, I can use this eta parameter to achieve it. The pseudo code is as follows:

    时间规则 = '每天晚上八点执行'
    第一次调用任务,先计算最近的执行时间,作为eta的参数,调用apply_async函数,
    然后第一次任务执行成功,得到上次任务的eta参数值,在天的值上加一,然后把新的执行时间作为eta的参数再次调用apply_async函数,这里省略了很多判断,自行脑补。
    循环往复,是不是一直按每天晚上八点执行。

    A very important point here is how to calculate the next execution time when the task is successfully executed. The method is as follows

    class MyTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            print 'task done: {0}'.format(retval)
            return super(MyTask, self).on_success(retval, task_id, args, kwargs)
            
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print 'task fail, reason: {0}'.format(exc)
            return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
            
    @app.task(base=MyTask)
    def add(x, y):
        return x + y

    It provides functions for success and failure of task execution. We only need to rewrite it on this basis. I am only talking about the core part. There are many ways to do it specifically,

    reply
    0
  • Cancelreply