Maison  >  Questions et réponses  >  le corps du texte

Dans les modèles SQL, XCom d'Airflow renvoie les valeurs sous forme de chaînes entre guillemets doubles tandis que Snowflake utilise des chaînes entre guillemets simples dans les requêtes SQL.

Je suis nouveau sur ETL et je travaille sur le flux d'air et les flocons de neige. J'utilise un opérateur python pour obtenir la valeur maximale créée à partir d'une table mysql et, sur la base du xcom de cet opérateur, je crée un fichier csv de données de flocon de neige pour vider uniquement les dernières données créées de mysql vers snowflake. Le problème est que lorsque j'extrais la valeur dans le modèle SQL, airflow xcom renvoie des guillemets doubles. Et Snowflake accepte les guillemets simples dans ses requêtes SQL. Photo d'erreur

Voici mon code DAG :

def defaultconverter(o):
    if isinstance(o, datetime):
        return o.__str__()


def get_max_created_timestamp(sql_table_name):
    hook = MySqlHook(MYSQL_CONN)
    check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                   f"and table_schema = '{MYSQL_SCHEMA}';"
    print(hook.schema)
    data = hook.get_records(check_column)
    if any('created_at' in x for x in data):
        date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
        (created_timestamp_max,) = hook.get_first(date_sql)
        return json.dumps(created_timestamp_max, default=defaultconverter)
        # return int(created_timestamp_max)
    else:
        return 0


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    "template_searchpath": [TEMPLATE_SEARCHPATH, ]
}


with DAG(dag_id="lion_sense_snowflake_to_mysql_v1",
         start_date=datetime(2021, 12, 1, 0, 0, 0, 0),
         schedule_interval="@daily",
         catchup=False,
         default_args=default_args,
         max_active_runs=1,
         ) as dag:
    dag.doc_md = DOCS

    for table in tables:
        mysql_table = table["mysql_table"]
        snowflake_table = table["snowflake_table"]
        delete_flag = table["delete"]

        get_max_timestamp_task = PythonOperator(
            task_id=f"get_max_timestamp_{mysql_table}",
            python_callable=get_max_created_timestamp,
            op_args=[mysql_table, ],
            do_xcom_push=True,
        )

        create_snowflake_table_csv = SnowflakeOperator(
            task_id=f"create_snowflake_{snowflake_table}_table_csv",
            dag=dag,
            sql="sql/convert_snowflake_table_to_csv.sql",
            snowflake_conn_id=SNOWFLAKE_CONN_ID,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            role=SNOWFLAKE_ROLE,
            params={
                "snowflake_table": snowflake_table,
                "delete_flag": delete_flag,
                "max_date": get_max_timestamp_task.output
            }
        )

Modèle de requête MySQL :

copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv
from (
    select * from {{ params.snowflake_table }}
        {% if params.delete_flag %}
 where created_at > {{ params.max_date}}
        {% endif %}
    )
file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' )
OVERWRITE = TRUE
SINGLE = TRUE
MAX_FILE_SIZE=5000000000;

Merci d'avance d'élargir mes connaissances.

P粉043470158P粉043470158237 Il y a quelques jours416

répondre à tous(1)je répondrai

  • P粉896751037

    P粉8967510372024-02-26 11:16:10

    S'il faut modifier la sortie de get_max_created_timestamp :

    def get_max_created_timestamp(sql_table_name):
        hook = MySqlHook(MYSQL_CONN)
        check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                       f"and table_schema = '{MYSQL_SCHEMA}';"
        print(hook.schema)
        data = hook.get_records(check_column)
        if any('created_at' in x for x in data):
            date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
            (created_timestamp_max,) = hook.get_first(date_sql)
            return "'" + created_timestamp_max + "'"
        else:
            return 0

    Aide, la chaîne sera désormais correctement citée pour représenter les attentes en matière de chaîne de flocon de neige.

    répondre
    0
  • Annulerrépondre