Data engineers and data scientists often work with a variety of tools to handle different kinds of data operations, from large-scale distributed processing to in-memory data manipulation. The alexmerced/spark35nb Docker image makes this easy by offering a pre-configured environment in which you can try out several popular data tools, including PySpark, Pandas, DuckDB, Polars, and DataFusion.
In this blog, we'll walk you through setting up this environment and show how to perform basic data operations such as writing data, loading data, and running queries and aggregations with these tools. Whether you're dealing with large datasets or just need to manipulate a small amount of in-memory data, you'll see how these different libraries can complement each other.
To get started, you'll need to pull the alexmerced/spark35nb image from Docker Hub. The image ships with a pre-configured environment that includes Spark 3.5.2, JupyterLab, and many popular data manipulation libraries such as Pandas, DuckDB, and Polars.
Run the following command to pull the image:
docker pull alexmerced/spark35nb
Next, start the container with the following command:
docker run -p 8888:8888 -p 4040:4040 -p 7077:7077 -p 8080:8080 -p 18080:18080 -p 6066:6066 -p 7078:7078 -p 8081:8081 alexmerced/spark35nb
Once the container is up and running, open your browser and navigate to localhost:8888 to access JupyterLab, where you'll perform all of your data operations.
Now that your environment is set up, we can move on to performing some basic data operations with PySpark, Pandas, DuckDB, Polars, and DataFusion.
PySpark is the Python API for Apache Spark, an open-source engine designed for large-scale data processing and distributed computing. It lets you work with big data by distributing data and computation across a cluster. Although Spark typically runs on a distributed cluster, this setup lets you run it locally on a single node, which is perfect for development and testing.
With PySpark, you can perform data manipulation, SQL queries, machine learning, and more, all within a framework that handles big data efficiently. In this section, we'll cover how to write and query data with PySpark in the JupyterLab environment.
Let's start by creating a simple dataset in PySpark. First, initialize a Spark session, which is required to interact with Spark functionality. We'll create a small DataFrame with some sample data and display it.
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = SparkSession.builder.appName("PySpark Example").getOrCreate()

# Sample data: a list of tuples containing names and ages
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]

# Create a DataFrame
df = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
df.show()
In this example, we create a DataFrame with three rows of data representing people's names and ages. The df.show() function displays the contents of the DataFrame, making it easy to inspect the data we just created.
Next, let's load a dataset from a file and run a few basic queries. PySpark can handle a variety of file formats, including CSV, JSON, and Parquet.
For this example, let's assume we have a CSV file with more data about people, which we'll load into a DataFrame. We'll then demonstrate a simple filter and an aggregation that counts the number of people in each age group.
# Load a CSV file into a DataFrame
df_csv = spark.read.csv("data/people.csv", header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df_csv.show()

# Filter the data to only include people older than 30
df_filtered = df_csv.filter(df_csv["Age"] > 30)

# Show the filtered DataFrame
df_filtered.show()

# Group by Age and count the number of people in each age group
df_grouped = df_csv.groupBy("Age").count()

# Show the result of the grouping
df_grouped.show()
In this example, we loaded a CSV file into a PySpark DataFrame using spark.read.csv(). We then applied two different operations: a filter() that keeps only people older than 30, and a groupBy() aggregation that counts the number of people in each age group.
With PySpark, you can run much more complex queries and aggregations on large datasets, making it a powerful tool for big data processing.
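As a quick, hedged sketch of what that looks like, you can also register the df_csv DataFrame loaded above as a temporary view and express the same filter and aggregation in Spark SQL (the view name people_view is just an illustrative choice):
# Register the DataFrame as a temporary SQL view
df_csv.createOrReplaceTempView("people_view")

# Express the same filter and aggregation in SQL
spark.sql("""
    SELECT Age, COUNT(*) AS count
    FROM people_view
    WHERE Age > 30
    GROUP BY Age
""").show()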
In the next section, we'll explore Pandas, which is great for smaller, in-memory data operations that don't require distributed processing.
Pandas is one of the most widely used Python libraries for data manipulation and analysis. It provides easy-to-use data structures, such as DataFrames, that let you work with tabular data in an intuitive way. Unlike PySpark, which is designed for large-scale distributed data processing, Pandas works in memory, making it ideal for small to medium-sized datasets.
With Pandas, you can read and write data from various formats, including CSV, Excel, and JSON, and perform common data operations like filtering, aggregating, and merging data with simple and readable syntax.
Let’s start by loading a dataset into a Pandas DataFrame. We’ll read a CSV file, which is a common file format for data storage, and display the first few rows.
import pandas as pd

# Load a CSV file into a Pandas DataFrame
df_pandas = pd.read_csv("data/people.csv")

# Display the first few rows of the DataFrame
print(df_pandas.head())
In this example, we read the CSV file people.csv using pd.read_csv() and loaded it into a Pandas DataFrame. The head() method lets you view the first few rows of the DataFrame, which is useful for quickly inspecting the data.
Now that we have loaded the data, let’s perform some basic operations, such as filtering rows and grouping data. Pandas allows you to apply these operations easily with simple Python syntax.
# Filter the data to show only people older than 30
df_filtered = df_pandas[df_pandas["Age"] > 30]

# Display the filtered data
print(df_filtered)

# Group the data by 'Age' and count the number of people in each age group
df_grouped = df_pandas.groupby("Age").count()

# Display the grouped data
print(df_grouped)
Here, we filtered the data to include only people older than 30 using a simple boolean expression. Then, we used the groupby() function to group the DataFrame by age and count the number of people in each age group.
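Since merging was mentioned above, here is a minimal sketch of a Pandas merge; the second DataFrame df_cities and its contents are purely hypothetical and only serve to illustrate the join:
# A second, hypothetical DataFrame mapping names to cities
df_cities = pd.DataFrame({
    "Name": ["Alice", "Bob", "Catherine"],
    "City": ["Paris", "Berlin", "Madrid"]
})

# Join the two DataFrames on the 'Name' column
df_merged = df_pandas.merge(df_cities, on="Name", how="left")
print(df_merged)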
Pandas is incredibly efficient for in-memory data operations, making it a go-to tool for smaller datasets that can fit in your machine's memory. In the next section, we’ll explore DuckDB, a SQL-based tool that enables fast querying over in-memory data.
DuckDB is an in-memory SQL database management system (DBMS) designed for analytical workloads. It offers high-performance, efficient querying of datasets directly within your Python environment. DuckDB is particularly well-suited for performing complex SQL queries on structured data, like CSVs or Parquet files, without needing to set up a separate database server.
DuckDB is lightweight, yet powerful, and can be used as an alternative to tools like SQLite, especially when working with analytical queries on large datasets.
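DuckDB can also query files on disk directly, without loading them into a DataFrame first. Here is a minimal sketch, assuming the same data/people.csv file used in the earlier examples:
import duckdb

# Query the CSV file directly with SQL
conn = duckdb.connect()
result = conn.execute(
    "SELECT Name, Age FROM read_csv_auto('data/people.csv') WHERE Age > 30"
).df()
print(result)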
DuckDB can easily integrate with Pandas, allowing you to transfer data from a Pandas DataFrame into DuckDB for SQL-based queries. Here’s how to create a table in DuckDB using the data from Pandas.
import duckdb

# Connect to an in-memory DuckDB instance
conn = duckdb.connect()

# Create a table in DuckDB from the Pandas DataFrame
conn.execute("CREATE TABLE people AS SELECT * FROM df_pandas")

# Show the content of the 'people' table
conn.execute("SELECT * FROM people").df()
In this example, we connected to DuckDB and created a new table people from the Pandas DataFrame df_pandas. DuckDB’s execute() function allows you to run SQL commands, making it easy to interact with data using SQL queries.
Once your data is loaded into DuckDB, you can run SQL queries to filter, aggregate, and analyze your data. DuckDB supports a wide range of SQL functionality, making it ideal for users who prefer SQL over Python for data manipulation.
# Query to select people older than 30
result = conn.execute("SELECT Name, Age FROM people WHERE Age > 30").df()

# Display the result of the query
print(result)

# Query to group people by age and count the number of people in each age group
result_grouped = conn.execute("SELECT Age, COUNT(*) as count FROM people GROUP BY Age").df()

# Display the grouped result
print(result_grouped)
In this example, we used SQL to filter the people table, selecting only those who are older than 30. We then ran a grouping query to count the number of people in each age group.
DuckDB is an excellent choice when you need SQL-like functionality directly in your Python environment. It allows you to leverage the power of SQL without the overhead of setting up and managing a database server. In the next section, we will explore Polars, a DataFrame library known for its speed and efficiency.
Polars is a DataFrame library designed for high-performance data manipulation. It’s known for its speed and efficiency, particularly when compared to libraries like Pandas. Polars is written in Rust and uses an optimized query engine to handle large datasets quickly and with minimal memory usage. It also provides a similar interface to Pandas, making it easy to learn and integrate into existing Python workflows.
Polars is particularly well-suited for processing large datasets that might not fit into memory as easily or for scenarios where performance is a critical factor.
Let’s start by creating a Polars DataFrame from a Python dictionary. We’ll then perform some basic operations like filtering and aggregating data.
import polars as pl

# Create a Polars DataFrame
df_polars = pl.DataFrame({
    "Name": ["Alice", "Bob", "Catherine"],
    "Age": [34, 45, 29]
})

# Display the Polars DataFrame
print(df_polars)
In this example, we created a Polars DataFrame using a Python dictionary. The syntax is similar to Pandas, but the operations are optimized for speed. Polars offers lazy evaluation, which means it can optimize the execution of multiple operations at once, reducing computation time.
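As a brief sketch of that lazy API (assuming the same data/people.csv file used earlier), you can build a query plan with scan_csv and only materialize the result when you call collect():
# Build a lazy query over the CSV file; nothing is executed yet
lazy_query = (
    pl.scan_csv("data/people.csv")
      .filter(pl.col("Age") > 30)
      .select(["Name", "Age"])
)

# Execute the optimized plan and materialize the result
print(lazy_query.collect())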
Now, let’s perform some common data operations such as filtering and aggregating the data. These operations are highly optimized in Polars and can be done using a simple and expressive syntax.
# Filter the DataFrame to show only people older than 30
df_filtered = df_polars.filter(pl.col("Age") > 30)

# Display the filtered DataFrame
print(df_filtered)

# Group by 'Age' and count the number of people in each age group
# (older Polars releases spell this groupby(...).count())
df_grouped = df_polars.group_by("Age").len()

# Display the grouped result
print(df_grouped)
In this example, we filtered the data to show only rows where the age is greater than 30, and then we grouped the data by age to count how many people are in each group. These operations are highly efficient in Polars due to its optimized memory management and query execution engine.
Polars is ideal when you need the speed of a DataFrame library for both small and large datasets, and when performance is a key requirement. Next, we will explore DataFusion, a tool for SQL-based querying over Apache Arrow data.
DataFusion is an in-memory query execution engine built on top of Apache Arrow, an efficient columnar memory format for analytics. It provides a powerful SQL engine that allows users to run complex queries over structured data stored in Arrow format. DataFusion is part of the Apache Arrow ecosystem, which aims to provide fast data interoperability across different data processing tools.
DataFusion is particularly well-suited for scenarios where you need to query large in-memory datasets using SQL without the overhead of traditional databases. Its integration with Arrow ensures that the data processing is both fast and memory-efficient.
DataFusion allows you to execute SQL queries on in-memory data using Apache Arrow. Let’s first create a DataFrame using DataFusion and then perform a few SQL queries on it.
from datafusion import SessionContext
import pyarrow as pa

# Initialize a DataFusion session
ctx = SessionContext()

# Sample data as an Arrow record batch
batch = pa.RecordBatch.from_pydict({
    "Name": ["Alice", "Bob", "Catherine"],
    "Age": [34, 45, 29]
})

# Register the record batch as a table named 'people'
ctx.register_record_batches("people", [[batch]])

# Query the data to select people older than 30
result = ctx.sql("SELECT Name, Age FROM people WHERE Age > 30").collect()

# Display the result
print(result)
In this example, we used DataFusion's SessionContext to register an in-memory Arrow record batch as a table named people. We then ran a simple SQL query to filter the data for people older than 30. DataFusion lets you combine the power of SQL with the speed and efficiency of Apache Arrow's in-memory format.
Just like in DuckDB, we can perform aggregation queries to group data by a specific field and count the number of records in each group. Let’s see how this works in DataFusion.
# Group by 'Age' and count the number of people in each age group
result_grouped = ctx.sql("SELECT Age, COUNT(*) as count FROM people GROUP BY Age").collect()

# Display the grouped result
print(result_grouped)
In this query, we grouped the data by the 'Age' column and counted how many people were in each age group. DataFusion’s SQL execution engine ensures that queries run efficiently, even on large datasets stored in-memory.
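DataFusion can also register files on disk as tables. Here is a minimal sketch, reusing the ctx session from above and assuming the same data/people.csv file (register_parquet works analogously for Parquet files):
# Register a CSV file as a table named 'people_csv'
ctx.register_csv("people_csv", "data/people.csv")

# Query the file with SQL
result = ctx.sql("SELECT Name, Age FROM people_csv WHERE Age > 30").collect()
print(result)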
DataFusion is a great tool for users who need fast, SQL-based querying of large in-memory datasets and want to take advantage of Apache Arrow’s high-performance columnar data format. It’s particularly useful for building analytical pipelines that involve heavy querying of structured data.
Dremio is a powerful data lakehouse platform that helps organizations unify and query their data from various sources. It enables users to easily govern, join, and accelerate queries on their data without the need for expensive and complex data warehouse infrastructures. Dremio's ability to access and query data directly from formats like Apache Iceberg, Delta Lake, S3, RDBMS, and JSON files, along with its performance enhancements, reduces the workload on traditional data warehouses.
Dremio is built on top of Apache Arrow, a high-performance columnar in-memory format, and utilizes Arrow Flight to accelerate the transmission of large datasets over the network. This integration provides blazing-fast query performance while enabling interoperability between various analytics tools.
In this section, we will demonstrate how to set up Dremio in a Docker container and use Python to query Dremio's data sources using the dremio-simple-query library.
To run Dremio on your local machine, use the following Docker command:
docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 -p 32010:32010 -e DREMIO_JAVA_SERVER_EXTRA_OPTS=-Dpaths.dist=file:///opt/dremio/data/dist --name try-dremio dremio/dremio-oss
Once Dremio is up and running, navigate to http://localhost:9047 in your browser to access the Dremio UI. Here, you can configure your data sources, create virtual datasets, and explore the platform's capabilities.
The dremio-simple-query library allows you to query Dremio using Apache Arrow Flight, providing a high-performance interface for fetching and analyzing data from Dremio sources. With this library, you can easily convert Dremio queries into Pandas, Polars, or DuckDB DataFrames, or work directly with Apache Arrow data.
Here’s how to get started:
Make sure you have the dremio-simple-query library installed (It is pre-installed on the alexmerced/spark35nb image). You can install it using pip:
pip install dremio-simple-query
You’ll need your Dremio credentials to retrieve a token and establish a connection. Here’s a basic example:
from dremio_simple_query.connect import get_token, DremioConnection
from os import getenv
from dotenv import load_dotenv

# Load environment variables (TOKEN and ARROW_ENDPOINT)
load_dotenv()

# Login to Dremio and get a token
login_endpoint = "http://{host}:9047/apiv2/login"
payload = {
    "userName": "your_username",
    "password": "your_password"
}
token = get_token(uri=login_endpoint, payload=payload)

# Dremio Arrow Flight endpoint, make sure to put in the right host for your Dremio instance
arrow_endpoint = "grpc://{host}:32010"

# Establish connection to Dremio using Arrow Flight
dremio = DremioConnection(token, arrow_endpoint)
If you are running this locally with the docker run command above, the host should be the IP address of the Dremio container on the Docker network, which you can find by running docker inspect on the container (for example, docker inspect try-dremio) and looking for its IPAddress.
In this code, we use the get_token function to retrieve an authentication token from Dremio's REST API and establish a connection to Dremio's Arrow Flight endpoint.
Once connected, you can use the connection to query Dremio and retrieve results in different formats, including Arrow, Pandas, Polars, and DuckDB. Here’s how:
# Query Dremio and return data as an Apache Arrow Table
stream = dremio.toArrow("SELECT * FROM my_table;")
arrow_table = stream.read_all()

# Display Arrow Table
print(arrow_table)
# Query Dremio and return data as a Pandas DataFrame
df = dremio.toPandas("SELECT * FROM my_table;")
print(df)
# Query Dremio and return data as a Polars DataFrame
df_polars = dremio.toPolars("SELECT * FROM my_table;")
print(df_polars)
# Query Dremio and return as a DuckDB relation
duck_rel = dremio.toDuckDB("SELECT * FROM my_table")

# Perform a query on the DuckDB relation
result = duck_rel.query("my_table", "SELECT * FROM my_table WHERE Age > 30").fetchall()

# Display results
print(result)
With the dremio-simple-query library, you can efficiently query large datasets from Dremio and immediately start analyzing them with various tools like Pandas, Polars, and DuckDB, all while leveraging the high-performance Apache Arrow format under the hood.
Dremio provides several benefits that make it a powerful addition to your data stack:
Governance: Centralize governance over all your data sources, ensuring compliance and control.
Data Federation: Join data across various sources, such as Iceberg, Delta Lake, JSON, CSV, and relational databases, without moving the data.
Performance: Accelerate your queries with the help of Dremio's query acceleration features and Apache Arrow Flight.
Cost Savings: By offloading workloads from traditional data warehouses, Dremio can reduce infrastructure costs.
Dremio's close relationship with Apache Arrow ensures that your queries are both fast and efficient, allowing you to seamlessly integrate various data sources and tools into your analytics workflows.
In this blog, we explored how to use a variety of powerful tools for data operations within a Python notebook environment. Starting with the alexmerced/spark35nb Docker image, we demonstrated how to set up a development environment that includes PySpark, Pandas, DuckDB, Polars, and DataFusion—each optimized for different data processing needs. We showcased basic operations like writing, querying, and aggregating data using each tool’s unique strengths.
Finally, we introduced Dremio, which integrates with Apache Arrow to enable lightning-fast queries across a range of data sources. With the dremio-simple-query library, Dremio allows analysts to quickly fetch and analyze data using tools like Pandas, Polars, and DuckDB, ensuring that data is available when and where it's needed without the overhead of traditional data warehouses.
Whether you're working with small datasets or handling massive amounts of data in distributed environments, this setup provides a versatile, efficient, and scalable platform for any data engineering or data science project. By leveraging these tools together, you can cover the full spectrum of data processing, from exploration to large-scale analytics, with minimal setup and maximum performance.