我是 ETL 新手,正在研究氣流和雪花。我使用 python 運算子從 mysql 表獲取最大創建值,並基於該運算符的 xcom,我創建一個雪花數據的 csv 文件,以僅將最新創建的數據從 mysql 轉儲到雪花。問題是當我在 sql 模板中提取值時,airflow xcom 會傳回雙引號。而 Snowflake 在其 sql 查詢中接受單引號。錯誤圖片
以下是我的 DAG 程式碼:
def defaultconverter(o): if isinstance(o, datetime): return o.__str__() def get_max_created_timestamp(sql_table_name): hook = MySqlHook(MYSQL_CONN) check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \ f"and table_schema = '{MYSQL_SCHEMA}';" print(hook.schema) data = hook.get_records(check_column) if any('created_at' in x for x in data): date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}' (created_timestamp_max,) = hook.get_first(date_sql) return json.dumps(created_timestamp_max, default=defaultconverter) # return int(created_timestamp_max) else: return 0 default_args = { "owner": "airflow", "depends_on_past": False, "email": [], "email_on_failure": True, "email_on_retry": False, "retries": 0, "retry_delay": timedelta(minutes=5), "template_searchpath": [TEMPLATE_SEARCHPATH, ] } with DAG(dag_id="lion_sense_snowflake_to_mysql_v1", start_date=datetime(2021, 12, 1, 0, 0, 0, 0), schedule_interval="@daily", catchup=False, default_args=default_args, max_active_runs=1, ) as dag: dag.doc_md = DOCS for table in tables: mysql_table = table["mysql_table"] snowflake_table = table["snowflake_table"] delete_flag = table["delete"] get_max_timestamp_task = PythonOperator( task_id=f"get_max_timestamp_{mysql_table}", python_callable=get_max_created_timestamp, op_args=[mysql_table, ], do_xcom_push=True, ) create_snowflake_table_csv = SnowflakeOperator( task_id=f"create_snowflake_{snowflake_table}_table_csv", dag=dag, sql="sql/convert_snowflake_table_to_csv.sql", snowflake_conn_id=SNOWFLAKE_CONN_ID, warehouse=SNOWFLAKE_WAREHOUSE, database=SNOWFLAKE_DATABASE, schema=SNOWFLAKE_SCHEMA, role=SNOWFLAKE_ROLE, params={ "snowflake_table": snowflake_table, "delete_flag": delete_flag, "max_date": get_max_timestamp_task.output } )
Mysql查詢模板:
copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv from ( select * from {{ params.snowflake_table }} {% if params.delete_flag %} where created_at > {{ params.max_date}} {% endif %} ) file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' ) OVERWRITE = TRUE SINGLE = TRUE MAX_FILE_SIZE=5000000000;
預先感謝您增加我的知識。
P粉8967510372024-02-26 11:16:10
是否更改 get_max_created_timestamp
的輸出:
def get_max_created_timestamp(sql_table_name): hook = MySqlHook(MYSQL_CONN) check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \ f"and table_schema = '{MYSQL_SCHEMA}';" print(hook.schema) data = hook.get_records(check_column) if any('created_at' in x for x in data): date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}' (created_timestamp_max,) = hook.get_first(date_sql) return "'" created_timestamp_max "'" else: return 0
幫助,現在字串將被正確引用以表示雪花字串期望。