IMPALA-5850

Partitioned hash join inside union may return wrong results

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: Impala 2.3.0, Impala 2.4.0, Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
    • Fix Version/s: Impala 2.10.0
    • Component/s: Frontend

    Description

      Impala may return wrong results for plans that have a partitioned join inside a union.

      Affected queries

      • plan has a partitioned join inside a union
      • tables must have stats - otherwise a partitioned join would not be chosen
      • for at least one equi-join condition, the left-hand side and right-hand side join keys have different types

      Reproduction
      Setup:

      create table a (id int);
      insert into a values (1),(2),(3),(4);
      insert into a values (5),(6),(7),(8);
      compute stats a;
      
      create table b (id bigint);
      insert into b values (1),(2),(3),(4);
      insert into b values (5),(6),(7),(8);
      compute stats b;
      

      Query that returns correct results:

      select v.id from
      (select distinct id from a) v join b
       on v.id = b.id
      
      +----+
      | id |
      +----+
      | 1  |
      | 2  |
      | 3  |
      | 4  |
      | 5  |
      | 6  |
      | 7  |
      | 8  |
      +----+
      Fetched 8 row(s) in 0.20s
      

      Query that returns wrong results:

      select null from a limit 0
      union all
      select v.id from
      (select distinct id from a) v join b
       on v.id = b.id
      
      +------+
      | null |
      +------+
      | 3    |
      | 5    |
      | 6    |
      | 7    |
      | 8    |
      +------+
      Fetched 5 row(s) in 0.12s
      
      Plan:
      +--------------------------------------------------+
      | Explain String                                   |
      +--------------------------------------------------+
      | Max Per-Host Resource Reservation: Memory=3.88MB |
      | Per-Host Resource Estimates: Memory=85.94MB      |
      | Codegen disabled by planner                      |
      |                                                  |
      | PLAN-ROOT SINK                                   |
      | |                                                |
      | 08:EXCHANGE [UNPARTITIONED]                      |
      | |                                                |
      | 00:UNION                                         |
      | |                                                |
      | 04:HASH JOIN [INNER JOIN, PARTITIONED]           |  <--- Partitioned join inside union
      | |  hash predicates: b.id = id                    |
      | |  runtime filters: RF000 <- id                  |
      | |                                                |
      | |--06:AGGREGATE [FINALIZE]                       |
      | |  |  group by: id                               |
      | |  |                                             |
      | |  05:EXCHANGE [HASH(id)]                        |
      | |  |                                             |
      | |  02:AGGREGATE [STREAMING]                      |
      | |  |  group by: id                               |
      | |  |                                             |
      | |  01:SCAN HDFS [default.a]                      |
      | |     partitions=1/1 files=2 size=16B            |
      | |                                                |
      | 07:EXCHANGE [HASH(b.id)]                         |
      | |                                                |
      | 03:SCAN HDFS [default.b]                         |
      |    partitions=1/1 files=2 size=16B               |
      |    runtime filters: RF000 -> b.id                |
      +--------------------------------------------------+
      

      Analysis
      The bug is a missing implicit cast in EXCHANGE 05. Its partition expression id (INT) should be cast to BIGINT to be consistent with the left input of the join, which EXCHANGE 07 hash-partitions on b.id (BIGINT).
      We already have code that casts partition expressions in exchanges, but it incorrectly assumes that this is only necessary for hash-partitioned fragments. The problem is that the UNION makes the fragment RANDOM partitioned: because the union children could be arbitrarily partitioned, there is no guarantee about which partitioning the fragment produces.
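
      To illustrate why the missing cast matters, here is a minimal, self-contained sketch of hash partitioning over the raw bytes of a key. It is a toy model, not Impala's code: the class name, the method names, the FNV-1a hash, and the partition count of 3 are all assumptions made for illustration. An INT key and the equal BIGINT key have different byte widths, so they may hash to different partitions unless the INT is widened first.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Toy model of hash partitioning. Hypothetical names; not Impala's hash function. */
final class HashPartitionSketch {
  // 32-bit FNV-1a over the raw bytes of a value (an illustrative choice of hash).
  static int fnv1a(byte[] bytes) {
    int hash = 0x811c9dc5;
    for (byte b : bytes) {
      hash ^= (b & 0xff);
      hash *= 16777619;
    }
    return hash;
  }

  // Partition chosen when the key is hashed as a 4-byte INT (no cast applied).
  static int partitionOfInt(int value, int numPartitions) {
    byte[] bytes =
        ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
    return Math.floorMod(fnv1a(bytes), numPartitions);
  }

  // Partition chosen when the key is hashed as an 8-byte BIGINT.
  static int partitionOfBigint(long value, int numPartitions) {
    byte[] bytes =
        ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
    return Math.floorMod(fnv1a(bytes), numPartitions);
  }

  // Partition chosen when the INT key is first cast to BIGINT, as the fix requires.
  static int partitionOfIntCastToBigint(int value, int numPartitions) {
    return partitionOfBigint((long) value, numPartitions);
  }

  public static void main(String[] args) {
    int numPartitions = 3; // assumed number of join instances
    for (int id = 1; id <= 8; id++) {
      long bigintId = id; // the matching BIGINT key on the other join input
      System.out.printf(
          "id=%d  bigint side=%d  int side without cast=%d  int side with cast=%d%n",
          id,
          partitionOfBigint(bigintId, numPartitions),
          partitionOfInt(id, numPartitions),
          partitionOfIntCastToBigint(id, numPartitions));
    }
  }
}
```

      In this model, any id whose uncast INT partition differs from its BIGINT partition would never meet its join partner, which is consistent with only some of the rows surviving in the wrong result above. With the cast, both sides always agree.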
      The buggy code is in PlanFragment#finalizeExchanges():

      public void finalizeExchanges(Analyzer analyzer)
          throws InternalException, NotImplementedException {
        if (destNode_ != null) {
          Preconditions.checkState(sink_ == null);
          // we're streaming to an exchange node
          DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
          streamSink.setFragment(this);
          sink_ = streamSink;
        }

        if (!dataPartition_.isHashPartitioned()) return;  // <--- Problem here
        ...
        // The following code adds casts to exchanges
      

      Workaround

      • Use the broadcast and straight_join hints to force the join to use a broadcast distribution strategy
      • Reformulate the query to avoid the join inside a union
      • Write the join result into a separate table and use that table in the original query instead of the join

          People

            Assignee: Alexander Behm (alex.behm)
            Reporter: Alexander Behm (alex.behm)