Airflow XCom returns a double-quoted string in a SQL template, while Snowflake expects single-quoted strings in SQL queries

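I am new to ETL and am working with Airflow and Snowflake. I use a PythonOperator to get the maximum created_at value from a MySQL table and, based on that operator's XCom, I create a CSV file of the Snowflake data so that only the most recently created data is dumped from MySQL to Snowflake. The problem is that when I pull the value into the SQL template, the Airflow XCom value comes back wrapped in double quotes, whereas Snowflake only accepts single-quoted strings in its SQL queries. (Error screenshot)

Here is my DAG code: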

def defaultconverter(o):
    if isinstance(o, datetime):
        return o.__str__()


def get_max_created_timestamp(sql_table_name):
    hook = MySqlHook(MYSQL_CONN)
    check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                   f"and table_schema = '{MYSQL_SCHEMA}';"
    print(hook.schema)
    data = hook.get_records(check_column)
    if any('created_at' in x for x in data):
        date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
        (created_timestamp_max,) = hook.get_first(date_sql)
        return json.dumps(created_timestamp_max, default=defaultconverter)
        # return int(created_timestamp_max)
    else:
        return 0


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    "template_searchpath": [TEMPLATE_SEARCHPATH, ]
}


with DAG(dag_id="lion_sense_snowflake_to_mysql_v1",
         start_date=datetime(2021, 12, 1, 0, 0, 0, 0),
         schedule_interval="@daily",
         catchup=False,
         default_args=default_args,
         max_active_runs=1,
         ) as dag:
    dag.doc_md = DOCS

    for table in tables:
        mysql_table = table["mysql_table"]
        snowflake_table = table["snowflake_table"]
        delete_flag = table["delete"]

        get_max_timestamp_task = PythonOperator(
            task_id=f"get_max_timestamp_{mysql_table}",
            python_callable=get_max_created_timestamp,
            op_args=[mysql_table, ],
            do_xcom_push=True,
        )

        create_snowflake_table_csv = SnowflakeOperator(
            task_id=f"create_snowflake_{snowflake_table}_table_csv",
            dag=dag,
            sql="sql/convert_snowflake_table_to_csv.sql",
            snowflake_conn_id=SNOWFLAKE_CONN_ID,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            role=SNOWFLAKE_ROLE,
            params={
                "snowflake_table": snowflake_table,
                "delete_flag": delete_flag,
                "max_date": get_max_timestamp_task.output
            }
        )

The SQL query template:

copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv
from (
    select * from {{ params.snowflake_table }}
        {% if params.delete_flag %}
 where created_at > {{ params.max_date}}
        {% endif %}
    )
file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' )
OVERWRITE = TRUE
SINGLE = TRUE
MAX_FILE_SIZE=5000000000;

Thanks in advance for helping me learn.
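
For reference, the double quotes most likely come from the json.dumps call in get_max_created_timestamp rather than from XCom itself: JSON always encodes strings with double quotes. A minimal sketch, assuming created_at is returned as a datetime (the sample value below is made up for illustration):

```python
import json
from datetime import datetime


def defaultconverter(o):
    # Same converter as in the DAG: turn datetime objects into strings.
    if isinstance(o, datetime):
        return o.__str__()


# Hypothetical value standing in for max(created_at).
sample = datetime(2021, 12, 1, 10, 30, 0)

# json.dumps encodes the converted string as a JSON string,
# so the result carries literal double quotes around the timestamp.
rendered = json.dumps(sample, default=defaultconverter)
print(rendered)
```

The printed value includes the surrounding double quotes, which is what ends up in the rendered SQL.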

P粉043470158, 275 days ago

All replies (1)

  • P粉896751037  2024-02-26 11:16:10

    Does changing the return value of get_max_created_timestamp as follows help?

    def get_max_created_timestamp(sql_table_name):
        hook = MySqlHook(MYSQL_CONN)
        check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                       f"and table_schema = '{MYSQL_SCHEMA}';"
        print(hook.schema)
        data = hook.get_records(check_column)
        if any('created_at' in x for x in data):
            date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
            (created_timestamp_max,) = hook.get_first(date_sql)
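            # Format instead of concatenating with "+": str + datetime raises
            # TypeError if created_at is returned as a datetime object.
            return f"'{created_timestamp_max}'"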
        else:
            return 0

    With this change the value is wrapped in single quotes, which is what Snowflake expects for a string literal.
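
The quoting step can also be pulled into a small helper. This is only a sketch: to_snowflake_literal is a hypothetical name, not part of Airflow or the Snowflake provider, and the sample timestamp is invented. It converts the value to a string, doubles any embedded single quotes (the usual SQL escape), and wraps the result in single quotes.

```python
from datetime import datetime


def to_snowflake_literal(value):
    """Return value as a single-quoted SQL string literal (illustrative helper)."""
    text = str(value)
    # Double any embedded single quotes so the literal stays well-formed.
    escaped = text.replace("'", "''")
    return f"'{escaped}'"


# Hypothetical value standing in for max(created_at).
sample = datetime(2021, 12, 1, 10, 30, 0)
literal = to_snowflake_literal(sample)

# This is the fragment the template would render for the where clause.
print(f"where created_at > {literal}")
```

Returning to_snowflake_literal(created_timestamp_max) from the Python callable would then give the template a value it can drop straight into the where clause.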
