Home >Backend Development >Python Tutorial >Understanding and applying Apache Spark tuning strategies

Understanding and applying Apache Spark tuning strategies

DDD
DDDOriginal
2024-11-12 17:55:02761browse

Motivators for reading this article.

  • Own experience lived in moments of chaos and moments of calm analysis.
  • What I looked for to delve deeper.
  • What I learned about how spark works for optimization.
  • What's 'plus' about Databricks for optimization.
  • Good practices that can avoid the need for tuning and refactoring.

Introduction

I have always had great contact with relational databases and later with distributed systems like Spark. Initially, I delved deeper into the DBMS, both to set up complex queries, administration and mainly how to put together a performative script for the DBMS. When I started working more with Spark and later with Databricks, initially I didn't have performance problems for the scenarios I had to build, but as the bigdata area really became bigdata I started to have performance problems in routines that increased by 30% every week and this made me look for how spark works 'under the hood', mainly because I already knew how a DBMS worked and this helped me understand some concepts that I will bring here.

Brief explanation of Apache Spark components

Let's keep it brief as I want this article to focus on performance analysis scenarios, techniques and best practices.

SparkCore:

This component is the basis of Spark, it is responsible for memory management, tasks, failure recovery, I/O management, in other words, it manipulates the RDD. Therefore, he is a guy who has a large part of the cluster.

Executors:

This component is the real worker of the spark ecosystem (cluster), it is the one who receives the writing or reading orders (tasks), which can be on disk, memory or both (I will explain later why this comes into play). performance scenarios).

workers:

Workers are literally what they are for those already familiar with distributed computing, they are the nodes of the cluster, so it is what 'hosts' the executors I mentioned above, each worker can contain one or more executors. It is responsible for managing the resources allocated to the executors, as if the executor were an assistant and the worker was a warehouse worker. What if he is the warehouseman he reports to?

Cluster Manager:

This is the manager, he manages resources (Memory and CPU) for the workers, he is the one who decides how many executors will be for each application and how much resource will be allocated, he manages the tasks sent by his 'boss' which I will explain further down, and as it is a higher position of responsibility, it also monitors the state of the cluster to recover from failures, redistributing tasks as necessary. (NOTE: there are several types of cluster managers: Yarn, mesos, kubernetes and the simplest which is standalone).

SparkContext:

Well, this is the boss or the gateway, I say gateway because any Spark application will go through it, it is what allows the application to interact with the cluster, that is, the workers and executors, it which allows and manages tasks between workers and in this way it manages the entire application in terms of configuration, number of executors and resources such as memory. Do you need to know how tasks are being carried out? talk to this boss here.

So, in an illustrative way:

Entendendo e aplicando estratégias de tunning Apache Spark

Now let's talk about performance, tuning, speed, speed and everything you hear from different positions.

When I worked with the relational banking side and there were performance problems, mainly in procedures or functions or a query in an application, I analyzed the following aspects:

  1. When is this script running and what is the server like at the moment?
  2. Is anyone competing for resources or table locks?
  3. Everything is smooth, no one blocking (blocking) the server resources are good, great...
  4. Now let me see the script, is its logic performative? In other words, whoever thought about reading/writing together or thought about it line by line (programming addiction), is consulting too many columns that they didn't need, monstrous queries with subquery, CTE, etc.? I modified all of these points (refactoring) and tested both the speed of the response and the use of server resources. Why am I explaining all this, when we are going to talk about Apache Spark? So.... this also applies to Spark and in a way that I would say is even more complex, but we'll get there.
  5. I think lastly, if the script was good I would analyze the 'path of stones', that is, the estimated execution plan and the actual execution plan. From this, I could understand what the DBMS was doing with its statistics (histogram) and which path it assumed to follow with its information and what was the reality, which path was followed. And then you could identify points such as: an additional filter in the query, a more performant JOIN and even the creation of an index or temporary tables.

Well, I think that's it, now what do these points have in common with Apache Spark?

  • Script not designed for distributed set manipulation (I said that Spark has a 'plus' of difficulty lol).
  • Time that a certain routine is running, if a simple Spark Job is running in the same cluster as another performing Job (or even not) that is consuming all the resources. (Look at a kind of famous DBMS lock here).
  • And finally, yes, Apache Spark has an execution plan, to be more precise, it has the following stages:
  1. Logical plan.
  2. Physical Plane.
  3. Execution strategy.
  4. Sometimes shows the estimated cost.

To summarize what each one is, despite the name, you can already get an idea:

Logical Plan:
Represents the original query as a series of logical operations. It is the abstract form of the query, without considering how it will actually be executed. Includes information about the operations that will be performed, such as filtering, selection, joining, aggregation and the wrong 'little things' too lol.

Physical Plane:
Details how Spark will actually execute the query. This includes the order of operations and which algorithms will be used (like DBMS algorithms). It may include details about how data will be partitioned and distributed among executors.

Execution Strategies:
The physical plane can show different execution strategies that Spark can use, such as "Broadcast Join" or "Shuffle Hash Join", depending on the operation and data size. I will also explain the main algorithms of the execution plan, calm down...

Estimated Cost:
Although not always displayed, some plans may include cost estimates for different parts of the plan, helping you understand which part of processing may be most costly.

Ways to view the Apache Spark execution plan

We have the 'root' form which would be textual, using the explain() command and it will have an output similar to the one below showing a simple filter and a dataframe:

== Physical Plan ==
*(2) Filter (Value > 1)
- *(2) Project [Name#0, Value#1]
- *(1) Scan ExistingRDD[Name#0, Value#1]

And objectively, we can analyze it via the interface, through the Spark UI, in Databricks we can access it, whether in cell executions, in the job or in the cluster. In Apache Spark it is directly the IP on the default port 4040.

Spark UI is divided into several useful sections:

  • Jobs: Shows a list of all jobs executed in the application. Each job corresponds to an action in your code.

  • Stages: Displays the stages that make up each job. Stages are subdivisions of work that can be performed in parallel.

  • Tasks: Details the individual tasks within each stage, including information about task execution time and status.

  • Storage: Provides information about memory and storage usage of RDDs (Resilient Distributed Datasets).

  • Environment: Shows runtime environment properties, including Spark configurations and system variables.

  • Executors: Displays information about the executors created for the application, including memory usage, disk usage and performance statistics.

Here I was hierarchical, okay? This is the order in which things work.

I want images to put on the screen!!

Entendendo e aplicando estratégias de tunning Apache Spark

Entendendo e aplicando estratégias de tunning Apache Spark

Entendendo e aplicando estratégias de tunning Apache Spark

Spark algorithms and how to know who the tuning offenders are:

Firstly, I will explain the main algorithms that are demonstrated both in the Spark UI interface and in the execution plan, be it the logical or the physical plan:

NOTE: Remembering that datasets are the same here as a Spark table ;)

1. Let's start with the most famous Scan:

  • FileScan: Reads data from input files. It can be optimized for different file formats such as parquet, ORC, CSV, JSON and what not.

2. Join (This one gives some B.O):

  • Broadcast Hash Join: Used when one of the datasets is small enough to be transmitted to all nodes in the cluster, avoiding Shuffle (I'll explain more about this thing, but in short it's a data shuffle operation to final join).

  • Shuffle Hash Join: Both datasets (tables if you prefer) are shuffled so that the corresponding keys are in the same partition. It is used when the datasets are large and cannot be transmitted to other nodes.

  • Sort Merge Join: Requires that both datasets be sorted before Joining. It is efficient for large datasets that are already partitioned and ordered, that is, the join is being done by partitioned and also ordered columns (e.g. df.write.partitionBy("coluna1").sortBy("coluna2").parquet("path /to/save/partitioned")

3. Aggregation (sum, count, group by etc...):

  • HashAggregate: uses a hash table to aggregate data. It is efficient for large data set that fits in memory.

  • SortAggregate. Aggregates data after sorting it. It is used when the data does not fit in memory.

4. Shuffle (Watch out for this guy):

  • Shuffle: Redistributes data between partitions for operations that require reorganization, such as joins and aggregations. It is an expensive operation in terms of I/O and network.

5. Exchange:

  • Changes the distribution of data between partitions. It can be used to balance workload across cluster nodes. (a strategy to balance and escape the shuffle)

Entendendo e aplicando estratégias de tunning Apache Spark

6. Project:

  • Selects a subset of columns from a DataFrame or RDD.

7. Filter:

  • Applies conditions to filter rows of data.

8. Sort:

  • Orders data based on one or more columns.

All of these algorithms above can be observed as I said previously through the explain() command.

Real-life Shuffle problem scenarios:

1. Join and GroupBy Operations
Operations such as join() and groupByKey() often trigger shuffle, which redistributes data between partitions. This can result in:
High disk I/O usage: Shuffle generates many intermediate files, which can saturate the executors' local disk.
High network load: The amount of data transferred between executors can be substantial, depending on the number of connections required (number of mappers multiplied by the number of reducers)

  • Identification: In Spark UI, on the Stage tab, check the Shuffle Read Size/Records and Shuffle Spill (Disk) values. High volume in these metrics indicates a potential problem.
  1. Partition Imbalance (Data Skew) When data is distributed unevenly across partitions, some tasks may take much longer than others, resulting in compromised overall performance. The identification is the same, go to Spark UI, go to the job referring to the section that is taking time (here comes a point of good practice that I will mention below) and check the stuck stage (it is running, but does not progress) and see the Shuffle metrics, in general, high volume in memory and starting to have volume on disk as you refresh it indicates that this imbalance reached the memory and started writing to the disk and obviously the disk is slower, then you sit and cry if you let it this scenario.

Mitigation

  • To mitigate shuffle-related issues: Reduce operations that cause shuffle: Whenever possible, minimize the Use of groupByKey() and prefer reduceByKey(). Adjust the number of Partitions: Use spark.sql.shuffle.partitions to adjust the number of Partitions during shuffle operations. Use techniques such as Broadcast Joins: To join large sets of Data with smaller sets, thus avoiding unnecessary shuffle.

Shuffle metrics in Spark UI:

Entendendo e aplicando estratégias de tunning Apache Spark

How shuffle works and why it is costly:

Entendendo e aplicando estratégias de tunning Apache Spark

Lastly and perhaps most importantly - Good practices:

  1. The vast majority work with notebooks due to the great popularity of Databricks, Jupyter notebook and Google Colab. Therefore, divide each query or transformation into separate cells, this makes it easier to identify which part is the performance problem. Leaving everything together, there are several jobs and it's difficult to know which stage is.

  2. Use Merge instead of Overwrite, I know it's more work, but it's more logical and performative, since Merge will use less I/O than a 'dump' overwrite of the entire table again in the datalake.

  3. Use cache() or persist() to store intermediate data in memory, especially if it will be reused across multiple operations. This can reduce recomputation time and improve performance.

  4. In case you don't know, Spark runs on a JVM so it is natively Java, when you create the famous UDF - User Definition Function with Python you leave a kind of "black box" for Spark, preventing automatic optimizations. Whenever possible, use built-in Spark SQL functions, optimized for performance.

Well, I think I wrote everything I had in mind, I like writing articles because it helps me remember some scenarios. I intend to record a video showing all this, in practice with some public data, I'll probably get it on Kaggle so follow me on LinkedIn to keep up with everything related to the world of data, artificial intelligence and software development

--> https://www.linkedin.com/in/airton-lira-junior-6b81a661

Follow me on LinkedIn, give me a boost, I like feedback and I'm also completely open to improving knowledge sharing.

If you've read this far, congratulations!!! I hope it overcomes all performance issues. In the next article, I will address the advantages of Databricks, so follow me on LinkedIn to find out. Thank you!!

The above is the detailed content of Understanding and applying Apache Spark tuning strategies. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn