Maison >développement back-end >Tutoriel Python >Exécuter le même opérateur que plusieurs tâches dans Cloud Composer

Exécuter le même opérateur que plusieurs tâches dans Cloud Composer

PHPz
PHPzavant
2024-02-08 21:08:30651parcourir

在 Cloud Composer 中将同一运算符作为多个任务执行

Contenu de la question

J'ai un opérateur python dans airflow exécuté à l'aide de cloud composer :

with DAG(
    dag_id = config['dag_id'],
    schedule_interval = config['schedule_interval'],
    default_args = default_args
    ) as dag:
    
    generate_data_task = PythonOperator(
        task_id = 'generate_dummy_data',
        python_callable = generate_data,
        dag = dag
    )
La fonction

generate_data() écrit un fichier csv généré aléatoirement et nommé de manière unique dans un compartiment contenant des données. L'exécution telle quelle fonctionne bien, mais je souhaite exécuter la même tâche plusieurs fois en parallèle. Si je spécifie 10 exécutions parallèles, je m'attends à ce que 10 fichiers soient écrits dans le bucket. J'ai essayé la concurrence et task_concurrency mais j'ai obtenu le même résultat.

Cela peut-il être réalisé en utilisant le flux d'air au-dessus du cloud composer ?


Bonne réponse


Utilisez Mappage dynamique des tâches :

generate_data_task = PythonOperator.partial(
        task_id = 'generate_dummy_data',
        python_callable = generate_data,
        dag = dag
    ).expand(op_args=[[]] * 10)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer