Heim >Backend-Entwicklung >Python-Tutorial >Verarbeiten Sie große Datensätze mit Python PySpark
In diesem Tutorial werden wir die leistungsstarke Kombination von Python und PySpark für die Verarbeitung großer Datenmengen erkunden. PySpark ist eine Python-Bibliothek, die eine Schnittstelle zu Apache Spark bereitstellt, einem schnellen und vielseitigen Cluster-Computing-System. Durch die Nutzung von PySpark können wir Daten effizient auf eine Reihe von Maschinen verteilen und verarbeiten, sodass wir große Datensätze problemlos verarbeiten können.
In diesem Artikel befassen wir uns mit den Grundlagen von PySpark und zeigen, wie man verschiedene Datenverarbeitungsaufgaben an großen Datensätzen durchführt. Wir behandeln Schlüsselkonzepte wie RDDs (Resilient Distributed Datasets) und Datenrahmen und zeigen ihre praktische Anwendung anhand von Schritt-für-Schritt-Beispielen. Durch das Studium dieses Tutorials verfügen Sie über ein solides Verständnis dafür, wie Sie PySpark effektiv zur Verarbeitung und Analyse großer Datensätze nutzen können.
In diesem Abschnitt richten wir die Entwicklungsumgebung ein und machen uns mit den Grundkonzepten von PySpark vertraut. Wir behandeln die Installation von PySpark, die Initialisierung einer SparkSession und das Laden von Daten in RDDs und DataFrames. Beginnen wir mit der Installation von PySpark:
# Install PySpark !pip install pyspark
Collecting pyspark ... Successfully installed pyspark-3.1.2
Nach der Installation von PySpark können wir eine SparkSession initialisieren, um eine Verbindung zu unserem Spark-Cluster herzustellen:
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
Da unsere SparkSession bereit ist, können wir jetzt Daten in RDDs oder DataFrames laden. RDDs sind die grundlegende Datenstruktur in PySpark, die eine verteilte Sammlung von Elementen bereitstellt. DataFrames hingegen organisieren Daten in benannten Spalten, ähnlich wie Tabellen in relationalen Datenbanken. Laden wir eine CSV-Datei als DataFrame:
# Load a CSV file as a DataFrame df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
+---+------+--------+ |id |name |age | +---+------+--------+ |1 |John |32 | |2 |Alice |28 | |3 |Bob |35 | +---+------+--------+
Wie Sie dem obigen Codeausschnitt entnehmen können, verwenden wir die Methode „read.csv()“, um die CSV-Datei in einen Datenrahmen einzulesen. Der Parameter „header=True“ bedeutet, dass die erste Zeile Spaltennamen enthält, und „inferSchema=True“ leitet automatisch den Datentyp jeder Spalte ab.
In diesem Abschnitt werden wir verschiedene Datentransformations- und Analysetechniken mit PySpark untersuchen. Wir behandeln Vorgänge wie das Filtern, Aggregieren und Zusammenführen von Datensätzen. Beginnen wir damit, die Daten nach bestimmten Kriterien zu filtern:
# Filter data filtered_data = df.filter(df["age"] > 30)
+---+----+---+ |id |name|age| +---+----+---+ |1 |John|32 | |3 |Bob |35 | +---+----+---+
Im obigen Codeausschnitt verwenden wir die Methode „filter()“, um Zeilen auszuwählen, in denen die Spalte „Alter“ größer als 30 ist. Mit dieser Operation können wir relevante Teilmengen aus großen Datensätzen extrahieren.
Als nächstes führen wir eine Aggregation des Datensatzes mit den Methoden „groupBy()“ und „agg()“ durch:
# Aggregate data aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
+------+-----------+--------+ |gender|avg(salary)|max(age)| +------+-----------+--------+ |Male |2500 |32 | |Female|3000 |35 | +------+-----------+--------+
Hier gruppieren wir die Daten nach der Spalte „Geschlecht“ und berechnen das Durchschnittsgehalt und das Höchstalter für jede Gruppe. Der resultierende Datenrahmen „aggreated_data“ liefert uns wertvolle Einblicke in den Datensatz.
Neben der Filterung und Aggregation ermöglicht uns PySpark auch die effiziente Verknüpfung mehrerer Datensätze. Betrachten wir ein Beispiel, in dem wir zwei DataFrames haben: „df1“ und „df2“. Wir können sie anhand einer gemeinsamen Kolumne verbinden:
# Join two DataFrames joined_data = df1.join(df2, on="id", how="inner")
+---+----+---------+------+ |id |name|department|salary| +---+----+---------+------+ |1 |John|HR |2500 | |2 |Alice|IT |3000 | |3 |Bob |Sales |2000 | +---+----+---------+------+
Mit der Methode „join()“ können wir DataFrames basierend auf den gemeinsamen Spalten zusammenführen, die durch den Parameter „on“ angegeben werden. Je nach Bedarf können wir verschiedene Verbindungsarten wählen, wie zum Beispiel „innen“, „außen“, „links“ oder „rechts“.
In diesem Abschnitt werden wir die fortschrittliche PySpark-Technologie untersuchen, um unsere Datenverarbeitungsfähigkeiten weiter zu verbessern. Wir behandeln Themen wie benutzerdefinierte Funktionen (UDFs), Fensterfunktionen und Caching. Beginnen wir mit der Definition und Verwendung von UDFs:
from pyspark.sql.functions import udf # Define a UDF def square(x): return x ** 2 # Register the UDF square_udf = udf(square) # Apply the UDF to a column df = df.withColumn("age_squared", square_udf(df["age"]))
+---+------+---+------------+ |id |name |age|age_squared | +---+------+---+------------+ |1 |John |32 |1024 | |2 |Alice |28 |784 | |3 |Bob |35 |1225 | +---+------+---+------------+
Im obigen Codeausschnitt haben wir eine einfache UDF-Funktion namens „square()“ definiert, die zum Quadrieren der gegebenen Eingabe verwendet wird. Anschließend registrieren wir diese UDF mit der Funktion „udf()“ und wenden sie auf die Spalte „Alter“ an, wodurch eine neue Spalte mit dem Namen „age_squared“ in unserem DataFrame erstellt wird.
PySpark bietet außerdem leistungsstarke Fensterfunktionen, mit denen wir Berechnungen innerhalb eines bestimmten Fensterbereichs durchführen können. Betrachten wir die vorherige und die nächste Zeile, um das Durchschnittsgehalt pro Mitarbeiter zu berechnen:
from pyspark.sql.window import Window from pyspark.sql.functions import lag, lead, avg # Define the window window = Window.orderBy("id") # Calculate average salary with lag and lead df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
+---+----+---------+------+----------+ |id |name|department|salary|avg_salary| +---+----+---------+------+----------+ |1 |John|HR |2500 |2666.6667 | |2 |Alice| IT |3000 |2833.3333 | |3 |Bob |Sales |2000 |2500 | +---+----+---------+------+----------+
Im obigen Codeauszug verwenden wir die Methode „Window.orderBy()“, um ein Fenster zu definieren, das die Reihenfolge der Zeilen basierend auf der Spalte „id“ angibt. Anschließend verwenden wir die Funktionen „lag()“ und „lead()“, um auf die vorherige bzw. nächste Zeile zuzugreifen. Abschließend berechnen wir das Durchschnittsgehalt, indem wir die aktuelle Zeile und ihre Nachbarn berücksichtigen.
Schließlich ist Caching eine wichtige Technologie in PySpark, um die Leistung iterativer Algorithmen oder wiederholter Berechnungen zu verbessern. Mit der Methode „cache()“ können wir einen DataFrame oder ein RDD im Speicher zwischenspeichern:
# Cache a DataFrame df.cache()
Caching zeigt keine Ausgabe an, aber nachfolgende Vorgänge, die auf dem zwischengespeicherten DataFrame basieren, werden schneller sein, da die Daten im Speicher gespeichert werden.
In diesem Tutorial erkunden wir die Leistungsfähigkeit von PySpark für die Verarbeitung großer Datensätze in Python. Wir haben zunächst die Entwicklungsumgebung eingerichtet und die Daten in RDDs und DataFrames geladen. Anschließend befassten wir uns mit Datentransformations- und Analysetechniken, einschließlich Filtern, Aggregieren und Zusammenführen von Datensätzen. Abschließend besprechen wir fortgeschrittene PySpark-Techniken wie benutzerdefinierte Funktionen, Fensterfunktionen und Caching.
Das obige ist der detaillierte Inhalt vonVerarbeiten Sie große Datensätze mit Python PySpark. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!