When memory is constrained on an executor the first time a broadcast variable is instantiated, the BlockManager persists the value to disk. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from another instance of that broadcast variable creates another copy of the underlying value. In other words, the underlying value is normally created once per executor, but when memory is constrained every instance of the broadcast variable gets its own copy.
The fix I propose is to explicitly cache the underlying values using weak references (in a ReferenceMap). Note, however, that I couldn't find a clean place to create the cache container. For want of a better solution I added it to BroadcastManager as a package-private field; if something more appropriate already exists in the project for this purpose, please let me know.
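To illustrate the idea, here is a minimal sketch of a weak-value cache in plain Java. It is not the code in this patch: `WeakValueCache` and `getOrLoad` are hypothetical names, `java.lang.ref.WeakReference` with a `HashMap` stands in for the ReferenceMap, and the `loader` argument stands in for the expensive read that recreates the value from the BlockManager.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the proposed cache; names are illustrative only.
final class WeakValueCache<K, V> {
    // Values are held weakly, so the cache never keeps a value alive on its own.
    // Unlike ReferenceMap, this sketch does not purge entries whose value was collected.
    private final Map<K, WeakReference<V>> entries = new HashMap<>();

    // Returns the live cached value for `key`, or runs `loader` once and caches the result.
    // Synchronized so that concurrent callers cannot each trigger their own load.
    synchronized V getOrLoad(K key, Supplier<V> loader) {
        WeakReference<V> ref = entries.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            // Either never loaded, or the previous value has been garbage collected.
            value = loader.get();
            entries.put(key, new WeakReference<>(value));
        }
        return value;
    }
}
```

With a cache like this, a second lookup for the same broadcast id returns the instance that is already live rather than building a new one, while the weak reference still lets the value be collected once nothing else uses it. The coarse lock is a simplification for the sketch; it serializes loads for all keys.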
The issue described above was terminating our team's applications erratically. We were distributing roughly 1 GiB of data through a broadcast variable, and under certain conditions memory was constrained the first time the broadcast variable was loaded on an executor. The executor then attempted to create several additional copies of the underlying value (we were using 8 worker threads on the executor), which quickly caused the task to fail with an OOM exception.