Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: Impala 4.1.2
Description
When the right side of a LEFT JOIN is an inline view that computes a window function, and the outer query filters on the column produced by that window function, predicate pushdown can produce incorrect results.
create table if not exists test.a(
  a string
, b string
);
insert into test.a values ('a', '1'), ('b', '2');

create table if not exists test.b(
  a string
, b string
, c string
);
insert into test.b values ('a', '1', '1'), ('b', '2', '2'), ('c', '3', '3'), ('c', '3', '4'), ('c', '3', '5');

select * from (
  SELECT t2.b
  FROM test.a t1
  LEFT JOIN (
    SELECT a
         , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
      -- , count(1) b
    FROM test.b
  ) T2 ON t1.a = t2.a
) t
where b = 10
The correct result is an empty set. However, in Impala 4.1.2 the query returns 4 records with NULL values.
Here is the execution plan.
PLAN-ROOT SINK
|
09:EXCHANGE [UNPARTITIONED]
|
05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  row-size=44B cardinality=224
|
|--08:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
04:SELECT
|  predicates: row_number() = 10    <---- There seems to be a problem here.
|  row-size=32B cardinality=218
|
03:ANALYTIC
|  functions: row_number()
|  partition by: a
|  order by: b ASC
|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|  row-size=32B cardinality=218
|
07:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  row-size=24B cardinality=218
|
06:EXCHANGE [HASH(a)]
|
02:TOP-N
|  partition by: a
|  order by: b ASC
|  partition limit: 10
|  source expr: row_number() = CAST(10 AS BIGINT)
|  row-size=24B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   row-size=24B cardinality=218
However, the same query written with the count aggregate function instead of the window function returns the correct result.
select * from (
  SELECT t2.b
  FROM test.a t1
  LEFT JOIN (
    SELECT a
      -- , ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) b
         , count(1) b
    FROM test.b
    group by 1
  ) T2 ON t1.a = t2.a
) t
where b = 10
PLAN-ROOT SINK
|
07:EXCHANGE [UNPARTITIONED]
|
03:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|  hash predicates: a = t1.a
|  other predicates: count(*) = 10    <------------ It's OK here.
|  runtime filters: RF000 <- t1.a
|  row-size=32B cardinality=224
|
|--06:EXCHANGE [HASH(t1.a)]
|  |
|  00:SCAN HDFS [test.a t1]
|     HDFS partitions=1/1 files=2 size=1.06KB
|     row-size=12B cardinality=224
|
05:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: a
|  having: count(*) = 10
|  row-size=20B cardinality=218
|
04:EXCHANGE [HASH(a)]
|
02:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: a
|  row-size=20B cardinality=218
|
01:SCAN HDFS [test.b]
   HDFS partitions=1/1 files=2 size=1.54KB
   runtime filters: RF000 -> test.b.a
   row-size=12B cardinality=218
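In the window-function plan the predicate appears only in 04:SELECT, below the join. In the count plan it is also kept on the join as "other predicates". The sketch below illustrates why that difference could matter. It is not Impala: it assumes SQLite (3.25 or later, for window functions) as a stand-in reference engine, uses hypothetical table names ta and tb in place of test.a and test.b, and renames the window column to rn for readability. The second query is a hand-written rewrite that models one reading of the first plan, with the filter applied only inside the inline view.

```python
import sqlite3

# Assumption: SQLite stands in for a reference engine; this is not Impala.
# ta / tb are hypothetical stand-ins for test.a / test.b from the report.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ta (a TEXT, b TEXT);
    INSERT INTO ta VALUES ('a', '1'), ('b', '2');
    CREATE TABLE tb (a TEXT, b TEXT, c TEXT);
    INSERT INTO tb VALUES ('a', '1', '1'), ('b', '2', '2'),
                          ('c', '3', '3'), ('c', '3', '4'), ('c', '3', '5');
""")

# Shape of the reported query: the filter is evaluated above the LEFT JOIN,
# so NULL-extended rows are discarded by "b = 10".
reported = """
SELECT * FROM (
    SELECT t2.rn AS b
    FROM ta t1
    LEFT JOIN (
        SELECT a, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
        FROM tb
    ) t2 ON t1.a = t2.a
) t
WHERE b = 10
"""

# Hand-written model of the suspect plan: the filter runs only inside the
# inline view, and nothing re-checks it after the outer join.
pushed_only = """
SELECT t2.rn AS b
FROM ta t1
LEFT JOIN (
    SELECT a, rn FROM (
        SELECT a, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
        FROM tb
    ) WHERE rn = 10
) t2 ON t1.a = t2.a
"""

expected_rows = conn.execute(reported).fetchall()
pushed_rows = conn.execute(pushed_only).fetchall()

print("filter above join:", expected_rows)
print("filter only below join:", pushed_rows)
```

Under this model the first query returns no rows, while the second returns one NULL row for every row of the left table, which has the same shape as the NULL records described in the report.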