Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Affects Version/s: Impala 2.3.0, Impala 2.4.0, Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
Description
Impala may return wrong results for plans that have a partitioned join inside a union.
Affected queries
- The plan has a partitioned join inside a union.
- The tables have stats; otherwise a partitioned join would not be chosen.
- In at least one equi-join condition, the left-hand and right-hand join keys have different types.
Reproduction
Setup:
create table a (id int);
insert into a values (1),(2),(3),(4);
insert into a values (5),(6),(7),(8);
compute stats a;
create table b (id bigint);
insert into b values (1),(2),(3),(4);
insert into b values (5),(6),(7),(8);
compute stats b;
Query that returns correct results:
select v.id from (select distinct id from a) v join b on v.id = b.id
+----+
| id |
+----+
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 6  |
| 7  |
| 8  |
+----+
Fetched 8 row(s) in 0.20s
Query that returns wrong results:
select null from a limit 0
union all
select v.id from (select distinct id from a) v join b on v.id = b.id
+------+
| null |
+------+
| 3    |
| 5    |
| 6    |
| 7    |
| 8    |
+------+
Fetched 5 row(s) in 0.12s
Plan:
+--------------------------------------------------+
| Explain String                                   |
+--------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=3.88MB |
| Per-Host Resource Estimates: Memory=85.94MB      |
| Codegen disabled by planner                      |
|                                                  |
| PLAN-ROOT SINK                                   |
| |                                                |
| 08:EXCHANGE [UNPARTITIONED]                      |
| |                                                |
| 00:UNION                                         |
| |                                                |
| 04:HASH JOIN [INNER JOIN, PARTITIONED]           |  <--- Partitioned join inside union
| |  hash predicates: b.id = id                    |
| |  runtime filters: RF000 <- id                  |
| |                                                |
| |--06:AGGREGATE [FINALIZE]                       |
| |  |  group by: id                               |
| |  |                                             |
| |  05:EXCHANGE [HASH(id)]                        |
| |  |                                             |
| |  02:AGGREGATE [STREAMING]                      |
| |  |  group by: id                               |
| |  |                                             |
| |  01:SCAN HDFS [default.a]                      |
| |     partitions=1/1 files=2 size=16B            |
| |                                                |
| 07:EXCHANGE [HASH(b.id)]                         |
| |                                                |
| 03:SCAN HDFS [default.b]                         |
|    partitions=1/1 files=2 size=16B               |
|    runtime filters: RF000 -> b.id                |
+--------------------------------------------------+
Analysis
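The bug is a missing implicit cast in exchange 05 (05:EXCHANGE [HASH(id)]). Its partition expression id is an INT and should be cast to BIGINT to be consistent with the left input of the join, which exchange 07 hash-partitions on b.id (a BIGINT). Without the cast, equal key values are hashed as different types, so matching rows from the two sides can be sent to different join instances and never meet. This is why rows are missing from the result.
We already have code to properly cast partition expressions in exchanges, but it incorrectly assumes that this is only needed for hash-partitioned fragments. The problem is that the UNION makes the fragment RANDOM partitioned: because the union children could be arbitrarily partitioned, there is no guarantee about which partitioning the fragment produces. As a result, the fragment containing the union and the join skips the cast logic, even though the partitioned join inside it still depends on exchanges 05 and 07 hashing compatible types.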
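To illustrate why the missing cast loses rows, here is a minimal Java simulation. It is a sketch under stated assumptions, not Impala code: the hash (`hashBytes`, the `Arrays.hashCode` recurrence computed in long arithmetic over little-endian bytes), the class and method names, and the cluster size of 7 join instances are all hypothetical. The only property it relies on is the one described above: a hash over a key's raw bytes depends on the key's width, so an INT and a BIGINT holding the same value can map to different instances.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PartitionMismatchDemo {

    // Toy stand-in hash over a key's raw bytes; NOT Impala's hash function.
    // Same recurrence as Arrays.hashCode(byte[]) but in long arithmetic, so no overflow here.
    static long hashBytes(byte[] bytes) {
        long h = 1;
        for (byte b : bytes) {
            h = 31 * h + (b & 0xff);
        }
        return h;
    }

    // An INT key occupies 4 bytes, a BIGINT key 8 bytes (little-endian).
    static byte[] intBytes(int v) {
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array();
    }

    static byte[] bigintBytes(long v) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array();
    }

    // Picks the join instance that a hash exchange would send the key to.
    static int instanceFor(byte[] key, int numInstances) {
        return (int) Math.floorMod(hashBytes(key), (long) numInstances);
    }

    // Probe keys (b.id) are BIGINT. Build keys (a.id) are INT and are hashed either
    // as-is (the bug) or after widening to BIGINT (the cast exchange 05 is missing).
    // A pair only joins if the keys are equal AND landed on the same instance.
    static int countMatches(long[] probeKeys, int[] buildKeys, int numInstances,
                            boolean castBuildKeys) {
        int matches = 0;
        for (long p : probeKeys) {
            int probeInstance = instanceFor(bigintBytes(p), numInstances);
            for (int b : buildKeys) {
                byte[] buildKey = castBuildKeys ? bigintBytes(b) : intBytes(b);
                if (p == b && probeInstance == instanceFor(buildKey, numInstances)) {
                    matches++;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long[] bIds = {1, 2, 3, 4, 5, 6, 7, 8};
        int[] aIds = {1, 2, 3, 4, 5, 6, 7, 8};
        int numInstances = 7; // hypothetical cluster size
        System.out.println("with cast:    " + countMatches(bIds, aIds, numInstances, true));
        System.out.println("without cast: " + countMatches(bIds, aIds, numInstances, false));
    }
}
```

Running `main` prints 8 matches with the cast and 1 without it (only key 4 happens to land on the same instance from both sides). The exact number of lost rows depends on the real hash function and the number of instances; the cluster in this report returned 5 of the 8 rows. This toy hash happens to hide the mismatch for some cluster sizes, for example 3 (because 31 mod 3 is 1, the trailing zero bytes do not change the result), which is why the sketch uses 7.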
The buggy code is in PlanFragment#finalizeExchanges():
public void finalizeExchanges(Analyzer analyzer)
    throws InternalException, NotImplementedException {
  if (destNode_ != null) {
    Preconditions.checkState(sink_ == null);
    // we're streaming to an exchange node
    DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
    streamSink.setFragment(this);
    sink_ = streamSink;
  }

  if (!dataPartition_.isHashPartitioned()) return;  // <--- Problem here

  // ... The following code adds casts to exchanges
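The following Java sketch models the control-flow problem in finalizeExchanges() and one way to avoid it. Everything in it is hypothetical and heavily simplified: `ColType`, `Partitioning`, `Sender`, `PartitionedJoin`, `finalizeBuggy` and `finalizeFixed` are illustrative names, not Impala classes, and this is not the actual patch that resolved the issue. It assumes the correction is to run the cast logic for every partitioned join in the fragment, regardless of the fragment's own data partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ExchangeCastSketch {

    // Hypothetical integer types, ordered from narrowest to widest.
    enum ColType { TINYINT, SMALLINT, INT, BIGINT }

    // Hypothetical stand-in for a fragment's data partition type.
    enum Partitioning { HASH, RANDOM, UNPARTITIONED }

    // A hash-partitioning sender, reduced to the types of its partition exprs.
    static final class Sender {
        final String name;
        final List<ColType> partitionTypes;

        Sender(String name, ColType... types) {
            this.name = name;
            this.partitionTypes = new ArrayList<>(Arrays.asList(types));
        }
    }

    // A partitioned hash join whose two inputs arrive through hash exchanges.
    static final class PartitionedJoin {
        final Sender probe;
        final Sender build;

        PartitionedJoin(Sender probe, Sender build) {
            this.probe = probe;
            this.build = build;
        }
    }

    static ColType widen(ColType a, ColType b) {
        return a.ordinal() >= b.ordinal() ? a : b;
    }

    // Mirrors the buggy early return: a RANDOM fragment (e.g. one containing a UNION)
    // never reaches the cast logic.
    static void finalizeBuggy(Partitioning dataPartition, List<PartitionedJoin> joins) {
        if (dataPartition != Partitioning.HASH) return;
        castJoinInputs(joins);
    }

    // Sketch of a fix: always reconcile the senders of each partitioned join.
    static void finalizeFixed(List<PartitionedJoin> joins) {
        castJoinInputs(joins);
    }

    static void castJoinInputs(List<PartitionedJoin> joins) {
        for (PartitionedJoin join : joins) {
            List<ColType> probe = join.probe.partitionTypes;
            List<ColType> build = join.build.partitionTypes;
            if (probe.size() != build.size()) {
                throw new IllegalStateException("join inputs hash on different numbers of exprs");
            }
            for (int i = 0; i < probe.size(); i++) {
                // Both sides must hash the same type; Impala would insert an implicit cast here.
                ColType common = widen(probe.get(i), build.get(i));
                probe.set(i, common);
                build.set(i, common);
            }
        }
    }

    // Builds the join from the report's plan and returns the type exchange 05 receives.
    static ColType buildKeyTypeAfterFinalize(boolean useFix) {
        Sender into07 = new Sender("sender into 07:EXCHANGE [HASH(b.id)]", ColType.BIGINT);
        Sender into05 = new Sender("sender into 05:EXCHANGE [HASH(id)]", ColType.INT);
        List<PartitionedJoin> joins =
            Collections.singletonList(new PartitionedJoin(into07, into05));
        // The fragment holding 00:UNION and 04:HASH JOIN is RANDOM partitioned.
        if (useFix) {
            finalizeFixed(joins);
        } else {
            finalizeBuggy(Partitioning.RANDOM, joins);
        }
        return into05.partitionTypes.get(0);
    }

    public static void main(String[] args) {
        System.out.println("buggy: 05 hashes " + buildKeyTypeAfterFinalize(false));
        System.out.println("fixed: 05 hashes " + buildKeyTypeAfterFinalize(true));
    }
}
```

`buildKeyTypeAfterFinalize(false)` returns `INT` (exchange 05 keeps hashing an INT) and `buildKeyTypeAfterFinalize(true)` returns `BIGINT`. The sketch reconciles types per join rather than across all exchanges of the fragment, since a fragment made RANDOM by a union may contain several partitioned joins whose keys have unrelated types.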
Workaround
- Use the broadcast and straight_join hints to force the join to use a broadcast distribution strategy
- Reformulate the query to avoid the join inside a union
- Write the join result into a separate table and use that table in the original query instead of the join