Description
// Minimal reproduction of the joinWith correctness issue.
// NOTE(review): assumes a spark-shell context with spark.implicits._ in scope
// (required for toDS()/as[...]) — confirm when running standalone.
import org.apache.spark.sql.functions

// Two Datasets with a tuple-valued key ((String, Int)) and a payload tag.
val left  = List("0", "1", "2").toDS().map { k => ((k, 0), "l") }
val right = List("0", "1", "2").toDS().map { k => ((k, 0), "r") }

// Alias each side, join on the tuple-typed column "k", and decode the
// joined pair back to typed tuples.
val result = left.toDF("k", "v").as[((String, Int), String)].alias("left")
  .joinWith(
    right.toDF("k", "v").as[((String, Int), String)].alias("right"),
    functions.col("left.k") === functions.col("right.k"),
    "inner")
  .as[(((String, Int), String), ((String, Int), String))]
When broadcast joins are enabled, we get the expected output:
(((0,0),l),((0,0),r)) (((1,0),l),((1,0),r)) (((2,0),l),((2,0),r))
However, when broadcast joins are disabled (i.e., setting spark.sql.autoBroadcastJoinThreshold to -1), the result is incorrect:
(((2,0),l),((2,-1),)) (((0,0),l),((0,-313907893),)) (((1,0),l),((null,-313907893),))