Description
https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this issue and was then, I believe, mistakenly reverted. The claim behind the revert was that propagating local properties to the dynamic pruning thread in SubqueryBroadcastExec is unnecessary because the broadcast threads propagate them anyway. However, when the dynamic pruning thread is the first to initialize the broadcast relation future, the local properties are not propagated correctly: the broadcast threads inherit them from the dynamic pruning thread, which at that point already holds stale values.
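For context, the propagation pattern the original fix relied on can be sketched independently of Spark internals (all names below are illustrative, not Spark's actual API): the submitting thread snapshots its local properties before handing work to the pool, and the pool thread installs that snapshot before running the body. This is needed because Spark's local properties live in an InheritableThreadLocal, and pool threads are long-lived, so they only inherit whatever was set when the thread was created, not later changes.

```scala
import java.util.Properties
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Minimal sketch of the capture-and-install pattern, assuming a stand-in
// for SparkContext's per-thread local properties. Names are hypothetical.
object LocalPropertyPropagation {
  private val pool =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  // Stand-in for SparkContext's InheritableThreadLocal-backed properties.
  private val localProps = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }

  def setLocalProperty(k: String, v: String): Unit =
    localProps.get.setProperty(k, v)

  def getLocalProperty(k: String): String =
    localProps.get.getProperty(k)

  def submitWithCapturedProps[T](body: => T): Future[T] = {
    // Snapshot on the *calling* thread, before handing off to the pool.
    val captured = localProps.get.clone().asInstanceOf[Properties]
    Future {
      localProps.set(captured) // install the snapshot on the pool thread
      body
    }(pool)
  }
}
```

Without the explicit snapshot, the pool thread would see whatever properties happened to be installed when it was created, which is exactly the staleness described above.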
I do not have a reliable way to reproduce this, because SubqueryBroadcastExec is usually not the first to initialize the broadcast relation future. However, after adding a Thread.sleep(1) to the doPrepare method of SubqueryBroadcastExec, the following test fails consistently (a sketch of the instrumentation follows the test).
```scala
withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
  withTable("a", "b") {
    val confKey = "spark.sql.y"
    val confValue1 = UUID.randomUUID().toString()
    val confValue2 = UUID.randomUUID().toString()

    Seq((confValue1, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("a")
    val df1 = spark.table("a")

    def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
      val df = spark.range(1).mapPartitions { _ =>
        Iterator(TaskContext.get.getLocalProperty(confKey))
      }.filter($"value".contains(confValue)).as("c")
      df.hint("broadcast")
    }

    // set local property and assert
    val df2 = generateBroadcastDataFrame(confKey, confValue1)
    spark.sparkContext.setLocalProperty(confKey, confValue1)
    val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
    val checks = checkDF.collect()
    assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

    // change local property and re-assert
    Seq((confValue2, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("b")
    val df3 = spark.table("b")
    val df4 = generateBroadcastDataFrame(confKey, confValue2)
    spark.sparkContext.setLocalProperty(confKey, confValue2)
    val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
    val checks2 = checks2DF.collect()
    assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
    assert(checks2.nonEmpty)
  }
}
```
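For reference, the instrumentation looks roughly like the sketch below. It is a debugging hack only, not part of any proposed change, and the exact placement of the sleep relative to the relationFuture initialization is my assumption about what makes the race deterministic; the point is simply to stall the main prepare() traversal so the dynamic pruning thread reaches the broadcast relation future first.

```scala
// Debugging instrumentation only (not part of the fix). Kicking off the
// dynamic pruning future and then sleeping for 1 ms stalls the main thread's
// prepare() traversal, so the pruning pool thread is reliably the first to
// initialize the broadcast relation future. The ordering of the two lines is
// an assumption; the text above only says the sleep was added to doPrepare.
protected override def doPrepare(): Unit = {
  relationFuture   // submit the dynamic pruning work to its thread pool
  Thread.sleep(1)  // stall this (main) thread so the pool thread wins the race
}
```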