Note: This article compiles multiple performance tuning methodologies for Apache Spark. Text and images in this article are drawn from various articles and books, the details of which are listed under “References”.
Viewing and Setting Apache Spark Configurations
There are four ways of doing it:
Using the configuration files in the $SPARK_HOME directory. Configuration changes in the conf/spark-defaults.conf file apply to the Spark cluster and all Spark applications submitted to the cluster.
Specify Spark configurations directly in your Spark application or on the command line when submitting the application with spark-submit, using the --conf flag.
Ex: spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/main-scala-chapter7_2.12-1.0.jar
The third option is through a programmatic interface via the Spark shell. Through the SparkSession object, you can access most Spark config settings.
Note: First check if the property is modifiable. spark.conf.isModifiable("<config_name>") will return true or false.
View only the Spark SQL–specific Spark configs:
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
|key||value|
|spark.sql.adaptive.enabled||false|
|spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin||0.2|
|spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled||true|
|spark.sql.adaptive.shuffle.localShuffleReader.enabled||true|
|spark.sql.adaptive.shuffle.maxNumPostShufflePartitions||<undefined>|
only showing top 5 rows
# In Python
>>> spark.conf.set("spark.sql.shuffle.partitions", 5)
Order of precedence (highest first): SparkSession > spark-submit > spark-defaults.conf
The fourth option is to view Spark’s current configuration through the Spark UI’s Environment tab.
Static vs Dynamic allocation
- Spark driver can request more or fewer compute resources as the demand of large workloads flows and ebbs
- Enabling dynamic resource allocation allows Spark to achieve better utilisation of resources, freeing executors when not in use and acquiring new ones when needed.
- To enable and configure dynamic allocation, you can use settings like the following. Note that the numbers here are arbitrary; the appropriate settings will depend on the nature of your workload and they should be adjusted accordingly.
By default spark.dynamicAllocation.enabled is set to false. When enabled with the settings shown here, the Spark driver will request that the cluster manager create two executors to start with, as a minimum (spark.dynamicAllocation.minExecutors). As the task queue backlog increases, new executors will be requested each time the backlog timeout (spark.dynamicAllocation.schedulerBacklogTimeout) is exceeded. In this case, whenever there are pending tasks that have not been scheduled for over 1 minute, the driver will request that a new executor be launched to schedule backlogged tasks, up to a maximum of 20 (spark.dynamicAllocation.maxExecutors). By contrast, if an executor finishes a task and is idle for 2 minutes (spark.dynamicAllocation.executorIdleTimeout), the Spark driver will terminate it.
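The values described above can be collected in one place and rendered as spark-submit flags. This is a minimal sketch: the property names and numbers are the ones mentioned in the text, while the helper `to_submit_flags` is a hypothetical name used only for illustration.

```python
# Dynamic allocation settings as described in the text; the numbers are arbitrary.
dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.schedulerBacklogTimeout": "1m",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.dynamicAllocation.executorIdleTimeout": "2min",
}


def to_submit_flags(settings):
    """Render a dict of Spark properties as --conf arguments (hypothetical helper)."""
    flags = []
    for key, value in settings.items():
        flags.extend(["--conf", f"{key}={value}"])
    return flags


# Print the command-line prefix that would carry these settings.
print(" ".join(["spark-submit"] + to_submit_flags(dynamic_allocation)))
```

The same dictionary could also be written to conf/spark-defaults.conf, one property per line, if the settings should apply to every application on the cluster.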
Spark Executor memory
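The amount of memory available to each executor is controlled by spark.executor.memory. This is divided into three sections: execution memory, storage memory, and reserved memory. After 300 MB is set aside as reserved memory to safeguard against OOM errors, spark.memory.fraction (0.6 by default) of the remaining heap forms a unified region shared by execution and storage; the other 40% is left for user data structures and Spark’s internal metadata. Within the unified region, spark.memory.storageFraction (0.5 by default) sets the share of storage memory that is protected from eviction by execution.
Execution memory is used for Spark shuffles, joins, sorts, and aggregations. Storage memory is used mainly for cached data.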
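To make the arithmetic concrete, here is a small sketch of how an executor heap could be broken down. It assumes Spark’s documented unified memory model with the defaults spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5; the function name and the 2 GB heap are illustrative choices, not values from the article.

```python
RESERVED_MB = 300  # reserved memory, as described above


def executor_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into approximate memory regions.

    Assumes the unified memory model: memory_fraction applies to the heap
    minus the reserved memory, and storage_fraction applies to that region.
    """
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction
    storage = unified * storage_fraction
    return {
        "reserved_mb": RESERVED_MB,
        "unified_mb": unified,                  # shared by execution and storage
        "storage_protected_mb": storage,        # storage share immune to eviction
        "execution_initial_mb": unified - storage,
        "user_mb": usable - unified,            # user data structures, metadata
    }


# Example with spark.executor.memory=2g (2048 MB).
for name, size in executor_memory_regions(2048).items():
    print(f"{name}: {size:.1f}")
```

Execution and storage can borrow from each other inside the unified region, so the two halves are starting points rather than hard limits.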
Much of Spark’s efficiency is due to its ability to run multiple tasks in parallel at scale.
To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor.
The size of a partition in Spark is dictated by spark.sql.files.maxPartitionBytes. The default is 128 MB. You can decrease the size, but that may result in what’s known as the “small file problem”: many small partition files, introducing an inordinate amount of disk I/O and performance degradation due to filesystem operations such as opening, closing, and listing directories, which can be slow on a distributed filesystem.
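As a rough illustration of how the partition size setting translates into a partition count, the sketch below divides the input size by the maximum partition size. It is a simplification for a single splittable input: Spark also takes spark.sql.files.openCostInBytes and the available parallelism into account, so real counts can differ. The function name is hypothetical.

```python
import math

# Default of spark.sql.files.maxPartitionBytes (128 MB).
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_input_partitions(total_bytes, max_partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Rough estimate of read partitions for one splittable input (simplified)."""
    return max(1, math.ceil(total_bytes / max_partition_bytes))


one_gib = 1024 ** 3
print(estimate_input_partitions(one_gib))                    # default partition size
print(estimate_input_partitions(one_gib, 32 * 1024 * 1024))  # smaller partitions
```

Comparing the estimate with the number of cores per executor gives a quick check on whether the input is likely to keep all cores busy.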
Spark shuffle is an expensive operation since it involves the following:
- Disk I/O
- Data serialization and deserialization
- Network I/O
We cannot completely avoid shuffle operations but when possible try to reduce the number of shuffle operations by removing any unused operations.
Number of shuffle partitions
Shuffle partitions are created during the shuffle stage. By default, the number of shuffle partitions is set to 200 in spark.sql.shuffle.partitions. You can adjust this number depending on the size of the data set you have, to reduce the number of small partitions being sent across the network to executors’ tasks.
Created during operations like groupBy() or join(), also known as wide transformations, shuffle partitions consume both network and disk I/O resources. During these operations, the shuffle will spill results to executors’ local disks at the location specified in spark.local.dir. Having performant SSD disks for this operation will boost the performance.
Shuffle Service internals
During map and shuffle operations, Spark writes to and reads from the local disk’s shuffle files, so there is heavy I/O activity. This can result in a bottleneck, because the default configurations are suboptimal for large-scale Spark jobs. Knowing what configurations to tweak can mitigate this risk during this phase of a Spark job.
|Configuration||Default Value, recommendation and description|
|spark.driver.memory||Default is 1g (1 GB). This is the amount of memory allocated to the Spark driver to receive data from executors. This is often changed during spark-submit with --driver-memory. Only change this if you expect the driver to receive large amounts of data back from operations like collect(), or if you run out of driver memory.|
|spark.shuffle.file.buffer||Default is 32 KB. Recommended is 1 MB. This allows Spark to do more buffering before writing final map results to disk.|
|spark.file.transferTo||Default is true. Setting it to false will force Spark to use the file buffer to transfer files before finally writing to disk; this will decrease the I/O activity.|
|spark.shuffle.unsafe.file.output.buffer||Default is 32 KB. This controls the amount of buffering possible when merging files during shuffle operations. In general, large values (e.g., 1 MB) are more appropriate for larger workloads, whereas the default can work for smaller workloads.|
|spark.io.compression.lz4.blockSize||Default is 32 KB. Increase to 512 KB. You can decrease the size of the shuffle file by increasing the compressed size of the block.|
|spark.shuffle.service.index.cache.size||Default is 100m. Cache entries are limited to the specified memory footprint in bytes.|
|spark.shuffle.registration.timeout||Default is 5000 ms. Increase to 120000 ms.|
|spark.shuffle.registration.maxAttempts||Default is 3. Increase to 5 if needed.|
- Bucketing is another data organisation technique that groups data with the same bucket value.
- Bucketing boosts performance by shuffling and sorting the data ahead of time, so that sort-merge joins can skip those steps.
- The information about bucketing is stored in the metastore
- Bucketing vs partitioning:
- Bucketing distributes data across a fixed number of buckets by a hash on the bucket value, whereas partitioning creates a directory for each partition.
- Bucketing works well when the number of unique values is large, whereas partitioning should only be used with columns that have a limited number of values.
- Bucketing might be used with or without partitioning.
- Columns that are commonly used as keys in aggregations and joins are suitable candidates for bucketing. In general, joins, groupBy, and distinct transformations benefit from bucketing.
# bucketBy is a DataFrameWriter method and requires saveAsTable; the table name is illustrative
df.write.bucketBy(32, 'key').sortBy('value').saveAsTable('bucketed_table')
- It is important to have the same number of buckets in both tables of the join.
- Joins are one of the fundamental operations when developing a Spark job, so it is worth knowing about the optimizations before working with joins.
- Spark uses two types of cluster communication strategy:
- node-node communication strategy → Spark shuffles the data across the cluster
- per-node communication strategy → Spark performs broadcast joins
Shuffle Hash join
- Works on the MapReduce concept.
- Map through the data frames and use the values of the join column as the output key. Shuffle the data frames based on the output keys and join them in the reduce phase, since rows from the different data frames with the same key end up on the same machine.
- Spark chooses Shuffle Hash join when Sort Merge join is turned off or if the key is not suitable.
- Creating hash tables is costly, and it can only be done when the average size of a single partition is small enough to build a hash table.
- Note: Sort Merge join is a very good candidate most of the time, as it can spill data to disk and doesn’t need to hold the data in memory like its counterpart Shuffle Hash join. However, when the build side is smaller than the stream side, Shuffle Hash join will outperform Sort Merge join.
- To accomplish ideal performance:
- The DataFrame should be distributed evenly on the joining columns.
- To leverage parallelism the DataFrame should have an adequate number of unique keys.
- Tip: If some rows are irrelevant to the join, filter them out before the join. Otherwise, more data will be shuffled over the network.
Broadcast Hash joins
- Per-node communication strategy.
- Yields the best performance in Spark when it is applicable.
- It is relevant only for small datasets.
- The smaller table is broadcast to all worker nodes. Thus, when working with one large table and one smaller table, always make sure to broadcast the smaller table.
- Spark has increased the maximum size for the broadcast table from 2GB to 8GB. Thus, it is not possible to broadcast tables which are greater than 8GB.
- Spark also internally maintains a threshold of the table size to automatically apply broadcast joins. The threshold can be configured using spark.sql.autoBroadcastJoinThreshold, which is 10MB by default.
- We can hint Spark to broadcast a table as below:
import org.apache.spark.sql.functions.broadcast
val dataframe = largedataframe.join(broadcast(smalldataframe), "key")
Shuffle Sort merge join
- Default join algorithm in spark
- However, this can be turned off by using the internal parameter spark.sql.join.preferSortMergeJoin, which is true by default.
- Composed of 2 steps:
- The first step is to sort the datasets.
- The second step is to merge the sorted data in the partition by iterating over the elements and joining the rows that have the same value for the join key.
- To accomplish ideal performance in Sort Merge Join:
- Make sure the partitions are co-located. Otherwise, shuffle operations are needed to co-locate the data, because the join requires all rows with the same join key value to be stored in the same partition.
- The DataFrame should be distributed uniformly on the joining columns.
- To leverage parallelism the DataFrame should have an adequate number of unique keys
Tackle Skew Data
When one partition has more data to process than the others, this uneven distribution of data results in a performance bottleneck.
If the data is highly skewed, it might even cause a spill of the data from memory to disk.
- To observe the distribution of data among partitions, the glom function can be used.
- Additionally, uneven distribution of data can be detected from the task execution times and the data volume processed per task, shown on the Executors page of the Spark UI.
partition_number = df.rdd.getNumPartitions()
data_distribution = df.rdd.glom().map(len).collect()
Spark 3.0 comes with a useful feature, Adaptive Query Execution, which automatically balances out the skew across partitions.
Two separate workarounds can also be used to tackle skew in the data distribution among the partitions: salting and repartition.
Salting means adding randomization to the data to help it be distributed more uniformly.
An extra processing cost is paid in return for evenly distributed data across the partitions, and therefore for performance gains.
The salting technique is applied only to the skewed key.
Random values are added to the key, producing <key1+random_salting_value>. In a join operation, these new key values are matched against replicated copies of the corresponding key values in the other table.
For example, consider a join where the key column is the city and its distribution is highly skewed in the tables. To distribute the data evenly, we append random values from 1 to 5 to the end of the key values in the bigger table of the join, and compose a new column in the smaller table by exploding an array from 1 to 5.
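The mechanics of the city example can be sketched in plain Python, without Spark, to show why the salted join returns the same rows as the ordinary join. The city names, codes, and function names are made-up illustration data, and for brevity every key is salted here rather than only the skewed one.

```python
import random
from collections import Counter

SALT_RANGE = range(1, 6)  # salt values 1..5, as in the example above


def salt_large_side(rows, rng):
    """Append a random salt to the key of every (key, value) row of the bigger table."""
    return [(f"{key}_{rng.choice(SALT_RANGE)}", value) for key, value in rows]


def explode_small_side(rows):
    """Replicate every (key, value) row of the smaller table once per salt value."""
    return [(f"{key}_{salt}", value) for key, value in rows for salt in SALT_RANGE]


def hash_join(left, right):
    """Plain inner join on the first element of each row."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, lv, rv) for key, lv in left for rv in index.get(key, [])]


rng = random.Random(0)
# Made-up data: one heavily skewed key and two small ones.
orders = [("Istanbul", i) for i in range(1000)] + [("Ankara", 1000), ("Izmir", 1001)]
cities = [("Istanbul", "TR-34"), ("Ankara", "TR-06"), ("Izmir", "TR-35")]

salted = hash_join(salt_large_side(orders, rng), explode_small_side(cities))
unsalted = hash_join(orders, cities)

# Strip the salt again to compare with the ordinary join.
restored = [(key.rsplit("_", 1)[0], lv, rv) for key, lv, rv in salted]
print(sorted(restored) == sorted(unsalted))
# The skewed key is now spread over several salted keys.
print(Counter(key for key, _, _ in salted).most_common(5))
```

In Spark the same idea is usually expressed with a random salt column on the large DataFrame and an exploded array column on the small one, followed by a join on the combined key.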
Repartition does a full shuffle, creates new partitions, and increases the level of parallelism in the application.
More partitions help to deal with data skew, at the extra cost of a full shuffle of the data as mentioned above. However, adding one shuffle to the query plan might eliminate two other shuffles and speed up the job.
Apart from data skew, I highly recommend taking a look at this post, which gives examples about the usage of repartition efficiently with use cases and explains the details under the hood.
AQE (Adaptive Query Execution)
Adaptive Query Execution is an enhancement enabling Spark 3 to alter physical execution plans at runtime, which allows improvements on the physical implementation based on statistics collected after shuffle exchange operations.
Level of parallelism and join implementation strategy have been shown to be key factors for query performance on large clusters. According to this premise, new optimisation rules have been built on top of the AQE feature:
- dynamically coalescing shuffle partitions by eventually merging small adjacent ones.
- dynamically replacing a Sort-Merge join with a Broadcast-Hash join when size conditions are met on any side.
- dynamically optimising skew joins by eventually scheduling multiple reduce tasks to work on a single huge partition.
Set the following 3 parameters:
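One plausible set of three parameters, assuming they map one-to-one onto the three optimisations listed above, is the Spark 3 properties shown below. The helper `apply_settings` is a hypothetical convenience that only relies on the `spark.conf.set` call used earlier in this article.

```python
# Assumed mapping: AQE itself, partition coalescing, and skew join handling.
aqe_settings = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def apply_settings(spark, settings):
    """Set each property on an existing SparkSession (hypothetical helper)."""
    for key, value in settings.items():
        spark.conf.set(key, value)
```

With an active SparkSession named `spark`, calling `apply_settings(spark, aqe_settings)` would enable all three at runtime.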
Iterative broadcast join
Split one of the tables into multiple small parts and broadcast each part to all executors in turn to join it with the skewed table. Combine the results at the end.
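The logic can be illustrated in plain Python, leaving Spark out of the picture: the non-skewed table is cut into chunks, each chunk plays the role of a broadcast table, and the partial results are combined. The function names and sample rows are hypothetical; in Spark each pass would be a broadcast join and the combination a union.

```python
def split_into_chunks(rows, num_chunks):
    """Cut a list of rows into num_chunks roughly equal parts."""
    return [rows[i::num_chunks] for i in range(num_chunks)]


def iterative_broadcast_join(skewed_rows, other_rows, num_chunks):
    """Join skewed_rows with other_rows one small chunk at a time."""
    results = []
    for chunk in split_into_chunks(other_rows, num_chunks):
        # Build a lookup for this chunk only (stands in for one broadcast).
        lookup = {}
        for key, value in chunk:
            lookup.setdefault(key, []).append(value)
        # One pass of the skewed table against the current chunk.
        results.extend(
            (key, lv, rv) for key, lv in skewed_rows for rv in lookup.get(key, [])
        )
    return results


skewed = [("a", i) for i in range(6)] + [("b", 100)]
other = [("a", "x"), ("b", "y"), ("c", "z")]
print(sorted(iterative_broadcast_join(skewed, other, num_chunks=2)))
```

Each pass only needs one chunk in memory, which is the point of the technique when the second table is too large to broadcast as a whole.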
When you perform a join command with Dataset objects, if you find that the query is stuck on finishing a small number of tasks due to data skew, you can specify the skew hint with df.hint("skew"). The skew join optimization is performed on the DataFrame for which you specify the skew hint.
In addition to the basic hint, you can specify the hint method with the following combinations of parameters: column name, list of column names, and column name and skew value.
# DataFrame and column name.
df.hint("skew", "col1")
# DataFrame and multiple columns.
df.hint("skew", ["col1", "col2"])
# DataFrame, column name, and skew value.
df.hint("skew", "col1", "value")
To take full advantage of lazy evaluation functionality, it is very wise to store expensive intermediate results if several operations use them downstream of the DAG. Indeed, if an action is run, its computation can be based on these intermediate results and thus only replay a sub-part of the DAG before this action.
Let’s take the following DAG as an example:
To obtain the results of the two actions, the treatments are described in the two DAGs below.
In order to speed up the execution, one can decide to cache intermediate results (e.g. the result of a join).
The processing of the second action is now simplified. Note that during the first action the results have not yet been stored in memory.
Although caching can speed up the execution of a job, we pay a cost when these results are written to memory and/or disk. Test at different locations in the processing pipeline whether the total time saving outweighs this cost. This is especially relevant when there are several paths in the DAG.
cache() will store as many of the partitions read in memory across Spark executors as memory allows.
While a DataFrame may be fractionally cached, partitions cannot be fractionally cached (e.g., if you have 8 partitions but only 4.5 partitions can fit in memory, only 4 will be cached). However, if not all your partitions are cached, when you want to access the data again, the partitions that are not cached will have to be recomputed, slowing down your Spark job.
persist(StorageLevel.LEVEL) is more nuanced, providing control over how your data is cached via StorageLevel:
|MEMORY_ONLY||Data is stored directly as objects and stored only in memory.|
|MEMORY_ONLY_SER||Data is serialized as compact byte array representation and stored only in memory. To use it, it has to be deserialized at a cost.|
|MEMORY_AND_DISK||Data is stored directly as objects in memory, but if there’s insufficient memory the rest is serialized and stored on disk.|
|DISK_ONLY||Data is serialized and stored on disk.|
|OFF_HEAP||Data is stored off-heap. Off-heap memory is used in Spark for storage and query execution; see “Configuring Spark executors’ memory and the shuffle service”.|
|MEMORY_AND_DISK_SER||Like MEMORY_AND_DISK, but data is serialized when stored in memory. (Data is always serialized when stored on disk.)|
Use Dataframe/Dataset over RDD
- Using RDDs directly leads to performance issues because Spark cannot apply its optimization techniques to them, and RDDs serialize and de-serialize the data whenever it is distributed across the cluster (repartition and shuffling).
- Serialization and de-serialization are very expensive operations for Spark applications, as for any distributed system. A large share of the run time can go into serializing data rather than executing the operations, so try to avoid using RDDs.
- The DataFrame API does two things that help to do this (through the Tungsten project).
- First, using off-heap storage for data in binary format.
- Second, generating encoder code on the fly to work with this binary format for your specific objects.
- Since a Spark/PySpark DataFrame internally stores data in binary format, the data does not need to be serialized and deserialized when it is distributed across the cluster, hence you would see a performance improvement.
Use coalesce() over repartition()
- coalesce() is an optimised alternative to repartition() for reducing the number of partitions.
- coalesce() → decreases the number of partitions while avoiding a full shuffle over the network. As it does not redistribute the complete data, it does not solve imbalance in the distribution of data.
- repartition() → creates new partitions by a full shuffle of the data (costly). As the data gets evenly distributed, it increases the degree of parallelism.
Use mapPartitions() over map()
- Both are RDD based operations
- mapPartitions() provides a facility to do heavy initializations (for example, a database connection) once for each partition instead of doing it on every DataFrame row. This helps the performance of Spark jobs when you are dealing with heavyweight initialization on larger datasets.
- Whereas map() repeats the same work for each row.
- Note: One key point to remember is that both transformations return a Dataset[U] and not a DataFrame (In Spark 2.0, DataFrame = Dataset[Row])
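The difference is easiest to see in the shape of the function passed to mapPartitions(): it receives an iterator over a whole partition, so the expensive setup runs once per partition. The sketch below simulates this in plain Python; `FakeConnection` is a stand-in for a real database client and the partitions are ordinary lists, both assumptions made for illustration.

```python
class FakeConnection:
    """Stand-in for an expensive resource such as a database connection."""

    opened = 0  # counts how many connections were created

    def __init__(self):
        FakeConnection.opened += 1

    def lookup(self, row):
        # Pretend to enrich the row with data from the external system.
        return (row, len(str(row)))

    def close(self):
        pass


def enrich_partition(rows):
    """Process one partition: open a single connection, reuse it for every row."""
    conn = FakeConnection()
    try:
        for row in rows:
            yield conn.lookup(row)
    finally:
        conn.close()


# Simulate three partitions holding six rows in total.
partitions = [["a", "bb"], ["ccc"], ["dddd", "e", "ff"]]
enriched = [list(enrich_partition(iter(p))) for p in partitions]
print(FakeConnection.opened)  # one connection per partition, not per row
```

In PySpark a function with this shape can be passed as `df.rdd.mapPartitions(enrich_partition)`, whereas the equivalent map() version would have to create the connection inside the per-row function.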
- Avoid Spark/PySpark UDFs whenever possible and use them only when no existing Spark built-in function does the job.
- User-defined functions de-serialize each row to an object, apply the lambda function, and re-serialize it, resulting in slower execution and more garbage collection time.
- UDFs are a black box to Spark, so it can’t optimise them and you lose the optimisations Spark applies to DataFrames/Datasets.
Disable unnecessary DEBUG and INFO logging
- This is one of the simple ways to improve the performance of Spark jobs, and the problem is easily avoided by following good coding principles.
- During the development phase of a Spark/PySpark application, we usually write debug/info messages to the console using println() and log to a file using some logging framework (log4j).
- Both methods result in I/O operations and hence cause performance issues when you run Spark jobs with greater workloads. Before promoting your jobs to production, review your code and disable unnecessary DEBUG and INFO logging.
- Checkpointing truncates the execution plan, saves the checkpointed data frame to a temporary location on disk, and reloads it, which can boost performance.
- Each time you apply a transformation or perform a query on a data frame, the query plan grows. Spark keeps the full history of transformations applied to a data frame, which can be seen by running the explain command on it. When the query plan becomes huge, performance decreases dramatically, generating bottlenecks.
- In this manner, checkpointing helps to refresh the query plan and to materialize the data. It is ideal for scenarios including iterative algorithms and branching out a new data frame to perform different kinds of analytics. More tangibly, after checkpointing the data frame, you don’t need to recalculate all of the previous transformations applied to it, because the data stays on disk until it is explicitly removed.
- Note that, Spark won’t clean up the checkpointed data even after the sparkContext is destroyed and the clean-ups need to be managed by the application.
- It is also a good property of checkpointing to debug the data pipeline by checking the status of data frames.
- Caching is also an alternative for a similar purpose in order to increase performance. It requires much more memory compared to checkpointing, so it is worth comparing the two before preferring one to the other.
Use proper file formats
- Apache Parquet is a columnar storage format designed to select only queried columns and skip over the rest. It gives the fastest read performance with Spark.
- Parquet arranges data in columns, putting related values close to each other to optimize query performance, minimize I/O, and facilitate compression.
- Furthermore, it implements column pruning and predicate pushdown (filters based on stats) which is simply a process of only selecting the required data for processing when querying a huge table. It prevents loading unnecessary parts of the data in-memory and reduces network usage.
Use toPandas() with PyArrow
- Apache Arrow is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data. More concretely, Apache Arrow is a bridge between cross-language platforms that facilitates, for example, reading a Spark data frame and then writing it into Apache Cassandra without suffering from inefficient serialization and deserialization.
- PyArrow is the Python implementation of Arrow. It provides a Python API that brings together the functionalities of Arrow with the Python environment, including leading libraries like pandas and numpy.
- In Spark, data is processed very fast as long as it stays in the JVM. However, PySpark developers may move data between the Python environment and the JVM, for example to use Python’s rich data processing libraries. In this case, using PyArrow while converting a pandas data frame to a Spark data frame or vice-versa results in a huge performance improvement.
- Don’t use count() when you don’t need to return the exact number of rows. To check whether a data frame is empty, len(df.head(1)) == 0 is much cheaper because it reads at most one row.
- Do not use show() in your production code.
- It is a good practice to use df.explain() to get insight into the internal representation of a data frame in Spark (the final version of the physical plan).
- Always try to minimize the data size by filtering irrelevant data (rows/columns) before joins.
- Monitor Spark applications online/offline. Monitoring can give you clues about unbalanced data partitions, where the jobs are stuck, and query plans. An alternative to Spark UI might be Ganglia.
- Basically, avoid using loops.
- Focus on built-in functions rather than custom solutions.
- Ensure that key columns in join operation do not include null values.
- Put the bigger dataset on the left in joins.
- Keep in mind that Spark runs with lazy evaluation logic, so nothing is triggered until an action is called. As a result, errors may surface far from the line that caused them and be hard to interpret.
- Unpersist the data in the cache, if you don’t need it for the rest of the code.
- Close/stop your Spark session when you are done with your application.
- Spark 3.0 brings significant performance improvements through Adaptive Query Execution, so consider upgrading.
- Prefer data frames to RDDs for data manipulations.
- In general, tasks larger than about 20 KiB are probably worth optimizing.
- In general, 2–3 tasks per CPU core in your cluster are recommended.
- It is good to keep each partition at around 128 MB or less to achieve parallelism.
- CSV and JSON file formats give high write performance but are slower to read. Parquet, on the other hand, gives the best read performance but is slower to write than the other formats mentioned.
- The physical plan is read from the bottom up, whereas the DAG is read from the top down.
- An Exchange in the plan means a shuffle occurred between stages, which usually costs performance.
- An excessive number of stages might be a sign of a performance problem.
- Garbage collection (GC) is another key factor that might cause performance issues. Check it in the Executors tab of the Spark UI. GC-related problems can typically be addressed with Java GC options.
- Serialization also plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. For Scala/Java-based Spark applications, Kryo serialization is highly recommended. In PySpark, Marshal and Pickle serializers are supported; MarshalSerializer is faster than PickleSerializer but supports fewer data types.
- Note that you might experience a performance loss if you run Spark in a Docker environment.