Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Timbunan Data Berbilang Enjin Deklaratif dengan Ibis

Timbunan Data Berbilang Enjin Deklaratif dengan Ibis

王林
王林asal
2024-07-19 03:34:101256semak imbas

TL;DR

Baru-baru ini saya menemui Surat Berita Kejuruteraan Data Ju oleh Julien Hurault pada tindanan data berbilang enjin. Ideanya mudah; kami ingin mengalihkan kod kami dengan mudah ke mana-mana bahagian belakang sambil mengekalkan fleksibiliti untuk mengembangkan saluran paip kami apabila bahagian belakang dan ciri baharu dibangunkan. Ini memerlukan sekurang-kurangnya aliran kerja peringkat tinggi berikut:

  1. Memunggah sebahagian daripada pertanyaan SQL ke enjin tanpa pelayan dengan DuckDB, polar, DataFusion, chdb dll.
  2. Saluran paip bersaiz tepat untuk pelbagai senario pembangunan dan penggunaan. Contohnya, pembangun boleh bekerja secara tempatan dan menghantar ke pengeluaran dengan yakin.
  3. Gunakan pengoptimuman gaya pangkalan data pada saluran paip anda secara automatik.

Dalam siaran ini, kami menyelami bagaimana kami boleh melaksanakan saluran paip berbilang enjin daripada bahasa pengaturcaraan; Daripada SQL, kami menggunakan cadangan menggunakan API Dataframe yang boleh digunakan untuk kedua-dua kes penggunaan interaktif dan kelompok. Secara khusus, kami menunjukkan cara untuk memecahkan saluran paip kami kepada bahagian yang lebih kecil dan melaksanakannya merentas DuckDB, panda dan Snowflake. Kami juga membincangkan kelebihan susunan data berbilang enjin dan menyerlahkan arah aliran yang muncul dalam bidang.

Kod yang dilaksanakan dalam siaran ini tersedia di GitHub^[Untuk mencuba repo dengan cepat, saya juga menyediakan serpihan nix]. Kerja rujukan dalam surat berita dengan pelaksanaan asal ada di sini.

Gambaran keseluruhan

Saluran paip tindanan data berbilang enjin berfungsi seperti berikut: Sesetengah data mendarat dalam baldi S3, dipraproses untuk mengalih keluar sebarang pendua dan kemudian dimuatkan ke dalam jadual Snowflake, di mana ia diubah lagi dengan fungsi khusus ML atau Snowflake^[Sila ambil perhatian kami tidak akan melaksanakan jenis perkara yang mungkin boleh dilakukan dalam Snowflake dan menganggapnya sebagai keperluan untuk aliran kerja]. Saluran paip menerima pesanan sebagai fail parket yang disimpan ke lokasi pendaratan, diproses terlebih dahulu dan kemudian disimpan di lokasi pementasan dalam baldi S3. Data pementasan kemudian dimuatkan dalam Snowflake untuk menyambungkan alat BI hiliran kepadanya. Saluran paip diikat bersama oleh SQL dbt dengan satu model untuk setiap bahagian belakang dan surat berita memilih Dagster sebagai alat orkestrasi.

Declarative Multi-Engine Data Stack with Ibis

Hari ini, kita akan menyelami cara kita boleh menukar kod panda kita kepada ungkapan Ibis, menghasilkan semula contoh lengkap untuk contoh tindanan berbilang enjin Julien Hurault 1. Daripada menggunakan Model dbt dan SQL, kami menggunakan ibis dan beberapa Python untuk menyusun dan mengatur enjin SQL daripada shell. Dengan menulis semula kod kami sebagai ungkapan Ibis, kami boleh membina saluran paip data kami secara deklaratif dengan pelaksanaan tertunda. Selain itu, Ibis menyokong lebih 20 hujung belakang, jadi kami boleh menulis kod sekali dan mengalihkan ibis.exprs kami ke berbilang hujung belakang. Untuk memudahkan lagi, kami menyerahkan penjadualan dan orkestrasi tugas2 yang disediakan oleh Dagster, terpulang kepada pembaca.

Konsep Teras Timbunan Data Berbilang Enjin

Berikut ialah konsep teras susunan data berbilang enjin seperti yang digariskan dalam surat berita Julien:

Declarative Multi-Engine Data Stack with Ibis

  1. Timbunan Data Berbilang Enjin: Konsep ini melibatkan penggabungan enjin data yang berbeza seperti Snowflake, Spark, DuckDB dan BigQuery. Pendekatan ini bertujuan untuk mengurangkan kos, mengehadkan penguncian vendor dan meningkatkan fleksibiliti. Julien menyebut bahawa untuk pertanyaan penanda aras tertentu, menggunakan DuckDB boleh mencapai pengurangan kos yang ketara berbanding Snowflake.
  2. Pembangunan Lapisan Pertanyaan Rentas Enjin: Surat berita menyerlahkan kemajuan dalam teknologi yang membolehkan pasukan data mentranspile kod SQL atau Bingkai Data mereka dari satu enjin ke enjin yang lain dengan lancar. Perkembangan ini penting untuk mengekalkan kecekapan merentas enjin yang berbeza.
  3. Penggunaan Apache Iceberg dan Alternatif: Walaupun Apache Iceberg dilihat sebagai lapisan storan bersatu yang berpotensi, integrasinya masih belum matang untuk digunakan dalam projek dbt. Sebaliknya, Julien telah memilih untuk menggunakan fail Parket yang disimpan dalam S3, diakses oleh DuckDB dan Snowflake, dalam Bukti Konsep (PoC) beliau.
  4. Orkestrasi dan Enjin dalam PoC: Untuk projek itu, Julien menggunakan Dagster sebagai orkestra, yang memudahkan penjadualan kerja enjin yang berbeza dalam projek dbt. Enjin yang digabungkan dalam PoC ini ialah DuckDB dan Snowflake.

Mengapa DataFrames dan Ibis?

Walaupun saluran paip di atas bagus untuk ETL dan ELT, kadangkala kami mahukan kuasa bahasa pengaturcaraan penuh dan bukannya Bahasa Pertanyaan seperti SQL cth. penyahpepijatan, ujian, UDF kompleks dsb. Untuk penerokaan saintifik, pengkomputeran interaktif adalah penting kerana saintis data perlu mengulangi kod mereka dengan cepat, memvisualisasikan keputusan dan membuat keputusan berdasarkan data.

DataFrames ialah struktur data sedemikian: DataFrames digunakan untuk memproses data tersusun dan menggunakan operasi pengiraan padanya secara interaktif. Mereka menyediakan fleksibiliti untuk dapat memproses data besar dengan operasi gaya SQL, tetapi juga menyediakan kawalan tahap yang lebih rendah untuk mengedit perubahan tahap sel ala Helaian Excel. Biasanya, jangkaan ialah semua data diproses dalam ingatan dan lazimnya sesuai dalam ingatan. Selain itu, DataFrames memudahkan untuk berulang-alik antara mod tertunda/kelompok dan interaktif.

DataFrames excel^[no pun intended] untuk membolehkan orang ramai menggunakan fungsi yang ditentukan pengguna dan membebaskan pengguna daripada batasan SQL iaitu anda kini boleh menggunakan semula kod, menguji operasi anda, dengan mudah memanjangkan jentera hubungan untuk operasi yang kompleks. DataFrames juga memudahkan untuk beralih daripada perwakilan Jadual data ke dalam Tatasusunan dan Tensor yang dijangkakan oleh perpustakaan Pembelajaran Mesin dengan cepat.

Pangkalan data khusus dan dalam proses cth. DuckDB untuk OLAP3, mengaburkan sempadan antara pangkalan data berat terpencil seperti Snowflake dan perpustakaan ergonomik seperti panda. Kami percaya ini adalah peluang untuk membenarkan DataFrames memproses lebih besar daripada data memori sambil mengekalkan jangkaan interaktiviti dan rasa pembangun shell Python tempatan, menjadikan data yang lebih besar daripada memori terasa kecil.

Teknikal Selam Dalam

Pelaksanaan kami memberi tumpuan kepada 4 konsep yang dibentangkan sebelum ini:

  1. Timbunan Data Berbilang Enjin: Kami akan menggunakan DuckDB, panda dan Snowflake sebagai enjin kami.
  2. Lapisan Pertanyaan Enjin Silang: Kami akan menggunakan Ibis untuk menulis ungkapan kami dan menyusunnya untuk dijalankan pada DuckDB, panda dan Snowflake.
  3. Apache Iceberg dan Alternatif: Kami akan menggunakan fail Parket yang disimpan secara setempat sebagai lapisan storan kami dengan jangkaan ia tidak penting untuk dilanjutkan ke S3 menggunakan pakej s3fs.
  4. Orkestrasi dan Enjin dalam PoC: Kami akan menumpukan pada penjadualan terperinci untuk enjin dan menyerahkan orkestrasi kepada pembaca. Penjadualan terperinci lebih sejajar dengan Ray, Dask, PySpark berbanding rangka kerja orkestra cth. Dagster, Aliran Udara dsb.

Melaksanakan dengan panda

Declarative Multi-Engine Data Stack with Ibis
panda ialah perpustakaan DataFrame yang paling penting dan mungkin menyediakan cara paling mudah untuk melaksanakan aliran kerja di atas. Pertama, kami menjana peminjaman data rawak daripada pelaksanaan dalam surat berita.

#| echo: false
import pandas as pd
from multi_engine_stack_ibis.generator import generate_random_data
generate_random_data("landing/orders.parquet")
df = pd.read_parquet("landing/orders.parquet")
deduped = df.drop_duplicates(["order_id", "dt"])

Pelaksanaan panda adalah penting dalam gaya dan direka bentuk supaya data yang boleh dimuatkan dalam ingatan. API panda sukar dikompilasi ke SQL dengan semua nuansanya dan sebahagian besarnya terletak di tempat istimewanya yang menggabungkan visualisasi Python, plot, pembelajaran mesin, AI dan perpustakaan pemprosesan yang kompleks.

pt.write_pandas(
    conn,
    deduped,
    table_name="T_ORDERS",
    auto_create_table=True,
    quote_identifiers=False,
    table_type="temporary"
)

Selepas menyahgandakan menggunakan operator panda, kami bersedia untuk menghantar data ke Snowflake. Kepingan salji mempunyai kaedah yang dipanggil write_pandas yang berguna untuk kes penggunaan kami.

Melaksanakan dengan Ibis aka Ibisify

Satu had panda ialah ia mempunyai API sendiri yang tidak cukup memetakan kembali kepada algebra hubungan. Ibis ialah perpustakaan yang benar-benar dibina oleh orang yang membina panda untuk menyediakan sistem ekspresi waras yang boleh dipetakan kembali ke beberapa bahagian belakang SQL. Ibis mengambil inspirasi daripada pakej dplyr R untuk membina sistem ekspresi baharu yang boleh dengan mudah memetakan kembali kepada algebra hubungan dan dengan itu menyusun kepada SQL. Ia juga adalah gaya deklaratif, membolehkan kami menggunakan pengoptimuman gaya pangkalan data pada pelan logik lengkap atau ungkapan. Ibis ialah komponen utama untuk membolehkan kebolehkomposisian seperti yang diserlahkan dalam codex boleh gubah yang sangat baik.

#| echo: false
import pathlib

import ibis
import ibis.backends.pandas.executor
import ibis.expr.types.relations
from ibis import _

from multi_engine_stack_ibis.generator import generate_random_data
from multi_engine_stack_ibis.utils import (MyExecutor, checkpoint_parquet,
                                           create_table_snowflake,
                                           replace_unbound)
from multi_engine_stack_ibis.connections import make_ibis_snowflake_connection



ibis.backends.pandas.executor.PandasExecutor = MyExecutor
setattr(ibis.expr.types.relations.Table, "checkpoint_parquet", checkpoint_parquet)
setattr(
    ibis.expr.types.relations.Table,
    "create_table_snowflake",
    create_table_snowflake,
)
ibis.set_backend("pandas")
p_staging = pathlib.Path("staging/staging.parquet")
p_landing = pathlib.Path("landing/orders.parquet")

snow_backend = make_ibis_snowflake_connection(database="MULTI_ENGINE", schema="PUBLIC", warehouse="COMPUTE_WH")
expr = (
  ibis.read_parquet(p_landing)
  .mutate(
      row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
  .filter(_.row_number == 0)
  .checkpoint_parquet(p_staging)
  .create_table_snowflake("T_ORDERS")
)
expr

Ekspresi Ibis mencetak dirinya sebagai pelan yang serupa dengan Pelan Logik tradisional dalam pangkalan data. Pelan Logik ialah pokok pengendali algebra hubungan yang menerangkan pengiraan yang perlu dilakukan. Pelan ini kemudiannya dioptimumkan oleh pengoptimum pertanyaan dan ditukar kepada pelan fizikal yang dilaksanakan oleh pelaksana pertanyaan. Ungkapan Ibis adalah serupa dengan Rancangan Logik kerana ia menerangkan pengiraan yang perlu dilakukan, tetapi ia tidak dilaksanakan serta-merta. Sebaliknya, mereka disusun ke dalam SQL dan dilaksanakan pada bahagian belakang apabila diperlukan. Pelan Logik secara amnya berada pada tahap butiran yang lebih tinggi daripada DAG yang dihasilkan oleh rangka kerja penjadualan tugas seperti Dask. Secara teorinya, rancangan ini boleh disusun ke dalam DAG Dask.

While pandas is embedded and is just a pip install away, it still has much documented limitations with plenty of performance improvements left on the table. This is where the recent embedded databases like DuckDB fill the gap of packing the full punch of a SQL engine, with all of its optimizations and benefiting from years of research that is as easy to import as is pandas. In this world, at minimum we can delegate all relational and SQL parts of our pipeline in pandas to DuckDB and only get the processed data ready for complex user defined Python.

Now, we are ready to take our Ibisified code and compile our expression above to execute on arbitrary engines, to truly realize the write-once-run-anywhere paradigm: We have successfully decoupled our compute engine with the expression system describing our computation.

Multi-Engine Data Stack w/ Ibis

DuckDB + pandas + Snowflake

Let's break our expression above into smaller parts and have them run across DuckDB, pandas and Snowflake. Note that we are not doing anything once the data lands in Snowflake and just show that we can select the data. Instead, we are leaving that up to the user's imagination what is possible with Snowflake native features.

Notice our expression above is bound to the pandas backend. First, lets create an UnboundTable expression to not have to depend on a backend when writing our expressions.

Declarative Multi-Engine Data Stack with Ibis

schema = {
    "user_id": "int64",
    "dt": "timestamp",
    "order_id": "string",
    "quantity": "int64",
    "purchase_price": "float64",
    "sku": "string",
    "row_number": "int64",
}

first_expr_for = (
    ibis.table(schema, name="orders")
    .mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
)
first_expr_for

Next, we replace the UnboundTable expression with the DuckDB backend and execute it with to_parquet method4. This step is covered by the checkpoint_parquet operator that we added to pandas backend above. Here is an excellent blog that discusses inserting data into Snowflake from any Ibis backend with to_pyarrow functionality.

data = pd.read_parquet("landing/orders.parquet")
duck_backend = ibis.duckdb.connect()
duck_backend.con.execute("CREATE TABLE orders as SELECT * from data")

bind_to_duckdb = replace_unbound(first_expr_for, duck_backend) 
bind_to_duckdb.to_parquet(p_staging)
to_sql = ibis.to_sql(bind_to_duckdb)
print(to_sql)

Once the above step creates the de-duplicated table, we can then send data to Snowflake using the pandas backend. This functionality is covered by create_table_snowflake operator that we added to pandas backend above.

second_expr_for = ibis.table(schema, name="T_ORDERS") # nothing special just a reading the data from orders table
snow_backend.create_table("T_ORDERS", schema=second_expr_for.schema(), temp=True)
pandas_backend = ibis.pandas.connect({"T_ORDERS": pd.read_parquet(p_staging)})
snow_backend.insert("T_ORDERS", pandas_backend.to_pyarrow(second_expr_for))

Finally, we can select the data from the Snowflake table to verify that the data has been loaded successfully.

third_expr_for = ibis.table(schema, name="T_ORDERS") # add you Snowflake ML functions here
third_expr_for

Declarative Multi-Engine Data Stack with Ibis

We successfully broke up our computation in pieces, albeit manually, and executed them across DuckDB, pandas, and Snowflake. This demonstrates the flexibility and power of a multi-engine data stack, allowing users to leverage the strengths of different engines to optimize their data processing pipelines.

Acknowledgments

I'd like to thank Neal Richardson, Dan Lovell and Daniel Mesejo for providing the initial feedback on the post. I highly appreciate the early review and encouragement by Wes McKinney.

Resources

  • The Road to Composable Data Systems
  • The Composable Codex
  • Apache Arrow
  • Multi-Engine Data Stack Newsleter v0 v1
  • Ibis, the portable dataframe library
  • dbt Docs
  • Dagster Docs
  • LanceDB
  • KuzuDB
  • DuckDB

  1. In this post, we have primarily focused on v0 of the multi-engine data stack. In the latest version, Apache Iceberg is included as a storage and data format layer. NYC Taxi data is used instead of the random Orders data treated in this and v0 of the posts.  ↩

  2. Orchestration Vs fine-grained scheduling: ↩

    • The orchestration is left to the reader. The orchestration can be done using a tool like Dagster, Prefect, or Apache Airflow.
    • The fine-grained scheduling can be done using a tool like Dask, Ray, or Spark.
  3. Some of the examples of in-process databases is described in this post extending DuckDB example above to newer purpose built databases like LanceDB and KuzuDB.  ↩

  4. The Ibis docs use backend.to_pandas(expr) commands to bind and run the expression in the same go. Instead, we use replace_unbound method to show a generic way to just compile the expression and not execute it to said backend. This is just for illustration purposes. All the code below, uses the backend.to_pyarrow methods from here on. ↩

Atas ialah kandungan terperinci Timbunan Data Berbilang Enjin Deklaratif dengan Ibis. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn