Maison > Article > développement back-end > Pandas et PySpark unissent leurs forces pour atteindre à la fois fonctionnalité et rapidité !
Les data scientists ou praticiens des données qui utilisent Python pour le traitement des données ne sont pas étrangers au package de science des données pandas, et il existe également de gros utilisateurs de pandas comme Yun Duojun. La plupart des premières lignes de code écrites au début du projet sont importées. pandas en tant que PD. On peut dire que les pandas sont des adeptes du traitement des données ! Et ses défauts sont également très évidents. Les Pandas ne peuvent être traités que sur une seule machine et ne peuvent pas évoluer de manière linéaire avec la quantité de données. Par exemple, si pandas tente de lire un ensemble de données supérieur à la mémoire disponible d'une machine, il échouera en raison d'une mémoire insuffisante.
De plus, pandas est très lent à traiter des données volumineuses. Bien qu'il existe d'autres bibliothèques comme Dask ou Vaex pour optimiser et améliorer la vitesse de traitement des données, c'est un jeu d'enfant devant le framework divin du traitement du Big Data, Spark.
Heureusement, dans la nouvelle version Spark 3.2, une nouvelle API Pandas est apparue, qui intègre la plupart des fonctions pandas dans PySpark. En utilisant l'interface pandas, vous pouvez utiliser Spark, car l'API Pandas sur Spark utilise Spark en arrière-plan. peut obtenir l'effet d'une coopération forte, qui peut être considérée comme très puissante et très pratique.
Tout a commencé au Spark + AI Summit 2019. Koalas est un projet open source qui utilise Pandas sur Spark. Au début, il ne couvrait qu’une petite partie des fonctionnalités de Pandas, mais sa taille a progressivement augmenté. Désormais, dans la nouvelle version Spark 3.2, Koalas a été fusionné dans PySpark.
Spark intègre désormais l'API Pandas, vous pouvez donc exécuter Pandas sur Spark. Nous n'avons besoin de changer qu'une seule ligne de code :
import pyspark.pandas as ps
Nous pouvons en tirer de nombreux avantages :
Le dernier point est particulièrement remarquable.
D'une part, l'informatique distribuée peut être appliquée au code dans Pandas. Et avec le moteur Spark, votre code sera plus rapide même sur une seule machine ! Le graphique ci-dessous montre la comparaison des performances entre l'exécution de Spark sur une machine avec 96 vCPU et 384 Gio de mémoire et l'appel de pandas seuls pour analyser un ensemble de données CSV de 130 Go.
Le multi-threading et Spark SQL Catalyst Optimizer aident tous deux à optimiser les performances. Par exemple, l'opération Join count est 4 fois plus rapide avec la génération de code sur toute l'étape : 5,9 secondes sans génération de code et 1,6 seconde avec génération de code.
Spark présente des avantages particulièrement importants dans les opérations de chaînage. L'optimiseur de requêtes Catalyst reconnaît les filtres pour filtrer les données intelligemment et peut appliquer des jointures basées sur le disque, tandis que Pandas préfère charger toutes les données en mémoire à chaque étape.
Vous avez hâte d'essayer comment écrire du code à l'aide de l'API Pandas sur Spark ? Commençons maintenant !
La première chose que vous devez savoir est ce que nous utilisons exactement. Lorsque vous utilisez Pandas, utilisez la classe pandas.core.frame.DataFrame. Lorsque vous utilisez l'API pandas dans Spark, utilisez pyspark.pandas.frame.DataFrame. Bien que les deux soient similaires, ils ne sont pas identiques. La principale différence est que le premier est dans une seule machine, tandis que le second est distribué.
Il est possible de créer un Dataframe à l'aide de Pandas-on-Spark et de le convertir en Pandas et vice versa :
# import Pandas-on-Spark import pyspark.pandas as ps # 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe pd_df = ps_df.to_pandas() # 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe ps_df = ps.from_pandas(pd_df)
Notez que si vous utilisez plusieurs machines, lors de la conversion d'un Dataframe Pandas-on-Spark en Pandas Dataframe, Data est transféré de plusieurs machines vers une seule machine et vice versa (voir le guide PySpark [1]).
Il est également possible de convertir un Dataframe Pandas-on-Spark en Spark DataFrame et vice versa :
# 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe spark_df = ps_df.to_spark() # 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe ps_df_new = spark_df.to_pandas_on_spark()
Les types de données sont fondamentalement les mêmes lors de l'utilisation de Pandas-on-Spark et Pandas. Lors de la conversion d'un DataFrame Pandas-on-Spark en Spark DataFrame, les types de données sont automatiquement convertis vers le type approprié (voir Guide PySpark [2])
L'exemple suivant montre comment les types de données sont convertis à partir d'un DataFrame PySpark lors de la conversion Convertissez en DataFrame pandas-on-Spark.
>>> sdf = spark.createDataFrame([ ... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)), ... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date') >>> sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0), float: float, double: double, integer: int, long: bigint, short: smallint, timestamp: timestamp, string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api() psdf.dtypes
tinyintint8 decimalobject float float32 doublefloat64 integer int32 longint64 short int16 timestampdatetime64[ns] string object booleanbool date object dtype: object
DataFrame dans Spark et ses fonctions les plus couramment utilisées dans Pandas-on-Spark. Notez que la seule différence de syntaxe entre Pandas-on-Spark et Pandas est la ligne import pyspark.pandas as ps.
当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。
# 运行Spark from pyspark.sql import SparkSession spark = SparkSession.builder .appName("Spark") .getOrCreate() # 在Spark上运行Pandas import pyspark.pandas as ps
以 old dog iris 数据集为例。
# SPARK sdf = spark.read.options(inferSchema='True', header='True').csv('iris.csv') # PANDAS-ON-SPARK pdf = ps.read_csv('iris.csv')
# SPARK sdf.select("sepal_length","sepal_width").show() # PANDAS-ON-SPARK pdf[["sepal_length","sepal_width"]].head()
# SPARK sdf.drop('sepal_length').show()# PANDAS-ON-SPARK pdf.drop('sepal_length').head()
# SPARK sdf.dropDuplicates(["sepal_length","sepal_width"]).show() # PANDAS-ON-SPARK pdf[["sepal_length", "sepal_width"]].drop_duplicates()
# SPARK sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show() # PANDAS-ON-SPARK pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()
# SPARK sdf.filter(sdf.flower_type == "Iris-virginica").count() # PANDAS-ON-SPARK pdf.loc[pdf.flower_type == "Iris-virginica"].count()
# SPARK sdf.select("flower_type").distinct().show() # PANDAS-ON-SPARK pdf["flower_type"].unique()
# SPARK sdf.sort("sepal_length", "sepal_width").show() # PANDAS-ON-SPARK pdf.sort_values(["sepal_length", "sepal_width"]).head()
# SPARK sdf.groupBy("flower_type").count().show() # PANDAS-ON-SPARK pdf.groupby("flower_type").count()
# SPARK sdf.replace("Iris-setosa", "setosa").show() # PANDAS-ON-SPARK pdf.replace("Iris-setosa", "setosa").head()
#SPARK sdf.union(sdf) # PANDAS-ON-SPARK pdf.append(pdf)
有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:
DataFrame.transform() DataFrame.apply() DataFrame.pandas_on_spark.transform_batch() DataFrame.pandas_on_spark.apply_batch() Series.pandas_on_spark.transform_batch()
每个 API 都有不同的用途,并且在内部工作方式不同。
DataFrame.transform()和DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。
# transform psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return pser + 1# 应该总是返回与输入相同的长度。 psdf.transform(pandas_plus) # apply psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]}) def pandas_plus(pser): return pser[pser % 2 == 1]# 允许任意长度 psdf.apply(pandas_plus)
在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。
在“列”轴的情况下,该函数将每一行作为一个熊猫系列。
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return sum(pser)# 允许任意长度 psdf.apply(pandas_plus, axis='columns')
上面的示例将每一行的总和计算为pands Series
batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf + 1# 应该总是返回与输入相同的长度。 psdf.pandas_on_spark.transform_batch(pandas_plus) psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf[pdf.a > 1]# 允许任意长度 psdf.pandas_on_spark.apply_batch(pandas_plus)
两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。
某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。
另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。
不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]}) psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
Reference 'a' is ambiguous, could be: a, a.;
但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。
from pyspark.sql import SparkSession builder = SparkSession.builder.appName("pandas-on-spark") builder = builder.config("spark.sql.caseSensitive", "true") builder.getOrCreate() import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]}) psdf
aA 013 124
pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。
如果计划在生产中处理大数据,请通过将默认索引配置为distributed或distributed-sequence来使其确保为分布式。
有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。
尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。
例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:
>>> import pandas as pd >>> max(pd.Series([1, 2, 3])) 3 >>> min(pd.Series([1, 2, 3])) 1 >>> sum(pd.Series([1, 2, 3])) 6
Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:
>>> import pyspark.pandas as ps >>> ps.Series([1, 2, 3]).max() 3 >>> ps.Series([1, 2, 3]).min() 1 >>> ps.Series([1, 2, 3]).sum() 6
pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:
import pandas as pd data = [] countries = ['London', 'New York', 'Helsinki'] pser = pd.Series([20., 21., 12.], index=countries) for temperature in pser: assert temperature > 0 if temperature > 1000: temperature = None data.append(temperature ** 2) pd.Series(data, index=countries)
London400.0 New York441.0 Helsinki144.0 dtype: float64
但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:
import pyspark.pandas as ps import numpy as np countries = ['London', 'New York', 'Helsinki'] psser = ps.Series([20., 21., 12.], index=countries) def square(temperature) -> np.float64: assert temperature > 0 if temperature > 1000: temperature = None return temperature ** 2 psser.apply(square)
London400.0 New York441.0 Helsinki144.0
Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。
到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。
[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html
[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html
[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!