Details

Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.4.0
Fix Version/s: None
Description
If we do a large number of broadcast joins while holding onto the Dataset references, those references transitively keep the broadcast values alive, pinning a large amount of memory. The broadcast object is also held in the MemoryStore, but the MemoryStore cleans itself up to keep its memory usage below a certain level. In my use case, I don't want to release the reference to the Dataset (which would allow the broadcast object to be GCed), because I want to be able to unpersist it at some point in the future, when it is no longer relevant.
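For context, the usage pattern looks roughly like the following (a minimal sketch; the SparkSession setup and Dataset contents are illustrative, not the real workload):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("broadcast-lifecycle").getOrCreate()
import spark.implicits._

// Keep a reference to the cached Dataset so the cache can be dropped later.
val cachedDf = (1 to 1000).toDF("num").cache()

// ... many broadcast joins against cachedDf happen over the job's lifetime ...

// Later, when the data is no longer relevant, release the cache explicitly.
// Holding cachedDf until this point is what also pins the broadcast values.
cachedDf.unpersist()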
See the following repro in the Spark shell:
import org.apache.spark.sql.functions._
import org.apache.spark.SparkEnv

val startDf = (1 to 1000000).toDF("num").withColumn("num", $"num".cast("string")).cache()
val leftDf = startDf.withColumn("num", concat($"num", lit("0")))
val rightDf = startDf.withColumn("num", concat($"num", lit("1")))
val broadcastJoinedDf = leftDf.join(broadcast(rightDf), leftDf.col("num").eqNullSafe(rightDf.col("num")))
broadcastJoinedDf.count

// Take a heap dump, see UnsafeHashedRelation with hard references in MemoryStore and Dataset

// Force the MemoryStore to clear itself
SparkEnv.get.blockManager.stop

// Trigger GC, then take another heap dump. The UnsafeHashedRelation is now referenced only by the Dataset.
If we make TorrentBroadcast hold a weak reference to the broadcast object, the second heap dump shows no UnsafeHashedRelation at all; it has been GCed.
Given that the broadcast object can be reloaded from the MemoryStore on demand, it seems safe to use a WeakReference instead.
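To make the proposal concrete, here is a minimal sketch of the pattern (not Spark's actual TorrentBroadcast code; WeaklyCachedBroadcast and readBroadcastBlock are illustrative names, with readBroadcastBlock standing in for whatever re-reads the block from the block store):

import java.lang.ref.WeakReference

// Cache the broadcast value behind a WeakReference so holding the wrapper
// does not pin the value. If the GC clears the referent, re-read it from
// the block store on the next access.
class WeaklyCachedBroadcast[T <: AnyRef](readBroadcastBlock: () => T) {
  @volatile private var cached = new WeakReference[T](null.asInstanceOf[T])

  def value: T = {
    val v = cached.get()
    if (v != null) {
      v
    } else {
      // Referent was collected (or never loaded): reload and re-cache it.
      val reloaded = readBroadcastBlock()
      cached = new WeakReference[T](reloaded)
      reloaded
    }
  }
}

A concurrent reload by two threads is benign in this sketch, since the block store remains the source of truth and both threads end up with equivalent values.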