首頁  >  問答  >  主體

python - celery beat 調度如何運行期間動態新增任務?

我嘗試過django-celery-beat,在admin後台添加任務,可以實現動態添加任務
但要重啟celery beat才生效,請問,有其他方試嗎?

某草草某草草2712 天前2153

全部回覆(2)我來回復

  • 为情所困

    为情所困2017-05-18 10:58:10

    無法動態添加,必須重啟 beat。

    ask 回答過原因了 #3493

    回覆
    0
  • 高洛峰

    高洛峰2017-05-18 10:58:10

    有個思路,你可以考慮,我目前也在嘗試這個方法,處於摸石過河階段。 celery是支援定時任務,但不符合我的需求,我需要像linux下的crontab這樣動態添加定時任務,我也看了django-celery-beat,因為用的是Flask,發現不值得參考實現,所以一直在看文檔搜資料,終於被我找到一種方式,celery的apply_async這個函數非常有用,它有個eta參數,它的簡化使用是countdown,但是eta的威力是很巨大的,因為它只接受datetime對象,例如你給定一個任務在2017-05-02 20:0:0執行,你可以這樣使用:

    job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))

    是不是很少用,假如我有一個任務需要每天晚上八點執行,我可以利用這個eta參數實現。偽代碼如下:

    时间规则 = '每天晚上八点执行'
    第一次调用任务,先计算最近的执行时间,作为eta的参数,调用apply_async函数,
    然后第一次任务执行成功,得到上次任务的eta参数值,在天的值上加一,然后把新的执行时间作为eta的参数再次调用apply_async函数,这里省略了很多判断,自行脑补。
    循环往复,是不是一直按每天晚上八点执行。

    這裡有個非常重要的點是如何在任務執行成功的時候計算下一次的執行時間,做法如下

    class MyTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            print 'task done: {0}'.format(retval)
            return super(MyTask, self).on_success(retval, task_id, args, kwargs)
            
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print 'task fail, reason: {0}'.format(exc)
            return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
            
    @app.task(base=MyTask)
    def add(x, y):
        return x + y

    它提供了任務執行成功和失敗的函數,我們只要在此基礎上重寫就可以了,我說的只是最核心的部分,具體怎麼做有很多方法,

    回覆
    0
  • 取消回覆