cari

Rumah  >  Soal Jawab  >  teks badan

Dalam templat SQL, Airflow's XCom mengembalikan nilai sebagai rentetan petikan dua kali manakala Snowflake menggunakan rentetan petikan tunggal dalam pertanyaan SQL

Saya baru dalam ETL dan mengusahakan aliran udara dan kepingan salji. Saya menggunakan pengendali python untuk mendapatkan nilai ciptaan maksimum daripada jadual mysql dan berdasarkan xcom pengendali itu, saya mencipta fail csv data kepingan salji untuk membuang hanya data ciptaan terkini dari mysql ke kepingan salji. Masalahnya ialah apabila saya mengekstrak nilai dalam templat sql, aliran udara xcom mengembalikan petikan berganda. Dan Snowflake menerima petikan tunggal dalam pertanyaan sqlnya. Gambar ralat

Ini kod DAG saya:

def defaultconverter(o):
    if isinstance(o, datetime):
        return o.__str__()


def get_max_created_timestamp(sql_table_name):
    hook = MySqlHook(MYSQL_CONN)
    check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                   f"and table_schema = '{MYSQL_SCHEMA}';"
    print(hook.schema)
    data = hook.get_records(check_column)
    if any('created_at' in x for x in data):
        date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
        (created_timestamp_max,) = hook.get_first(date_sql)
        return json.dumps(created_timestamp_max, default=defaultconverter)
        # return int(created_timestamp_max)
    else:
        return 0


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    "template_searchpath": [TEMPLATE_SEARCHPATH, ]
}


with DAG(dag_id="lion_sense_snowflake_to_mysql_v1",
         start_date=datetime(2021, 12, 1, 0, 0, 0, 0),
         schedule_interval="@daily",
         catchup=False,
         default_args=default_args,
         max_active_runs=1,
         ) as dag:
    dag.doc_md = DOCS

    for table in tables:
        mysql_table = table["mysql_table"]
        snowflake_table = table["snowflake_table"]
        delete_flag = table["delete"]

        get_max_timestamp_task = PythonOperator(
            task_id=f"get_max_timestamp_{mysql_table}",
            python_callable=get_max_created_timestamp,
            op_args=[mysql_table, ],
            do_xcom_push=True,
        )

        create_snowflake_table_csv = SnowflakeOperator(
            task_id=f"create_snowflake_{snowflake_table}_table_csv",
            dag=dag,
            sql="sql/convert_snowflake_table_to_csv.sql",
            snowflake_conn_id=SNOWFLAKE_CONN_ID,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            role=SNOWFLAKE_ROLE,
            params={
                "snowflake_table": snowflake_table,
                "delete_flag": delete_flag,
                "max_date": get_max_timestamp_task.output
            }
        )

Templat pertanyaan mysql:

copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv
from (
    select * from {{ params.snowflake_table }}
        {% if params.delete_flag %}
 where created_at > {{ params.max_date}}
        {% endif %}
    )
file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' )
OVERWRITE = TRUE
SINGLE = TRUE
MAX_FILE_SIZE=5000000000;

Terima kasih terlebih dahulu kerana menambah pengetahuan saya.

P粉043470158P粉043470158319 hari yang lalu490

membalas semua(1)saya akan balas

  • P粉896751037

    P粉8967510372024-02-26 11:16:10

    Sama ada hendak menukar output get_max_created_timestamp:

    def get_max_created_timestamp(sql_table_name):
        hook = MySqlHook(MYSQL_CONN)
        check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                       f"and table_schema = '{MYSQL_SCHEMA}';"
        print(hook.schema)
        data = hook.get_records(check_column)
        if any('created_at' in x for x in data):
            date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
            (created_timestamp_max,) = hook.get_first(date_sql)
            return "'" + created_timestamp_max + "'"
        else:
            return 0

    Bantuan, kini rentetan akan dipetik dengan betul untuk mewakili jangkaan rentetan kepingan salji.

    balas
    0
  • Batalbalas