Description
There is a window of time during which an InMemoryRelation has its cached buffers loaded but its statistics are inaccurate (anywhere from 0 up to the size in bytes ultimately reported by the accumulators). When AQE is enabled, join planning can happen inside this window. In that scenario, the sizes of join children that include an InMemoryRelation are greatly underestimated, and a broadcast join can be planned when it shouldn't be. We have seen a broadcast join planned with a build side larger than 8GB because, at planning time, the optimizer believed the InMemoryRelation was 0 bytes.
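For context, here is a simplified sketch of how InMemoryRelation derives its statistics, paraphrased from the Spark 3.x source (names abbreviated, and later versions differ in detail, so treat this as illustrative rather than authoritative):

// Sketch paraphrased from Spark 3.x InMemoryRelation; not the exact source.
override def computeStats(): Statistics = {
  if (!cacheBuilder.isCachedColumnBuffersLoaded) {
    // Buffers not materialized yet: fall back to the stats of the plan being cached.
    statsOfPlanToCache
  } else {
    // Buffers are reported as loaded, but sizeInBytesStats is an accumulator that
    // is only populated as partitions materialize. Between the "loaded" flag
    // flipping to true and the accumulator catching up, this can report anywhere
    // from 0 to the true size; that is the window in which AQE can plan a
    // broadcast join it shouldn't.
    statsOfPlanToCache.copy(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
  }
}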
Here is an example test case where the broadcast threshold is ignored. It can reproduce the 8GB error if the tables are made larger. (The snippet relies on the withSQLConf and findTopLevelBroadcastHashJoin helpers available in Spark's AdaptiveQueryExecSuite tests.)
withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
  // Spark estimates a string column as 20 bytes, so with 60k rows these relations
  // should be estimated at ~1.2m bytes, which is greater than the broadcast join threshold
  Seq.fill(60000)("a").toDF("key")
    .createOrReplaceTempView("temp")
  Seq.fill(60000)("b").toDF("key")
    .createOrReplaceTempView("temp2")
  Seq("a").toDF("key").createOrReplaceTempView("smallTemp")

  spark.sql("SELECT key as newKey FROM temp").persist()

  val query =
    s"""
       |SELECT t3.newKey
       |FROM
       |  (SELECT t1.newKey
       |  FROM (SELECT key as newKey FROM temp) as t1
       |    JOIN
       |    (SELECT key FROM smallTemp) as t2
       |  ON t1.newKey = t2.key
       |  ) as t3
       |  JOIN
       |  (SELECT key FROM temp2) as t4
       |  ON t3.newKey = t4.key
       |UNION
       |SELECT t1.newKey
       |FROM
       |  (SELECT key as newKey FROM temp) as t1
       |  JOIN
       |  (SELECT key FROM temp2) as t2
       |  ON t1.newKey = t2.key
       |""".stripMargin

  val df = spark.sql(query)
  df.collect()
  val adaptivePlan = df.queryExecution.executedPlan
  val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
  assert(bhj.length == 1)
}