


Data scientists or data practitioners who use Python for data processing are no strangers to the data science package pandas, and there are also heavy users of pandas like Yun Duojun. Most of the first lines of code written at the beginning of the project are import pandas as pd. Pandas can be said to be yyds for data processing! And its shortcomings are also very obvious. Pandas can only be processed on a single machine, and it cannot scale linearly with the amount of data. For example, if pandas attempts to read a data set larger than a machine's available memory, it will fail due to insufficient memory.
In addition, pandas is very slow in processing large data. Although there are other libraries like Dask or Vaex to optimize and improve data processing speed, it is a piece of cake in front of Spark, the god framework of big data processing.
Fortunately, in the new Spark 3.2 version, a new Pandas API has appeared, which integrates most of the pandas functions into PySpark. Using the pandas interface, you can use Spark, because Spark has The Pandas API uses Spark in the background, so that it can achieve the effect of strong cooperation, which can be said to be very powerful and very convenient.
It all started at Spark AI Summit 2019. Koalas is an open source project that uses Pandas on top of Spark. In the beginning, it only covered a small part of the functionality of Pandas, but it gradually grew in size. Now, in the new Spark 3.2 version, Koalas has been merged into PySpark.
Spark now integrates the Pandas API, so you can run Pandas on Spark. Only one line of code needs to be changed:
import pyspark.pandas as ps
From this we can gain many advantages:
- If we are familiar with using Python and Pandas, but not familiar with Spark, we can omit the complexity Start using PySpark immediately for your learning process.
- You can use one code base for everything: small and big data, single and distributed machines.
- You can run Pandas code faster on the Spark distributed framework.
The last point is particularly noteworthy.
On the one hand, distributed computing can be applied to code in Pandas. And with the Spark engine, your code will be faster even on a single machine! The graph below shows the performance comparison between running Spark on a machine with 96 vCPUs and 384 GiBs of memory and calling pandas alone to analyze a 130GB CSV dataset.
Multithreading and Spark SQL Catalyst Optimizer both help optimize performance. For example, the Join count operation is 4x faster with full stage code generation: 5.9 seconds without code generation and 1.6 seconds with code generation.
Spark has particularly significant advantages in chaining operations. The Catalyst query optimizer recognizes filters to filter data intelligently and can apply disk-based joins, whereas Pandas prefers to load all data into memory at each step.
Now are you eager to try out how to write some code using the Pandas API on Spark? Let's get started now!
Switching between Pandas / Pandas-on-Spark / Spark
The first thing you need to know is what exactly we are using. When using Pandas, use the class pandas.core.frame.DataFrame. When using the pandas API in Spark, use pyspark.pandas.frame.DataFrame. Although the two are similar, they are not the same. The main difference is that the former is in a single machine, while the latter is distributed.
You can use Pandas-on-Spark to create a Dataframe and convert it to Pandas and vice versa:
# import Pandas-on-Spark import pyspark.pandas as ps # 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe pd_df = ps_df.to_pandas() # 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe ps_df = ps.from_pandas(pd_df)
Note that if you use multiple machines, you will not be able to use Pandas-on-Spark before converting it to Pandas. When converting a Spark Dataframe to a Pandas Dataframe, data is transferred from multiple machines to a single machine and vice versa (see PySpark Guide [1]).
It is also possible to convert a Pandas-on-Spark Dataframe to a Spark DataFrame and vice versa:
# 使用 Pandas-on-Spark 创建一个 DataFrame ps_df = ps.DataFrame(range(10)) # 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe spark_df = ps_df.to_spark() # 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe ps_df_new = spark_df.to_pandas_on_spark()
How does the data type change?
The data types are basically the same when using Pandas-on-Spark and Pandas. When converting a Pandas-on-Spark DataFrame to a Spark DataFrame, the data type is automatically converted to the appropriate type (see the PySpark Guide [2])
The following example shows how the data is converted when converting Type conversion from PySpark DataFrame to pandas-on-Spark DataFrame.
>>> sdf = spark.createDataFrame([ ... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)), ... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date') >>> sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0), float: float, double: double, integer: int, long: bigint, short: smallint, timestamp: timestamp, string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api() psdf.dtypes
tinyintint8 decimalobject float float32 doublefloat64 integer int32 longint64 short int16 timestampdatetime64[ns] string object booleanbool date object dtype: object
Pandas-on-Spark vs Spark Functions
DataFrame in Spark and its most commonly used functions in Pandas-on-Spark. Note that the only syntax difference between Pandas-on-Spark and Pandas is the import pyspark.pandas as ps line.
当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。
导入库
# 运行Spark from pyspark.sql import SparkSession spark = SparkSession.builder .appName("Spark") .getOrCreate() # 在Spark上运行Pandas import pyspark.pandas as ps
读取数据
以 old dog iris 数据集为例。
# SPARK sdf = spark.read.options(inferSchema='True', header='True').csv('iris.csv') # PANDAS-ON-SPARK pdf = ps.read_csv('iris.csv')
选择
# SPARK sdf.select("sepal_length","sepal_width").show() # PANDAS-ON-SPARK pdf[["sepal_length","sepal_width"]].head()
删除列
# SPARK sdf.drop('sepal_length').show()# PANDAS-ON-SPARK pdf.drop('sepal_length').head()
删除重复项
# SPARK sdf.dropDuplicates(["sepal_length","sepal_width"]).show() # PANDAS-ON-SPARK pdf[["sepal_length", "sepal_width"]].drop_duplicates()
筛选
# SPARK sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show() # PANDAS-ON-SPARK pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()
计数
# SPARK sdf.filter(sdf.flower_type == "Iris-virginica").count() # PANDAS-ON-SPARK pdf.loc[pdf.flower_type == "Iris-virginica"].count()
唯一值
# SPARK sdf.select("flower_type").distinct().show() # PANDAS-ON-SPARK pdf["flower_type"].unique()
排序
# SPARK sdf.sort("sepal_length", "sepal_width").show() # PANDAS-ON-SPARK pdf.sort_values(["sepal_length", "sepal_width"]).head()
分组
# SPARK sdf.groupBy("flower_type").count().show() # PANDAS-ON-SPARK pdf.groupby("flower_type").count()
替换
# SPARK sdf.replace("Iris-setosa", "setosa").show() # PANDAS-ON-SPARK pdf.replace("Iris-setosa", "setosa").head()
连接
#SPARK sdf.union(sdf) # PANDAS-ON-SPARK pdf.append(pdf)
transform 和 apply 函数应用
有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:
DataFrame.transform() DataFrame.apply() DataFrame.pandas_on_spark.transform_batch() DataFrame.pandas_on_spark.apply_batch() Series.pandas_on_spark.transform_batch()
每个 API 都有不同的用途,并且在内部工作方式不同。
transform 和 apply
DataFrame.transform()和DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。
# transform psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return pser + 1# 应该总是返回与输入相同的长度。 psdf.transform(pandas_plus) # apply psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]}) def pandas_plus(pser): return pser[pser % 2 == 1]# 允许任意长度 psdf.apply(pandas_plus)
在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。
在“列”轴的情况下,该函数将每一行作为一个熊猫系列。
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pser): return sum(pser)# 允许任意长度 psdf.apply(pandas_plus, axis='columns')
上面的示例将每一行的总和计算为pands Series
pandas_on_spark.transform_batch和pandas_on_spark.apply_batch
batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf + 1# 应该总是返回与输入相同的长度。 psdf.pandas_on_spark.transform_batch(pandas_plus) psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]}) def pandas_plus(pdf): return pdf[pdf.a > 1]# 允许任意长度 psdf.pandas_on_spark.apply_batch(pandas_plus)
两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。
在 Spark 上使用 pandas API的注意事项
避免shuffle
某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。
避免在单个分区上计算
另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。
不要使用重复的列名
不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]}) psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。
import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
Reference 'a' is ambiguous, could be: a, a.;
但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。
from pyspark.sql import SparkSession builder = SparkSession.builder.appName("pandas-on-spark") builder = builder.config("spark.sql.caseSensitive", "true") builder.getOrCreate() import pyspark.pandas as ps psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]}) psdf
aA 013 124
使用默认索引
pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。
如果计划在生产中处理大数据,请通过将默认索引配置为distributed或distributed-sequence来使其确保为分布式。
有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。
在 Spark 上使用 pandas API
尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。
例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:
>>> import pandas as pd >>> max(pd.Series([1, 2, 3])) 3 >>> min(pd.Series([1, 2, 3])) 1 >>> sum(pd.Series([1, 2, 3])) 6
Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:
>>> import pyspark.pandas as ps >>> ps.Series([1, 2, 3]).max() 3 >>> ps.Series([1, 2, 3]).min() 1 >>> ps.Series([1, 2, 3]).sum() 6
pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:
import pandas as pd data = [] countries = ['London', 'New York', 'Helsinki'] pser = pd.Series([20., 21., 12.], index=countries) for temperature in pser: assert temperature > 0 if temperature > 1000: temperature = None data.append(temperature ** 2) pd.Series(data, index=countries)
London400.0 New York441.0 Helsinki144.0 dtype: float64
但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:
import pyspark.pandas as ps import numpy as np countries = ['London', 'New York', 'Helsinki'] psser = ps.Series([20., 21., 12.], index=countries) def square(temperature) -> np.float64: assert temperature > 0 if temperature > 1000: temperature = None return temperature ** 2 psser.apply(square)
London400.0 New York441.0 Helsinki144.0
减少对不同 DataFrame 的操作
Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。
写在最后
到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。
参考资料
[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html
[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html
[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type
The above is the detailed content of Pandas and PySpark join forces to achieve both functionality and speed!. For more information, please follow other related articles on the PHP Chinese website!

在分布式系统的架构中,文件管理和存储是非常重要的一部分。然而,传统的文件系统在应对大规模的文件存储和管理时遇到了一些问题。为了解决这些问题,SeaweedFS分布式文件系统被开发出来。在本文中,我们将介绍如何使用PHP来实现开源SeaweedFS分布式文件系统。什么是SeaweedFS?SeaweedFS是一个开源的分布式文件系统,它用于解决大规模文件存储和

使用Python做数据处理的数据科学家或数据从业者,对数据科学包pandas并不陌生,也不乏像云朵君一样的pandas重度使用者,项目开始写的第一行代码,大多是importpandasaspd。pandas做数据处理可以说是yyds!而他的缺点也是非常明显,pandas只能单机处理,它不能随数据量线性伸缩。例如,如果pandas试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败。另外pandas在处理大型数据方面非常慢,虽然有像Dask或Vaex等其他库来优化提升数

随着互联网的快速发展,网站的访问量也在不断增长。为了满足这一需求,我们需要构建高可用性的系统。分布式数据中心就是这样一个系统,它将各个数据中心的负载分散到不同的服务器上,增加系统的稳定性和可扩展性。在PHP开发中,我们也可以通过一些技术实现分布式数据中心。分布式缓存分布式缓存是互联网分布式应用中最常用的技术之一。它将数据缓存在多个节点上,提高数据的访问速度和

什么是分布式计数器?在分布式系统中,多个节点之间需要对共同的状态进行更新和读取,而计数器是其中一种应用最广泛的状态之一。通俗地讲,计数器就是一个变量,每次被访问时其值就会加1或减1,用于跟踪某个系统进展的指标。而分布式计数器则指的是在分布式环境下对计数器进行操作和管理。为什么要使用Redis实现分布式计数器?随着分布式计算的普及,分布式系统中的许多细节问题也

一、Raft 概述Raft 算法是分布式系统开发首选的共识算法。比如现在流行 Etcd、Consul。如果掌握了这个算法,就可以较容易地处理绝大部分场景的容错和一致性需求。比如分布式配置系统、分布式 NoSQL 存储等等,轻松突破系统的单机限制。Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致。二、Raft 角色2.1 角色跟随者(Follower):普通群众,默默接收和来自领导者的消息,当领导者心跳信息超时的

Redis实现分布式配置管理的方法与应用实例随着业务的发展,配置管理对于一个系统而言变得越来越重要。一些通用的应用配置(如数据库连接信息,缓存配置等),以及一些需要动态控制的开关配置,都需要进行统一管理和更新。在传统架构中,通常是通过在每台服务器上通过单独的配置文件进行管理,但这种方式会导致配置文件的管理和同步变得十分复杂。因此,在分布式架构下,采用一个可靠

Redis实现分布式对象存储的方法与应用实例随着互联网的快速发展和数据量的快速增长,传统的单机存储已经无法满足业务的需求,因此分布式存储成为了当前业界的热门话题。Redis是一个高性能的键值对数据库,它不仅支持丰富的数据结构,而且支持分布式存储,因此具有极高的应用价值。本文将介绍Redis实现分布式对象存储的方法,并结合应用实例进行说明。一、Redis实现分

随着互联网技术的发展,对于一个网络应用而言,对数据库的操作非常频繁。特别是对于动态网站,甚至有可能出现每秒数百次的数据库请求,当数据库处理能力不能满足需求时,我们可以考虑使用数据库分布式。而分布式数据库的实现离不开与编程语言的集成。PHP作为一门非常流行的编程语言,具有较好的适用性和灵活性,这篇文章将着重介绍PHP与数据库分布式集成的实践。分布式的概念分布式


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

EditPlus Chinese cracked version
Small size, syntax highlighting, does not support code prompt function

MinGW - Minimalist GNU for Windows
This project is in the process of being migrated to osdn.net/projects/mingw, you can continue to follow us there. MinGW: A native Windows port of the GNU Compiler Collection (GCC), freely distributable import libraries and header files for building native Windows applications; includes extensions to the MSVC runtime to support C99 functionality. All MinGW software can run on 64-bit Windows platforms.

SublimeText3 Chinese version
Chinese version, very easy to use

PhpStorm Mac version
The latest (2018.2.1) professional PHP integrated development tool

SublimeText3 Linux new version
SublimeText3 Linux latest version