Details
- Type: Bug
- Status: In Progress
- Priority: Major
- Resolution: Unresolved
- Affects Versions: 2.4.7, 3.0.1, 3.1.0
- Fix Versions: None
- Components: None
Description
We use Spark with Delta Lake. Recently we found that our Spark driver suffered from very heavy full GC pauses when users submitted a MERGE INTO query. The driver held over 100 GB of memory (depending on the configured max heap size) that could never be reclaimed, even by a full GC. A heap dump revealed the root cause.
As the heap dump shows, Delta uses a SetAccumulator to record the names of the touched files:
// Accumulator to collect all the distinct touched files
val touchedFilesAccum = new SetAccumulator[String]()
spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME)

// UDFs to records touched files names and add them to the accumulator
val recordTouchedFileName = udf { (fileName: String) => {
  touchedFilesAccum.add(fileName)
  1
}}.asNondeterministic()
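For context, custom Spark accumulators like this one extend the AccumulatorV2 API. A minimal sketch of a set-backed accumulator is shown below; this is an illustration of the pattern, not necessarily Delta's exact source:

import java.util.Collections
import org.apache.spark.util.AccumulatorV2

// Sketch: an accumulator whose value is the set of all added elements.
// Every task sends its local set back to the driver, where sets are merged.
class SetAccumulator[T] extends AccumulatorV2[T, java.util.Set[T]] {
  private val _set = Collections.synchronizedSet(new java.util.HashSet[T]())

  override def isZero: Boolean = _set.isEmpty
  override def copy(): SetAccumulator[T] = {
    val newAcc = new SetAccumulator[T]()
    newAcc._set.addAll(_set)
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: T): Unit = _set.add(v)
  override def merge(other: AccumulatorV2[T, java.util.Set[T]]): Unit =
    _set.addAll(other.value)
  override def value: java.util.Set[T] = _set
}

Note that every task completion ships a copy of the task-side set back to the driver, which is exactly where the memory pressure described below comes from.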
In a big query, each task may hold thousands of file names, and if a stage contains tens of thousands of tasks, the DAGScheduler may hold millions of `CompletionEvent`s. Each `CompletionEvent` holds those thousands of file names in its `accumUpdates`. All of these accumulator objects are delivered to the event loop via Spark listener events, so even a full GC cannot release the memory.
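A rough back-of-envelope estimate shows the scale of the problem. The task count, files-per-task, and per-string footprint below are illustrative assumptions, not measured values:

```scala
// Rough estimate of the accumulator payload retained by queued CompletionEvents.
// All numbers are illustrative assumptions, not measurements from the heap dump.
object AccumMemoryEstimate {
  def main(args: Array[String]): Unit = {
    val tasks        = 30000L // tasks in one stage (assumed)
    val filesPerTask = 2000L  // distinct file names per task (assumed)
    val bytesPerName = 100L   // avg JVM heap footprint of one String (assumed)

    // Each CompletionEvent carries its own copy of the task's accumulator
    // update, so the queued events retain tasks * filesPerTask strings.
    val totalBytes = tasks * filesPerTask * bytesPerName
    val gb = totalBytes / (1024.0 * 1024 * 1024)
    println(f"~$gb%.1f GB held by queued events")
  }
}
```

Even these moderate assumptions yield several gigabytes pinned by the event queue alone, and the real workload can be far larger.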
A PR will be submitted. With the patch, the memory problem is gone.
Before the patch: a full GC does not help.
After the patch: no full GC occurs and memory does not ramp up.