At LinkedIn, we have multiple clusters, running thousands of Spark applications, and these numbers are growing rapidly. We need to ensure that these Spark applications are well tuned – cluster resources, including memory, should be used efficiently so that the cluster can support running more applications concurrently, and applications should run quickly and reliably.
Currently there is limited visibility into how much memory executors are using, and users are guessing numbers for executor and driver memory sizing. These estimates are often much larger than needed, leading to memory wastage. Examining the metrics for one cluster for a month, the average percentage of used executor memory (max JVM used memory across executors / spark.executor.memory) is 35%, leading to an average of 591GB unused memory per application (number of executors * (spark.executor.memory - max JVM used memory)). Spark has multiple memory regions (user memory, execution memory, storage memory, and overhead memory), and to understand how memory is being used and fine-tune allocation between regions, it would be useful to have information about how much memory is being used for the different regions.
To improve visibility into memory usage for the driver and executors and different memory regions, the following additional memory metrics can be be tracked for each executor and driver:
- JVM used memory: the JVM heap size for the executor/driver.
- Execution memory: memory used for computation in shuffles, joins, sorts and aggregations.
- Storage memory: memory used caching and propagating internal data across the cluster.
- Unified memory: sum of execution and storage memory.
The peak values for each memory metric can be tracked for each executor, and also per stage. This information can be shown in the Spark UI and the REST APIs. Information for peak JVM used memory can help with determining appropriate values for spark.executor.memory and spark.driver.memory, and information about the unified memory region can help with determining appropriate values for spark.memory.fraction and spark.memory.storageFraction. Stage memory information can help identify which stages are most memory intensive, and users can look into the relevant code to determine if it can be optimized.
The memory metrics can be gathered by adding the current JVM used memory, execution memory and storage memory to the heartbeat. SparkListeners are modified to collect the new metrics for the executors, stages and Spark history log. Only interesting values (peak values per stage per executor) are recorded in the Spark history log, to minimize the amount of additional logging.
We have attached our design documentation with this ticket and would like to receive feedback from the community for this proposal.