Table of Content
This is the third article of a four-part series about Apache Spark on YARN. Apache Spark allows developers to run multiple tasks in parallel across machines in a cluster or across multiple cores on a desktop. A partition, aka split, is a logical chunk of a distributed data set. Apache Spark builds Directed Acyclic Graph (DAG) with jobs, stages, and tasks for the submitted application. The number of tasks will be determined based on number of partitions.
Few performance bottlenecks were identified in the SFO Fire department call service dataset use case with YARN cluster manager. To understand about the use case and performance bottlenecks identified, refer our previous blog on Apache Spark on YARN – Performance and Bottlenecks. The Resource planning bottleneck is addressed and notable performance improvements achieved in the use case Spark application is discussed in our previous blog on Apache Spark on YARN – Resource Planning.
In this blog post, let us discuss about the partition problem and tuning the partitions of the use case Spark application.
Our other articles of the four-part series are:
- Part 1 – Apache Spark on YARN – Performance and Bottlenecks
- Part 2 – Apache Spark on YARN – Resource Planning
- Part 4 – Apache Spark Performance Tuning – Straggler Tasks
Spark Partition Principles
The general principles to be followed when tuning partition for Spark application is as follows:
- Too few partitions – Cannot utilize all cores available in the cluster.
- Too many partitions – Excessive overhead in managing many small tasks.
- Reasonable partitions – Helps us to utilize the cores available in the cluster and avoids excessive overhead in managing small tasks.
Understanding Use Case Performance
The performance duration (without any performance tuning) based on different API implementations of the use case Spark application running on YARN is shown in the below diagram:
The performance duration (after performance tuning) after tuning the number of executors, cores, and memory for RDD and DataFrame implementation of the use case Spark application is shown in the below diagram:
For tuning of the number of executors, cores, and memory for RDD and DataFrame implementation of the use case Spark application, refer our previous blog on Apache Spark on YARN – Resource Planning.
Let us understand the Spark data partitions of the use case application and decide on increasing or decreasing the partition using Spark configuration properties.
Understanding Spark Data Partitions
The two configuration properties in Spark to tune the number of partitions at run time are as follows:
Default parallelism and shuffle partition problems in both RDD and DataFrame API based application implementation is shown in the below diagram:
count () action stage using default parallelism (12 partitions) is shown in the below diagram:
From the Summary Metrics for Input Size/Records section, the Max partition size is ~128 MB.
On considering the Event Timeline to understand those 200 shuffled partition tasks, there are tasks with more scheduler delay and less computation time. It indicates 200 tasks are not necessary here and can be tuned to decrease the shuffle partition to reduce scheduler burden.
The Stages view in Spark UI indicates that most of the tasks are simply launched and terminated without any computation as shown in the below diagram:
Spark Partition Tuning
Let us first decide the number of partitions based on the input dataset size. The Thumb Rule to decide the partition size while working with HDFS is 128 MB. As our input dataset size is about 1.5 GB (1500 MB) and going with 128 MB per partition, the number of partitions will be:
Total input dataset size / partition size => 1500 / 128 = 11.71 = ~12 partitions
This is equal to the Spark default parallelism (spark.default.parallelism) value. The metrics based on default parallelism is shown in the above section.
Now, let us perform a test by reducing the partition size and increasing number of partitions.
Consider partition size as 64 MB
Number of partitions = Total input dataset size / partition size => 1500 / 64 = 23.43 = ~23 partitions
DataFrame API implementation is executed using the below partition configurations:
The RDD API implementation is executed using the below partition configurations:
Note: spark.sql.shuffle.partitions property is not applicable for RDD API based implementation.
Running Spark on YARN with Partition Tuning
./bin/spark-submit --name FireServiceCallAnalysisDataFramePartitionTest --master yarn --deploy-mode cluster --executor-memory 2g --executor-cores 2 --num-executors 2 --conf spark.sql.shuffle.partitions=23 --conf spark.default.parallelism=23 --class com.treselle.fscalls.analysis.FireServiceCallAnalysisDF /data/SFFireServiceCall/SFFireServiceCallAnalysis.jar /user/tsldp/FireServiceCallDataSet/Fire_Department_Calls_for_Service.csv
The Stages view based on spark.default.parallelism=23 and spark.sql.shuffle.partitions=23 is shown in the below diagram:
Consider Tasks: Succeeded/Total column in the above diagram. Both default and shuffle partitions are applied and number of tasks are 23.
count () action stage using default parallelism (23 partitions) is shown in the below screenshot:
On considering Summary Metrics for Input Size/Records section, the max partition size is ~66 MB.
On looking into the shuffle stage tasks, the scheduler has launched 23 tasks and most of the times are occupied by shuffle (Read/Write). There are no tasks without computation.
The output obtained after executing Spark application with different number of partitions is shown in the below diagram:
In this blog, we discussed about partition principles and understood about the use case performance, deciding the number of partitions, and partition tuning using Spark configuration properties.
The Resource planning bottleneck is addressed and notable performance improvements achieved in the use case Spark application is discussed in our previous blog on Apache Spark on YARN – Resource Planning. To understand about the use case and performance bottlenecks identified, refer our previous blog on Apache Spark on YARN – Performance and Bottlenecks. But, the performance of spark application remains unchanged.
In our upcoming blog, let us discuss about the final bottleneck of the use case on “Apache Spark Performance Tuning – Straggler Tasks”.
The final performance achieved after resource tuning, partition tuning, and straggler tasks problem fixing is shown in the below diagram:
- Apache Spark on YARN – Performance and Bottlenecks: http://www.treselle.com/blog/apache-spark-on-yarn-performance-and-bottlenecks
- Apache Spark on YARN – Resource Planning: http://www.treselle.com/blog/apache-spark-on-yarn-resource-planning
- The code examples are available in GitHub: https://github.com/treselle-systems/sfo_fire_service_call_analysis_using_spark
- To understand Apache Spark jobs, stages, DAG and executors from Spark History server UI, refer our blog on: Text Normalization with Spark – Part 2.
- Introducing Apache Spark 2.0: https://databricks.com/blog/2016/07/26/introducing-apache-spark-2-0.html
- Apache Spark: RDD, DataFrame or Dataset?: http://www.kdnuggets.com/2016/02/apache-spark-rdd-dataframe-dataset.html
- RDD Partitions and Partitioning: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html