Recommending a powerful Python big data analysis tool to everyone
How to improve the running speed of Pandas has been covered here many times before, and Dask is often mentioned in those articles. Friends who have never been exposed to it may not know much about it, so today I would like to recommend this powerful tool.
1. What is Dask?
Everyone is familiar with Pandas and NumPy. When the code runs, the data is loaded into RAM, and if the data set is particularly large, we watch memory usage soar. Sometimes the data to be processed simply does not fit in RAM, and this is where Dask comes in.
Dask is open source and free. It is developed in coordination with other community projects such as NumPy, Pandas, and Scikit-Learn.
Official website: https://dask.org/
Dask supports the Pandas DataFrame and NumPy Array data structures, and it can run on a local computer or scale out to run on a cluster.
Basically, you write the code once using normal Pythonic syntax, and then run it locally or deploy it to a multi-node cluster. That is an awesome feature in itself, but it is not the most awesome thing.
I think Dask's most awesome feature is that it is compatible with most of the tools we are already using: with only a small amount of code changes, you can use the processing power already available on your own laptop to run code in parallel. Processing data in parallel means less execution time, less waiting time, and more analysis time.
The following is the general flow of data processing in Dask.
2. What existing tools does Dask support?
This is also what I like about it: Dask is compatible with the Python data processing and modeling libraries and follows their APIs, so the learning cost for Python users is extremely low. By contrast, big data tools like Hadoop and Spark have a high learning threshold and time cost.
Currently, Dask supports Pandas, NumPy, Sklearn, XGBoost, XArray, RAPIDS, and more. I think these alone are enough; at least for common data processing and modeling analysis, they are fully covered.
3. Dask installation
You can install Dask with conda or pip, or install it from source.
conda install dask
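If you prefer pip, the Dask documentation lists an equivalent command (the [complete] extra also pulls in the optional dependencies):
python -m pip install "dask[complete]"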
Because dask has many dependencies, you can also use the following command for a quick installation, which installs only the minimum set of dependencies required to run Dask.
conda install dask-core
The other option is to install from source.
git clone https://github.com/dask/dask.git
cd dask
python -m pip install .
4. How to use Dask?
NumPy and Pandas
Dask introduces three parallel collections that can store data larger than RAM: DataFrame, Bag, and Array. Each of these collection types can work with data partitioned between RAM and the hard disk, as well as with data distributed across multiple nodes in a cluster. The usage of Dask is very intuitive: if you use NumPy arrays, start with a Dask Array; if you use Pandas DataFrames, start with a Dask DataFrame; and so on.
import dask.array as da
x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                      chunks=(1000, 1000))  # break into chunks of size 1000x1000
y = x + x.T - x.mean(axis=0)  # use normal syntax for high level algorithms

# DataFrames
import dask.dataframe as dd
df = dd.read_csv('2018-*-*.csv', parse_dates=['timestamp'],  # normal Pandas code
                 blocksize=64000000)  # break text into 64MB chunks
s = df.groupby('name').balance.mean()  # use normal syntax for high level algorithms

# Bags / lists
import json
import dask.bag as db
b = db.read_text('*.json').map(json.loads)
total = (b.filter(lambda d: d['name'] == 'Alice')
          .map(lambda d: d['balance'])
          .sum())
These high-level interfaces replicate the standard interfaces with only slight changes. For most of the APIs of the original projects, these interfaces automatically process larger-than-memory data sets in parallel for us. Implementing this is not very complicated; you can work through it step by step with the help of the Dask documentation.
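One practical detail worth spelling out (this follows from how Dask's lazy collections behave in general, not from anything specific to this article): the operations above only build a task graph, and nothing is actually read or calculated until you request a concrete result with .compute(). A minimal sketch:

import dask.array as da

x = da.random.uniform(low=0, high=10, size=(10000, 10000), chunks=(1000, 1000))
y = x + x.T - x.mean(axis=0)   # still lazy: only a task graph so far

result = y.mean().compute()    # now the chunks are generated and reduced in parallel
print(result)                  # a single scalar value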
Delayed
Now let's talk about Dask's delayed function, which is very powerful.
dask.delayed is a simple yet powerful way to parallelize existing code. It is called delayed because it does not compute the result immediately; instead, it records the computation as a task in a graph that will be run later on parallel hardware.
Sometimes the existing dask.array or dask.dataframe collections are not a good fit for the problem. In those cases, we can use the simpler dask.delayed interface to parallelize custom algorithms. Take the following example.
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)  # total == 50
The code above runs sequentially in a single thread. However, we can see that much of it could be executed in parallel. The dask.delayed function can wrap functions such as inc and double so that they run lazily: instead of executing a function immediately, it puts the function and its arguments into a task graph.
We make a small change to the code and wrap the calls with delayed.
import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
After this code runs, none of inc, double, add, or sum has actually been executed; instead, a task graph describing the computation has been handed to total. We can then use visualize to look at the task graph.
total.visualize()
The graph above clearly shows the opportunity for parallelism, so without hesitation we call compute to run the computation in parallel; only then is the calculation actually performed.
>>> total.compute()
50
Because the data set here is too small to make a meaningful timing comparison, this only introduces the usage; you can try it out for yourself on your own data.
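As a side note (this is standard dask.delayed usage rather than something shown in the snippet above), delayed can also be applied as a decorator, so the functions themselves become lazy and the calling code stays almost unchanged:

import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]
output = [add(inc(x), double(x)) for x in data]  # builds the task graph, no work done yet
total = dask.delayed(sum)(output)

print(total.compute())  # 50, the same result as before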
Machine learning with Sklearn
Parallelizing machine learning is a large topic, so Dongge will cover it in a separate article. Here is just a brief look at dask-learn.
The dask-learn project was developed in collaboration with the Sklearn developers. It currently provides parallelized versions of Scikit-learn's Pipeline, GridSearchCV, and RandomSearchCV, as well as variants of these, which handle nested parallel operations better.
So if you replace sklearn with dklearn, the speed will improve considerably.
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

Below is an example using a Pipeline, which applies PCA and logistic regression.

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000,
                           n_features=500,
                           n_classes=2,
                           n_redundant=250,
                           random_state=42)

from sklearn import linear_model, decomposition
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
                       ('logistic', logistic)])

grid = dict(pca__n_components=[50, 100, 150, 250],
            logistic__C=[1e-4, 1.0, 10, 1e4],
            logistic__penalty=['l1', 'l2'])

# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV

estimator = GridSearchCV(pipe, grid)
estimator.fit(X, y)
The result: sklearn takes about 40 seconds to run this computation, while the dask-learn replacement takes about 10 seconds.
In addition, if you add the following code, you can connect to a cluster; through the Client, a dashboard of the entire computation can be displayed, implemented with Bokeh.
from dask.distributed import Client
c = Client('scheduler-address:8786')
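If you do not have a cluster available, creating a Client with no arguments starts a local scheduler and workers on your own machine (this is the documented behavior of dask.distributed), and the dashboard is then served locally; a small sketch:

from dask.distributed import Client

client = Client()             # spins up a local cluster using this machine's cores
print(client.dashboard_link)  # URL of the Bokeh dashboard, typically http://127.0.0.1:8787/status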
5. Summary
That is a brief introduction to Dask. Dask is very powerful, and its documentation is very thorough, with both examples and explanations. Interested readers can learn more on the official website or on GitHub; next time Dongge will share some examples of using Dask for machine learning.