Introduction:
Spark optimization is one of the most important concepts in Spark development. It is the developer's responsibility to design a Spark application in an optimized manner so that we can take full advantage of the Spark execution engine. When an application is not optimized, even simple code takes longer to execute, resulting in performance lags and downtime, and it also affects other applications running on the same cluster. Spark is a crucial component in the operations of today's enterprises, which is why it is important to fine-tune Spark applications to achieve optimal performance while keeping resource consumption and related costs manageable.
There are two ways to optimize a Spark application:
Resource/Cluster level Optimization
Code Level / Application Level Optimization.
Let's discuss resource level optimization first. Even if a developer writes optimized code, the application will still run slowly if it does not have enough resources, so optimized code alone is of little use without a properly sized cluster.
Before going deep into Spark resource level optimization, let's first try to understand the Spark architecture; that will make the resource level optimization process easier to follow.
Spark Architecture:
Apache Spark follows a master/slave architecture with two main daemons and a cluster manager:
Master Daemon (Master/Driver Process)
Worker Daemon (Slave Process)
Cluster Manager
A Spark cluster has a master node and one or more slave/worker nodes. The driver and the executors run as separate Java processes, and users can run them on the same machine or on separate machines in the cluster.
Master Node / Driver Process: The driver runs the main() program in which the SparkContext is created. It then interacts with the cluster manager to schedule the job execution and have the tasks performed.
Slave/Worker Nodes: A worker hosts the processes that run in parallel to perform the tasks scheduled by the driver program. These processes are called executors (containers).
The role of worker nodes/executors:
1. Perform the data processing for the application code
2. Read data from and write data to external sources
3. Store the computation results in memory or on disk.
An executor/container is a combination of memory (RAM) and CPU cores. Executors are launched once at the beginning of a Spark application and run for the entire lifetime of the application. The cluster's total cores and total memory are divided among the executors, and one node can host one or more executors. The individual tasks of a given Spark job run inside these executors.
The number of cores decides the number of parallel processes in each executor. So, for example, if one executor has 5 cores, then at most 5 tasks can execute in parallel in that executor.
Tasks: A task is the smallest unit of execution in Spark. A task executes a series of instructions; for example, reading data, filtering it, and applying map() can be combined into a single task. Tasks run inside executors. Based on the HDFS block size (128 MB), Spark creates partitions and assigns one task per partition. So if we have 1 GB (1024 MB) of data, Spark will create 1024 / 128 = 8 partitions and hence 8 tasks.
Stage: A group of tasks forms a stage. The number of stages depends on data shuffling (narrow vs. wide transformations): whenever Spark encounters a function that requires a shuffle, it creates a new stage. Wide transformations like reduceByKey() and join() trigger a shuffle and therefore result in a new stage.
Job: A group of stages forms a job.
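To make these terms concrete, here is a minimal sketch of a small Spark program in Scala (the input path and app name are hypothetical) and how it breaks down into a job, stages, and tasks:

import org.apache.spark.sql.SparkSession

object StageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stage-task-demo").getOrCreate()
    val sc = spark.sparkContext

    // Reading a ~1 GB file stored as 128 MB HDFS blocks gives 1024 / 128 = 8
    // partitions, so the first stage runs 8 tasks (one per partition).
    val lines = sc.textFile("hdfs:///data/events.log") // hypothetical path
    println(s"partitions = ${lines.getNumPartitions}")

    // Narrow transformations (filter, map) stay in the same stage as the read.
    val pairs = lines.filter(_.nonEmpty).map(line => (line.split(",")(0), 1))

    // reduceByKey() needs a shuffle, so Spark cuts a new stage here.
    val counts = pairs.reduceByKey(_ + _)

    // The action triggers the job: one job made of two stages in this example.
    println(s"distinct keys = ${counts.count()}")

    spark.stop()
  }
}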
How to Decide the Cluster Configuration:
Let's assume we have a 10-node cluster:
16 CPU cores per node
64 GB RAM per node
Now we can decide the number of executors based on the number of CPU cores. Each executor can hold a minimum of 1 core, and at most the number of CPU cores available on the node.
If we take the minimum value, i.e. one core per executor, then each node will have 16 executors with 1 core each, and each executor will get 64 GB / 16 = 4 GB of RAM.
As we discussed earlier, the number of cores decides the number of parallel tasks, so in this case an executor cannot execute more than one task at a time.
Also, if we create any broadcast variable or accumulator, a copy of it is kept on every executor, so we end up with 16 copies per node.
So it is not a good idea to assign one core per executor. This configuration is called the tiny executor approach.
Let's discuss the second approach, where we create a single executor per node and assign all 16 cores to it. In that case each node will have one executor with 16 cores and 64 GB of RAM.
With 16 cores, such an executor can run 16 tasks in parallel.
However, it has been observed that if an executor runs more than 5 parallel tasks, i.e. has more than 5 cores, HDFS throughput suffers.
And if an executor holds a very large amount of memory (64 GB here), garbage collection takes a lot of time.
So this is also not a good cluster configuration. This approach is called the fat executor.
So neither the tiny executor nor the fat executor gives a good cluster configuration; the snippet below shows roughly what these two anti-patterns look like as Spark settings.
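As a rough sketch for the 10-node / 16-core / 64 GB example (spark.executor.cores, spark.executor.memory and spark.executor.instances are standard Spark properties; the instance counts are only illustrative):

import org.apache.spark.SparkConf

object ExecutorAntiPatterns {
  // Tiny executors: 1 core each -> 16 executors per node, 64 GB / 16 = 4 GB each.
  // Each executor runs only one task at a time, and broadcast variables and
  // accumulators are replicated across all 16 executors on every node.
  val tinyConf = new SparkConf()
    .set("spark.executor.cores", "1")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.instances", "160") // 16 per node x 10 nodes

  // Fat executors: one executor takes all 16 cores and ~64 GB of a node.
  // HDFS throughput degrades beyond ~5 cores per executor, and a 64 GB heap
  // leads to long garbage-collection pauses.
  val fatConf = new SparkConf()
    .set("spark.executor.cores", "16")
    .set("spark.executor.memory", "64g")
    .set("spark.executor.instances", "10") // 1 per node x 10 nodes
}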
So we should always take a balanced approach, so that we can use our cluster effectively.
Let's try to configure an optimized cluster.
Resources:
1. Total Number Of Nodes : 10
2. Cores Per Node : 16
3. RAM Per Node : 64 GB
From the above resources, we set aside 1 core and 1 GB of RAM on each node for the OS and other background activities.
So now we are left with:
1. Total Number Of Nodes : 10
2. Cores Per Node : 15
3. RAM Per Node : 63 GB
As discussed above, 5 cores per executor is the preferred choice for good HDFS throughput.
Each machine now has 15 cores and 63 GB of RAM, so we can have 15 / 5 = 3 executors per machine, with 63 / 3 = 21 GB of memory per executor.
1 node → 3 executors → 5 cores per executor → 21 GB RAM per executor.
Out of these 21 GB of RAM, some will go towards overhead memory (off-heap).
Overhead memory = max(384 MB, 7% of executor memory)
i.e. max(384 MB, 7% of 21 GB) ≈ 1.5 GB
So the remaining heap memory per executor will be 21 - 1.5 ≈ 19.5 GB, roughly 19 GB.
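The same sizing arithmetic can be written out as a small calculation (plain Scala, no Spark API involved; the 5-cores-per-executor limit and the 7% overhead factor are the rules of thumb used above):

object ExecutorSizing {
  def main(args: Array[String]): Unit = {
    val coresPerNode = 16 - 1 // 1 core reserved for the OS
    val memPerNodeGb = 64 - 1 // 1 GB reserved for the OS
    val coresPerExec = 5      // rule of thumb for good HDFS throughput

    val execsPerNode = coresPerNode / coresPerExec // 15 / 5 = 3
    val memPerExecGb = memPerNodeGb / execsPerNode // 63 / 3 = 21

    // Overhead (off-heap) memory: max(384 MB, 7% of executor memory)
    val overheadGb    = math.max(0.384, 0.07 * memPerExecGb) // ~1.47 GB
    val heapPerExecGb = memPerExecGb - overheadGb            // ~19.5 GB

    println(f"executors/node = $execsPerNode, heap/executor ~ $heapPerExecGb%.1f GB")
  }
}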
So the calculation will be:
10-node cluster
Each node has 3 executors: 10 * 3 = 30 executors
Each of these 30 executors holds 5 CPU cores and 19 GB of memory.
Out of these 30 executors, 1 executor is given to the YARN Application Master.
So our final cluster configuration will look like below:
Number Of Nodes : 10
Total Number Of Executors : 29
Cores Per Executor : 5
RAM Per Executor : 19 GB
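As a sketch, this configuration could be passed to an application roughly as follows (the property names are standard Spark settings; the app name and the exact overhead value are illustrative). The same values are commonly supplied at submit time instead, e.g. spark-submit --num-executors 29 --executor-cores 5 --executor-memory 19g.

import org.apache.spark.sql.SparkSession

object BalancedCluster {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("balanced-executors")
      .config("spark.executor.instances", "29")         // 30 executors - 1 for the YARN Application Master
      .config("spark.executor.cores", "5")              // up to 5 parallel tasks per executor
      .config("spark.executor.memory", "19g")           // heap after subtracting the overhead
      .config("spark.executor.memoryOverhead", "1500m") // off-heap overhead, ~7% of 21 GB
      .getOrCreate()

    // ... application code ...

    spark.stop()
  }
}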
Conclusion: This was all about cluster level configuration for Spark jobs. We discussed the tiny and fat executor approaches and the drawbacks of each, and finally arrived at a balanced configuration. We will continue with application/code level optimization in the next article.
Happy Learning!!