
Execute the same operator as multiple tasks in Cloud Composer

Forwarded by PHPz · 2024-02-08 21:08:30 · 573 views


Question content

I have a PythonOperator that runs in Airflow on Cloud Composer:

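from airflow import DAG
from airflow.operators.python import PythonOperator

# config, default_args and generate_data are defined elsewhere in the DAG file
with DAG(
    dag_id=config['dag_id'],
    schedule_interval=config['schedule_interval'],
    default_args=default_args,
) as dag:

    generate_data_task = PythonOperator(
        task_id='generate_dummy_data',
        python_callable=generate_data,
    )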

The generate_data() function writes a uniquely named CSV file of randomly generated data to a bucket. Running it as-is works fine, but I want to run the same task several times in parallel: if I specify 10 parallel executions, I expect 10 files to be written to the bucket. I've tried concurrency and task_concurrency, but the result did not change.
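The question doesn't show generate_data() itself. For trying the idea locally, a minimal hypothetical stand-in could look like the sketch below. It writes to a local directory instead of a bucket (an assumption, so it runs without Google Cloud credentials), and the file-name pattern, the DEFAULT_OUTPUT_DIR location and the columns are invented for illustration. The property that matters is the unique file name: it is why N task instances should leave N files behind.

```python
import csv
import os
import random
import tempfile
import uuid
from pathlib import Path

# Hypothetical local folder standing in for the Cloud Storage bucket
DEFAULT_OUTPUT_DIR = os.path.join(tempfile.gettempdir(), "dummy_bucket")


def generate_data(output_dir: str = DEFAULT_OUTPUT_DIR, rows: int = 5) -> Path:
    """Hypothetical stand-in for the question's generate_data()."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    # uuid4 gives every call its own file name, so parallel runs never collide
    path = out / f"dummy_data_{uuid.uuid4().hex}.csv"
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "value"])  # assumed columns, for illustration only
        for i in range(rows):
            writer.writerow([i, random.random()])
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for _ in range(3):
            generate_data(tmp)
        print(len(list(Path(tmp).glob("*.csv"))))  # 3
```

Because the defaults cover every parameter, the function can still be called with no arguments, which matches how the operator invokes it.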

Can this be done with Airflow on Cloud Composer?


Correct Answer


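Use dynamic task mapping (available since Airflow 2.3). The concurrency and task_concurrency settings only limit how many task instances may run at the same time; they don't create extra instances, so a single task still writes a single file per DAG run. Mapping the task over a list of 10 empty argument lists creates 10 task instances, each calling generate_data() with no arguments: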

# Inside the same `with DAG(...) as dag:` block
generate_data_task = PythonOperator.partial(
    task_id='generate_dummy_data',
    python_callable=generate_data,
    dag=dag,
).expand(op_args=[[]] * 10)  # 10 mapped instances, each with op_args=[]
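To see what expand(op_args=[[]] * 10) asks for, the plain-Python model below mimics the mapping semantics. It is not Airflow code and it runs the calls sequentially; the names simulate_expand and fake_generate_data are hypothetical. The idea it illustrates: each element of the expanded list becomes one mapped task instance, and that instance calls python_callable(*element), so ten empty lists mean ten calls with no arguments.

```python
from typing import Any, Callable, List


def simulate_expand(python_callable: Callable[..., Any], op_args: List[list]) -> List[Any]:
    """Plain-Python model of PythonOperator.partial(...).expand(op_args=...).

    One result per element of op_args, mirroring one mapped task instance
    per element. Airflow would schedule these as separate task instances
    (possibly in parallel); this model simply loops over them.
    """
    return [python_callable(*args) for args in op_args]


calls: List[int] = []


def fake_generate_data() -> str:
    # Hypothetical callable: records that it ran and returns a fake file name
    calls.append(1)
    return f"file_{len(calls)}.csv"


if __name__ == "__main__":
    op_args = [[]] * 10  # ten empty argument lists -> ten mapped instances
    results = simulate_expand(fake_generate_data, op_args)
    print(len(results))  # 10
```

In Airflow itself the ten instances show up as one task with map indexes 0 to 9, and how many run at once is still capped by settings such as max_active_tis_per_dag (the newer name for task_concurrency) and by the worker capacity of the Composer environment. Note that [[]] * 10 repeats the same list object ten times; that is harmless here only because the argument lists are never mutated.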


Statement:
This article is reproduced from stackoverflow.com. In case of infringement, please contact admin@php.cn for removal.