Details
Type: Sub-task
Status: Resolved
Priority: Blocker
Resolution: Fixed
3.4.0, 3.4.1
Description
Adding the following test case to KeyGroupedPartitioningSuite demonstrates the problem.
test("test join key is the second partition key and a transform") {
  val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
  createTable(items, items_schema, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
    s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
    s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

  val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
  createTable(purchases, purchases_schema, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
    s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
    s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
    s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
    s"(3, 19.5, cast('2020-02-01' as timestamp))")

  withSQLConf(
      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
      SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
    val df = sql("SELECT id, name, i.price as purchase_price, " +
      "p.item_id, p.price as sale_price " +
      s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
      "ON i.arrive_time = p.time " +
      "ORDER BY id, purchase_price, p.item_id, sale_price")

    val shuffles = collectShuffles(df.queryExecution.executedPlan)
    assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are partition keys")
    checkAnswer(df,
      Seq(
        Row(1, "aa", 40.0, 1, 42.0),
        Row(1, "aa", 40.0, 2, 11.0),
        Row(1, "aa", 41.0, 1, 44.0),
        Row(1, "aa", 41.0, 1, 45.0),
        Row(2, "bb", 10.0, 1, 42.0),
        Row(2, "bb", 10.0, 2, 11.0),
        Row(2, "bb", 10.5, 1, 42.0),
        Row(2, "bb", 10.5, 2, 11.0),
        Row(3, "cc", 15.5, 3, 19.5)
      )
    )
  }
}
Note: this test has set up the DataSourceV2 source to return multiple splits for the same partition.
In this case, SPJ is not triggered (because the join key does not match the partition keys), but the code in DSV2Scan
intended to fill the empty partitions for 'pushdown-value' will still iterate through the non-grouped partitions and look each one up in the grouped partitions to fill the map, resulting in some duplicate input data being fed into the join.
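The duplication described above can be modelled with a small standalone sketch. This is not the Spark code in question: `Split`, `grouped`, `buggyInput` and `expectedInput` are hypothetical names used only for illustration, and the model assumes that each non-grouped split is replaced by the full group of splits sharing its partition key.

```scala
// Simplified, hypothetical model of the issue; none of these names are Spark APIs.
object DuplicateSplitsSketch {
  // A split (input partition) tagged with the partition key it belongs to.
  final case class Split(partitionKey: String, rows: Seq[Int])

  // Three splits; the first two share the same partition key,
  // mirroring a source that returns multiple splits per partition.
  val splits: Seq[Split] = Seq(
    Split("2020-01-01", Seq(1, 2)),
    Split("2020-01-01", Seq(3)),
    Split("2020-01-15", Seq(4)))

  // Grouped view: one entry per distinct partition key, holding all of its splits.
  val grouped: Map[String, Seq[Split]] = splits.groupBy(_.partitionKey)

  // Assumed problematic pattern: iterate over the NON-grouped splits, but
  // look each one up in the grouped map. A key with n splits is visited
  // n times, and every visit contributes the whole group.
  def buggyInput: Seq[Int] =
    splits.flatMap(s => grouped(s.partitionKey)).flatMap(_.rows)

  // When SPJ is not triggered, each split should be read exactly once.
  def expectedInput: Seq[Int] = splits.flatMap(_.rows)

  def main(args: Array[String]): Unit = {
    println(s"expected: $expectedInput")
    println(s"buggy:    $buggyInput")
  }
}
```

In this model, rows 1, 2 and 3 are read twice because their partition key has two splits, which matches the kind of duplicate join input that the test case exposes.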