Dataset.joinWith performs a broadcast join on a table that is gigabytes in size because dataset.logicalPlan.statistics.sizeInBytes < 0.
The issue is that org.apache.spark.sql.types.ArrayType.defaultSize is of datatype Int. In my dataset, there is an Array column whose estimated data size exceeds the maximum value of an Int, so the computed size overflows and becomes negative.
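To illustrate the kind of overflow I mean, here is a minimal plain-Scala sketch. It is not Spark's code: estimateSizeAsInt and estimateSizeAsLong are hypothetical helpers, and the element size and count are made-up numbers chosen only so that the product exceeds Int.MaxValue.

```scala
// Hypothetical helper (not Spark code): multiplies a per-element size by an
// element count using Int arithmetic, so the result silently wraps around.
def estimateSizeAsInt(elementSize: Int, elementCount: Int): Int =
  elementSize * elementCount

// Same estimate, widened to Long before multiplying, so it cannot wrap here.
def estimateSizeAsLong(elementSize: Int, elementCount: Int): Long =
  elementSize.toLong * elementCount

// Made-up inputs: 1 MiB per element, 3000 elements (about 3 GB in total).
val asInt = estimateSizeAsInt(1024 * 1024, 3000)   // -1149239296 (wrapped)
val asLong = estimateSizeAsLong(1024 * 1024, 3000) // 3145728000
```

With Int arithmetic a roughly 3 GB estimate comes out negative, which is the same symptom as the negative sizeInBytes shown in this report.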
The issue can be reproduced by running this code in the REPL:
val ds = (0 to 10000).map( i => (i, Seq((i, Seq((i, "This is really not that long of a string")))))).toDS()
// You might have to remove private[sql] from Dataset.logicalPlan to get this to work
val stats = ds.logicalPlan.statistics
stats: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(-1890686892,false)
This causes joinWith to perform a broadcast join even though my data is gigabytes in size, which of course causes the executors to run out of memory.
Setting spark.sql.autoBroadcastJoinThreshold=-1 does not help because logicalPlan.statistics.sizeInBytes is a large negative number and is therefore still less than the join threshold of -1.
I've been able to work around this issue by setting autoBroadcastJoinThreshold to a very large negative number.
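The threshold comparison described above can be sketched in plain Scala as follows. This is a simplified model under my own assumptions, not the planner's actual code: wouldBroadcast is a hypothetical function, I am assuming the check is a "size <= threshold" comparison, and Long.MinValue stands in for "a very large negative number".

```scala
// Hypothetical stand-in for the planner's decision (assumption: a relation is
// broadcast when its estimated size is at or below the configured threshold).
def wouldBroadcast(sizeInBytes: BigInt, threshold: Long): Boolean =
  sizeInBytes <= BigInt(threshold)

// The overflowed estimate taken from the REPL output in this report.
val reportedSize = BigInt(-1890686892L)

// Threshold of -1: the negative estimate still passes the check.
val broadcastWithMinusOne = wouldBroadcast(reportedSize, -1L)

// Workaround: a threshold far below any overflowed Int value.
val broadcastWithHugeNegative = wouldBroadcast(reportedSize, Long.MinValue)

// For comparison, a correctly estimated 5 GB relation with threshold -1.
val broadcastCorrectEstimate = wouldBroadcast(BigInt(5L * 1024 * 1024 * 1024), -1L)
```

Under these assumptions, broadcastWithMinusOne is true while the other two are false, which matches what I observe: -1 only disables broadcasting when the size estimate is non-negative.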