Description
Code to reproduce this bug (run in spark-shell, where `spark` and the `$` column syntax are available):
// force the planner to choose a sort merge join by disabling broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

val df = Seq(1 -> "a").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")

// self-join on the aliased DataFrames, then cache and count
assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)