I assume that you are familiar with how Spark runs a job, the basics of distributed systems, your cluster's current utilisation, job SLAs, resource details, etc.
There are mainly two ways we can optimise our jobs:
- Application Code
- Cluster/Resources Configurations
1. Application Code
There are a lot of things we can do in Spark while writing a job, and I may not be aware of all of them, but based on my experience I am trying to cover some important standards everyone should follow for better resource utilisation and execution:
A. Caching : Suppose we are reading data from MySQL through the Spark JDBC connector and getting a dataset as a result. This dataset now needs to be written to three different places (Hive, a CSV file, Oracle), and each destination has different transformations to apply. If we do not cache this dataset, then due to lazy evaluation it will be computed three times, which is a waste of execution time and resources. So in scenarios like this, where a dataset is going to be used again and again, we must persist it. That said, try not to persist unnecessarily, and once the cached dataset has done its work, do not forget to unpersist it.
B. Number of Partitions : When a shuffle happens, Spark creates 200 partitions by default. From a development point of view we always need to keep an eye on the size of each partition, and we should adjust the number of partitions based on the data being processed.
Suppose we are writing a dataset:
Scenario 1: Suppose the dataset to be written is 1024 MB (1 GB) after the shuffle. With the default 200 partitions, we get 200 files of (1024 MB / 200) = 5.12 MB each, which is far less than the default block size, and we also end up creating many small files. In this case we need to decrease the number of partitions: with a 128 MB block size, the count should be (1024 MB / 128 MB) = 8 partitions approx.
Scenario 2: Suppose the dataset to be written is 1048576 MB (1 TB approx.) after the shuffle. With the default 200 partitions, we get 200 files of (1048576 MB / 200) = 5242.88 MB (5.12 GB approx.) each, which is far more than the default block size, so in this case we need to increase the number of partitions: with a 128 MB block size, the count should be (1048576 MB / 128 MB) = 8192 partitions approx.
Note: when increasing the number of partitions, we also need to remember that a Spark shuffle block cannot exceed 2 GB. So in this example, instead of creating 8192 files of 128 MB, we can increase the file size up to 2 GB (1 GB is advisable) and decrease the number of files, staying within the shuffle-block limit, for better performance. The calculation then looks like (1048576 MB / 1024 MB) = 1024 partitions approx.
To increase or decrease the number of partitions, we can use different properties or functions (repartition triggers a full shuffle and can increase or decrease the count; coalesce avoids a full shuffle but can only decrease it):
df.repartition(1024) or df.coalesce(8)
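The partition arithmetic from the two scenarios above can be sketched as a small helper. This is pure back-of-the-envelope maths, not a Spark API; the function name is my own.

```python
import math

def target_partitions(dataset_mb: float, target_file_mb: float) -> int:
    """Rule-of-thumb partition count: dataset size divided by desired file size."""
    return max(1, math.ceil(dataset_mb / target_file_mb))

# Scenario 1: 1 GB of shuffle output, 128 MB target files
print(target_partitions(1024, 128))        # 8

# Scenario 2: ~1 TB of shuffle output, 1 GB target files
# (1 GB stays comfortably within the 2 GB shuffle-block limit)
print(target_partitions(1048576, 1024))    # 1024
```

The result of such a calculation is what you would pass to `df.repartition(n)` or `spark.sql.shuffle.partitions`.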
C. Broadcasting : In a distributed environment, when there is a large gap between the sizes of the datasets being joined, we can load the smaller dataset into memory and broadcast it to every executor, so that no data shuffling takes place when the join happens.
D. UDF : The Spark optimiser does not understand the logic written inside a user-defined function. When it evaluates a UDF, it executes the logic as-is, as a black box, without considering the best optimised plan. We should avoid using UDFs unless really necessary.
There are a lot of other things that can make a job faster and better, like avoiding frequent actions such as count or show, using the right file format, eliminating redundant transformations, reducing I/O operations, using the right serialisation such as Kryo, etc.
2. Cluster/Resources Configurations
There are different properties we can use for fast performance and resource optimisation, but here we are discussing some of the properties and methods that I have used in my experience.
A. Off-heap memory : Shuffle data structures can be stored in off-heap memory, which is not managed by the JVM at all. This eliminates garbage-collection overhead on that memory, so the job can become faster.
The following properties enable this off-heap configuration:
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 1g (this can be modified on the basis of need)
B. Garbage Collector : The GC in the JVM automatically determines what memory is no longer used by the application and recycles that space for other uses. In a distributed environment, GC pauses can make a job spend more time collecting garbage than executing. To mitigate this, we can use configurations such as the following:
--conf spark.driver.extraJavaOptions="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20"
--conf spark.executor.extraJavaOptions="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20"
Before using GC configuration on driver and executors:
After using GC configuration on driver and executors:
To read more about the GC for Spark:
C. Number of Partitions : This is the same as point B in the Application Code optimisation. We can also set it from outside while submitting the Spark job:
--conf spark.sql.shuffle.partitions=10 (10 is just an example; this can be increased or decreased accordingly)
D. Network Timeout : In Spark, when the data load is too high, we often see a Spark timeout exception or an executor lost exception. Executors report their heartbeat to the driver, so if GC is taking a long time on an executor, increasing spark.network.timeout makes the driver wait longer for a response before marking the executor as lost and starting a new one. If a new executor is started, the tasks executed by the previous executor are recomputed, so time and resources are consumed again, making the job slow and expensive. Setting this property saves the job from starting a new executor by letting the driver wait for a response for the defined extra time.
--conf spark.network.timeout=240s (240 seconds is just an example and can be changed accordingly)
E. Compress Shuffle Spill : In Spark, when shuffling happens, data spills can take place, which can be compressed using the following property:
--conf spark.shuffle.spill.compress=true
F. Dynamic Allocation : In Spark we can use two types of resource allocation: static and dynamic. In almost all production use cases we should use dynamic allocation, because it acquires resources as needed and, unlike static allocation, does not block resources that are not being used.
You can enable dynamic allocation by setting the following configuration:
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true
With dynamic allocation we do not need to pass the number of executors, as Spark acquires them according to the needs of the job.
--driver-memory 2g --executor-memory 4g --executor-cores 4
Now, back on the performance-tuning side, the problem with dynamic allocation is that if the job runs on a priority-scheduling basis, it can take any number of executors, i.e. any percentage of the cluster, for this single job, because no upper or lower limit is set on it. As a result, it becomes difficult for other jobs to run on the same cluster.
Now the question arises: if this single job takes all the resources in the cluster, surely it will run faster and finish early? If we understand the concept of shuffling, we will not agree with this statement, because too little or too much parallelism can adversely affect a job and turn its overall impact negative. With fewer executors there is less shuffling, which is good, but less parallelism, which is not good. With more executors there is more shuffling, which is not good, but more parallelism, which is good. So we always need to find the balance between the two.
For an example:
Scenario 1: Suppose we have 100 GB of data in the shuffling process, and the configuration is executor memory = 50 GB with number of executors = 2.
Scenario 2: Suppose we have 100 GB of data in the shuffling process, and the configuration is executor memory = 10 GB with number of executors = 10.
So if we pass this as static allocation, in some cases scenario 1 will be better and in some cases scenario 2 will be better.
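As a toy model of this trade-off: more executors buy parallel task slots at the cost of more executor-to-executor shuffle fetch channels. The "channels" count here is a simplification I am assuming for illustration, not a real Spark metric, and the 4 cores per executor is likewise an assumed figure.

```python
def tradeoff(executors: int, cores_per_executor: int) -> dict:
    """Toy comparison: parallel task slots vs pairwise shuffle fetch channels."""
    return {
        "parallel_tasks": executors * cores_per_executor,
        "shuffle_channels": executors * (executors - 1),  # each executor fetches from every other
    }

# Scenario 1: 2 big executors
print(tradeoff(executors=2, cores_per_executor=4))

# Scenario 2: 10 small executors
print(tradeoff(executors=10, cores_per_executor=4))
```

Scenario 1 gives 8 task slots but only 2 fetch channels; scenario 2 gives 40 slots but 90 channels, which is why neither extreme is always better.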
To overcome this, we can pass the following properties to set upper and lower limits on the number of executors, so that resources adjust dynamically according to the needs of the job:
--conf spark.dynamicAllocation.minExecutors=2 --conf spark.dynamicAllocation.maxExecutors=20
After setting these properties, the job will start with, for example, a minimum of 2 executors and can scale up to a maximum of 20 executors.
There are a lot of things to consider while optimising a Spark job. If we are clear about them, then by using best practices and knowledge we can tune any job.
Performance tuning of any kind of job comes with exploration and experience in the domain, so keep exploring new things.
Happy Learning :)