Apache Spark on YARN – Performance and Bottlenecks

Apache Spark 2.x ships with the second-generation Tungsten engine. Built on ideas from modern compilers, this engine emits optimized code at runtime that collapses an entire query into a single function using the “whole-stage code generation” technique, thereby eliminating virtual function calls and leveraging CPU registers for intermediate data. This optimization applies only to Spark’s high-level APIs such as DataFrame and Dataset, not to the low-level RDD API.
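Whole-stage code generation can be observed in the physical plan of any DataFrame query. The sketch below (illustrative only; it assumes a running SparkSession) shows how fused operators appear in `explain()` output marked with an asterisk:

```scala
// Sketch: inspecting whole-stage code generation (assumes Spark 2.x on a cluster
// or local mode; the query itself is a throwaway example).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("CodegenDemo").getOrCreate()

val df = spark.range(1000)
  .selectExpr("id % 10 AS key")
  .groupBy("key")
  .count()

// Operators fused by whole-stage code generation are prefixed with "*"
// in the physical plan printed by explain().
df.explain()
```

By contrast, calling `df.rdd` drops back to the RDD API and bypasses this machinery entirely.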

Though Tungsten optimizes Spark application code at runtime, application performance can be further improved by tuning configuration parameters, parallelism, and the JVM, and by tuning the YARN configuration when the application runs on YARN.

This is the first article of a four-part series about Apache Spark on YARN. In this post, we discuss the performance of the high-level and low-level Spark APIs. The SFO Fire Department Calls-For-Service dataset and the YARN cluster manager are used to test and tune the application performance.

Our other articles of the four-part series are:

About Dataset

The SFO Fire Calls-For-Service dataset records the responses of all fire units to calls. It has 34 columns and 4.36 million rows, and is updated daily. For more details about this dataset, refer to the SFO website (the link is provided in the Reference section).
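A minimal sketch of loading the dataset from HDFS with the DataFrame reader follows; the HDFS path and file name are assumptions for illustration, not the actual locations used in this series:

```scala
// Sketch: loading the SFO Fire Calls-For-Service CSV from HDFS.
// The path below is a placeholder; substitute your own HDFS location.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("FireServiceAnalysis").getOrCreate()

val fireServiceDF = spark.read
  .option("header", "true")       // the dataset ships with a header row
  .option("inferSchema", "true")  // convenient, but costs an extra pass over the data
  .csv("hdfs:///user/hadoop/Fire_Department_Calls_for_Service.csv")

fireServiceDF.printSchema()       // should list the 34 columns
```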

SFO Fire Department Dataset in HDFS

About Apache Hadoop Cluster

A two-node Apache Hadoop cluster was set up using the HDP 2.6 distribution, which ships with Spark 2.1. This distribution is used for Spark application execution.

Instance details: m4.xlarge (4 cores, 16 GB RAM)

Cluster details: The summary of cluster setup is shown in the below diagram:

HDP Cluster Summary

Use Case

To understand Spark performance and application tuning, a Spark application was created using the RDD, DataFrame, Spark SQL, and Dataset APIs to answer the following questions from the SFO Fire Department call service dataset:

  • How many types of calls were made to the fire department?
  • How many incidents of each call type were there?
  • How many years of fire service calls are in the data file?
  • How many service calls were logged in for the past 7 days?
  • Which neighborhood in SF generated the most calls last year?

Answering every question except the first requires grouping the data, which means a data shuffle in Spark terms.

Note: One Spark task handles one partition (partition = data + computation logic).

Low-Level and High-Level API Implementation

In this section, we discuss the low-level and high-level Spark API implementations used to answer the above questions. For more details about the APIs, refer to the Spark website.

Resilient Distributed Dataset (RDD) Implementation (Low-Level API)

  • The RDD API, available in Spark since the 1.0 release, can easily and efficiently process both structured and unstructured data.
  • RDDs do not take advantage of Spark’s optimizers such as Catalyst and Tungsten; developers need to optimize each RDD based on its characteristics.
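An RDD sketch for the second question ("How many incidents of each call type were there?") is shown below. It assumes an active SparkSession `spark`, and the column index and naive comma split are illustrative only — a real implementation must handle quoted fields containing commas:

```scala
// Sketch: RDD (low-level) implementation of the call-type count.
// Assumes a SparkSession named `spark`; the HDFS path and the column
// index (3) for call type are illustrative assumptions.
val fireServiceRDD = spark.sparkContext
  .textFile("hdfs:///user/hadoop/Fire_Department_Calls_for_Service.csv")

val header = fireServiceRDD.first()

val callTypeCounts = fireServiceRDD
  .filter(_ != header)                    // drop the header line
  .map(line => (line.split(",")(3), 1))   // (call type, 1)
  .reduceByKey(_ + _)                     // triggers a shuffle

callTypeCounts.take(10).foreach(println)
```

Note that every parsing and aggregation decision here is hand-written; nothing is optimized by Catalyst or Tungsten.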

DataFrame Implementation (High-Level API)

  • Introduced in Spark 1.3 to improve the performance and scalability of Spark, and later a major beneficiary of the Project Tungsten initiative.
  • Introduces the concept of a schema to describe the data and is radically different from the RDD API as it is an API for building a relational query plan that Spark’s Catalyst optimizer can execute.
  • Gains the advantage of Spark’s optimizers such as Catalyst and Tungsten.
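The same call-type question in the DataFrame API is a few declarative lines; Catalyst plans the aggregation and Tungsten generates the code. This sketch assumes a DataFrame `fireServiceDF` loaded from the dataset, and the column name "Call Type" is taken from the published dataset schema:

```scala
// Sketch: DataFrame (high-level) implementation of the call-type count.
// Assumes `fireServiceDF` has been loaded from the CSV with a header.
import org.apache.spark.sql.functions.desc

val callTypeCountsDF = fireServiceDF
  .groupBy("Call Type")
  .count()
  .orderBy(desc("count"))

callTypeCountsDF.show(10, truncate = false)
```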

Spark SQL Implementation (High-Level API)

  • Spark SQL lets you query the data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as business intelligence tools like Tableau.
  • It provides a DataFrame abstraction in Python, Java, and Scala to simplify working with structured datasets. DataFrames are similar to tables in a relational database.
  • Spark SQL gains the advantage of Spark’s optimizers such as Catalyst and Tungsten as its abstraction is DataFrame.
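A Spark SQL sketch of the same aggregation follows: register the DataFrame as a temporary view, then query it with SQL. The view name is an assumption; the resulting Catalyst/Tungsten plan is the same as the DataFrame version:

```scala
// Sketch: Spark SQL implementation of the call-type count.
// Assumes `fireServiceDF` has been loaded from the CSV; the view name
// `fire_service_calls` is an illustrative choice.
fireServiceDF.createOrReplaceTempView("fire_service_calls")

spark.sql(
  """SELECT `Call Type`, COUNT(*) AS incidents
     FROM fire_service_calls
     GROUP BY `Call Type`
     ORDER BY incidents DESC""").show(10)
```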

Dataset Implementation (High-Level API)

  • The Dataset API, released as an API preview in Spark 1.6, provides the best of both RDD and DataFrame.
  • Datasets combine two discrete API characteristics: a strongly typed API and an untyped API (a DataFrame is a Dataset of rows).
  • Datasets and DataFrames use Spark’s built-in encoders. The encoders provide on-demand access to individual attributes without deserializing an entire object, and generate bytecode to interact with off-heap data.
  • Dataset API gains the advantage of Spark’s optimizers such as Catalyst and Tungsten.
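A Dataset sketch for the same question is shown below. A case class gives compile-time type safety while the encoder avoids deserializing whole objects. The case class fields and column renames are assumptions mapped from the CSV header, and `fireServiceDF` is assumed to be already loaded:

```scala
// Sketch: Dataset (typed, high-level) implementation of the call-type count.
// The field names below are illustrative mappings from the CSV columns.
case class FireCall(CallNumber: Option[Int], CallType: Option[String])

import spark.implicits._

val fireServiceDS = fireServiceDF
  .withColumnRenamed("Call Number", "CallNumber")
  .withColumnRenamed("Call Type", "CallType")
  .select("CallNumber", "CallType")
  .as[FireCall]                 // typed view backed by an encoder

fireServiceDS
  .groupByKey(_.CallType.getOrElse("Unknown"))
  .count()
  .show(10)
```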

Running Spark on YARN

Spark applications can be launched on YARN in two deployment modes: cluster mode and client mode.

  • In cluster mode, the Spark driver runs inside an application master process managed by YARN on the cluster. The client goes away after initiating the application.
  • In client mode, the application master only requests resources from YARN and the Spark driver runs in client process.
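The two modes are selected with the `--deploy-mode` flag of `spark-submit`. The jar name and main class below are placeholders for illustration:

```shell
# Cluster mode: the driver runs inside the YARN Application Master;
# the client can disconnect after submission.
spark-submit --master yarn --deploy-mode cluster \
  --class com.example.FireServiceAnalysis fire-service-analysis.jar

# Client mode: the driver runs in the local client process; the AM only
# negotiates resources with YARN.
spark-submit --master yarn --deploy-mode client \
  --class com.example.FireServiceAnalysis fire-service-analysis.jar
```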

Resource planning (executors, cores, and memory) is essential when running a Spark application standalone, on YARN, or on Apache Mesos. On YARN in particular, “memory overhead” is a vital configuration when planning Spark application resources.

Default Spark Configuration for YARN

Plenty of properties can be configured when submitting a Spark application on YARN. For resource planning, the following are important:


Note: In cluster mode, the Spark driver runs inside a YARN Application Master (AM), which is launched with the resources allocated for the driver plus memory overhead. In client mode, the Spark driver runs in the client process, and resources must still be allocated for the YARN Application Master. In both modes, executor resources must be planned and allocated.
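A sketch of explicit resource planning with Spark 2.x property names follows. The specific values are illustrative; in Spark 2.x the YARN memory overhead defaults to max(384 MB, 10% of the requested heap), so a 4 GB executor yields roughly a 4.4 GB YARN container request:

```shell
# Illustrative spark-submit with explicit resource planning on YARN.
# Jar name, class, and sizes are assumptions, not recommendations.
spark-submit --master yarn --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 4g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --class com.example.FireServiceAnalysis fire-service-analysis.jar
```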

Submitting Spark Application in YARN

Prerequisites: All the Spark applications are assembled into a single jar using the Scala Build Tool (SBT). We launched the Spark application on YARN in cluster mode with the default Spark configuration.

Monitoring Driver and Executor Resource

On successfully submitting the Spark application, the message below is displayed in the console, stating the amount of memory allocated for the Application Master (AM): 1408 MB, i.e., the 1024 MB default driver memory plus 384 MB memory overhead.

Driver AM Resource

Executor memory and cores can be monitored in both the Resource Manager UI and the Spark UI. The Executors tab in the Spark UI displays the number of executors and the resources allocated to each. The driver core count in the diagram below is ‘0’ even though the default driver core count is 1; this one core is used by the YARN Application Master. Storage memory per executor is shown as memory used / total memory available for storing data such as RDD partitions cached in memory.

Fire Service Analysis DF Executor Stats

Understanding Spark Internals

Spark constructs a Directed Acyclic Graph (DAG) using the DAGScheduler based on the transformations and actions used in the application. Jobs, stages, and tasks are the internal parts of Spark execution. To understand the Spark DAG and its internals, refer to our blog on Text Normalization with Spark – Part 2.

The jobs for the RDD implementation of the Spark application are shown in the diagram below. The Jobs view in the Spark UI provides a high-level overview of application statistics such as the number of jobs, overall and per-job duration, number of stages, and total number of tasks.

Fire Service Analysis RDD Jobs Stats

RDD, DataFrame, Spark SQL, and Dataset implementation of the Spark Application Jobs statistics are as follows:


Note: The above statistics are based on the default Spark configuration for the different Spark API implementations in our use case; no tuning has been applied. The performance bottlenecks are identified using the Stages view in the Spark UI.

RDD implementation of Stages View

Fire Service Analysis RDD Stages Stats

DataFrame Implementation of Stages View

Fire Service Analysis DF Stages Stats

Spark SQL Implementation of Stages View

Fire Service Analysis DF SQL Stages Stats

Dataset Implementation of Stages View

Fire Service Analysis DS Stages Stats

Low-Level and High-Level API Outputs

The results for the five questions in this use case are the same across the different Spark API implementations, but the durations vary:

  • The high-level API implementations completed and produced results in 1.8 and 1.9 minutes.
  • The low-level RDD API implementation completed in 22 minutes; even with Kryo serialization, it took 21 minutes.

The time difference is caused by Spark’s Catalyst and Tungsten optimizers, which are applied when the application is written with the high-level APIs but not with the low-level RDD API.

Fire Service Call Output

Note: The results of these implementations and the source code have been uploaded to GitHub. See the Reference section for the GitHub location and the dataset link.

Identifying Performance Bottlenecks

To do performance tuning, first identify the bottlenecks in the application. The following bottlenecks were identified during the RDD, DataFrame, Spark SQL, and Dataset API implementations:

Resource Planning (Executors, core and memory)

A balanced number of executors, cores, and memory can significantly improve performance while running on YARN, without any code changes in the Spark application.

Degree of Parallelism – Partition Tuning (Avoid Small Partition Problems)

Looking at the Stages view of both the high-level and low-level API implementations, a batch of 200 tasks appears at a few stages. Digging into those stages and examining the 200 tasks in the Event Timeline shows that task computation time is very low compared to scheduler delay. The rule of thumb for partition size when running on YARN is ~128 MB.
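Those 200-task stages come from `spark.sql.shuffle.partitions`, which defaults to 200 regardless of data volume, so small shuffles mostly measure scheduler overhead. A hedged sketch of tuning it (the value 8 is illustrative, sized toward ~128 MB per partition for this dataset):

```scala
// Sketch: reducing shuffle parallelism for a small dataset.
// Assumes an active SparkSession `spark`; the value is an assumption
// to be derived from your shuffle data volume / ~128 MB.
spark.conf.set("spark.sql.shuffle.partitions", "8")

// For the RDD API, pass the partition count explicitly instead:
//   pairs.reduceByKey(_ + _, 8)
```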

Parallelism Bottleneck

Straggler Tasks (Long Running Tasks)

Straggler tasks take a long time to complete and can be identified in the Stages view. In this use case, the following straggler tasks took the longest:

RDD Implementation Straggler Task

RDD Straggler Task

DataFrame Implementation Straggler Task

DF Straggler Task


In this blog, we have discussed running a Spark application on YARN with the default configuration, implemented with both high-level and low-level APIs. All the implementations completed within the default resources allocated to the application for this use case, but that may not hold for every use case. Resource planning helps us decide on a balanced allocation of executors, cores, and memory.

Applications written with the high-level APIs complete in less time than those written with the low-level API, so programming with the high-level APIs is recommended when using Spark. Bottlenecks were identified during both the high-level and low-level API implementations. These bottlenecks, and the performance tuning that eliminates them, will be covered in our upcoming blog posts listed below:

After performance tuning and fixing the bottlenecks, the final times taken to complete the application with both high-level and low-level APIs are shown in the diagram below:

Straggler Fix Output

The high-level API implementations completed and produced results in 1.8 and 1.9 minutes; after performance tuning, the time was reduced to ~41 seconds. The low-level RDD API implementation completed in 22 minutes (21 minutes with Kryo serialization); after performance tuning, the time was reduced to ~3 minutes.

