IMPALA-3167

Incorrect assignment of WHERE clause predicate through a grouping aggregation + outer join.

    Details

      Description

      Thanks to sobrien05 from the user list for reporting this!
      Original post:
      http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Predict-push-down-bug/m-p/38516

      Repro tables and data:

      create table orders(order_id int, address_id int);
      insert into table orders values (1, 10), (2, 20), (3, 30);
      create table addresses(address_id int, state string);
      insert into table addresses values (10, "CA"), (20, "NY"), (30, "CA");
      

      Repro query and bad plan:

      Select *
      FROM (
          Select
            shipping_addr.state,
            count(order_id) as num_orders
          FROM
          (
              Select
                order_id,
                address_id
              FROM orders
          ) order_data
          LEFT OUTER JOIN [shuffle]
          (
              Select address_id,
                    state
              FROM addresses
          ) shipping_addr
          ON order_data.address_id = shipping_addr.address_id
          GROUP BY
            shipping_addr.state
      ) unf
      WHERE
         lower(state) = lower('NY')
      
      +------------------------------------------------------------------------------------+
      | Explain String                                                                     |
      +------------------------------------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=2.16GB VCores=2                            |
      | WARNING: The following tables are missing relevant table and/or column statistics. |
      | debug.addresses, debug.orders                                                      |
      |                                                                                    |
      | 08:EXCHANGE [UNPARTITIONED]                                                        |
      | |                                                                                  |
      | 07:AGGREGATE [FINALIZE]                                                            |
      | |  output: count:merge(order_id)                                                   |
      | |  group by: shipping_addr.state                                                   |
      | |                                                                                  |
      | 06:EXCHANGE [HASH(shipping_addr.state)]                                            |
      | |                                                                                  |
      | 03:AGGREGATE                                                                       |
      | |  output: count(order_id)                                                         |
      | |  group by: state                                                                 |
      | |                                                                                  |
      | 02:HASH JOIN [LEFT OUTER JOIN, PARTITIONED]                                        |
      | |  hash predicates: address_id = address_id                                        |
      | |                                                                                  |
      | |--05:EXCHANGE [HASH(address_id)]                                                  |
      | |  |                                                                               |
      | |  01:SCAN HDFS [debug.addresses]                                                  |
      | |     partitions=1/1 files=1 size=18B                                              |
      | |     predicates: lower(debug.addresses.state) = lower('NY')                       |  <-- should also be assigned at node 02
      | |                                                                                  |
      | 04:EXCHANGE [HASH(address_id)]                                                     |
      | |                                                                                  |
      | 00:SCAN HDFS [debug.orders]                                                        |
      |    partitions=1/1 files=1 size=15B                                                 |
      +------------------------------------------------------------------------------------+
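
      A minimal sketch of what the bad plan computes, written out as explicit SQL. It assumes, as the plan above shows, that the predicate is evaluated only at the scan of addresses (node 01) and nowhere above the join. The filter inside the inline view stands in for that scan predicate, and the [shuffle] hint is dropped because it does not affect the result.

      ```sql
      -- Emulates the bad plan: the state predicate is applied below the
      -- outer join only, so NULL-extended rows are never filtered out.
      SELECT shipping_addr.state,
             count(order_id) AS num_orders
      FROM orders order_data
      LEFT OUTER JOIN
      (
          SELECT address_id, state
          FROM addresses
          WHERE lower(state) = lower('NY')   -- stand-in for the predicate at node 01
      ) shipping_addr
      ON order_data.address_id = shipping_addr.address_id
      GROUP BY shipping_addr.state;
      -- On the repro data (row order not guaranteed):
      --   NY,   1
      --   NULL, 2   <-- this group should be rejected by the outer WHERE clause
      ```

      The original query should return only the (NY, 1) row, because lower(NULL) = lower('NY') does not evaluate to true.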
      

      Workaround
      Use an INNER JOIN instead of a LEFT OUTER JOIN. The WHERE predicate on state already rejects the NULL-extended rows that the outer join adds, so the query keeps the same meaning.
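
      The workaround applied to the repro query, as a sketch. The only intended change is the join type; the [shuffle] hint is omitted here for brevity, which is an editorial simplification rather than part of the workaround.

      ```sql
      SELECT *
      FROM (
          SELECT shipping_addr.state,
                 count(order_id) AS num_orders
          FROM
          (
              SELECT order_id, address_id
              FROM orders
          ) order_data
          INNER JOIN   -- was: LEFT OUTER JOIN
          (
              SELECT address_id, state
              FROM addresses
          ) shipping_addr
          ON order_data.address_id = shipping_addr.address_id
          GROUP BY shipping_addr.state
      ) unf
      WHERE lower(state) = lower('NY');
      -- On the repro data: NY, 1
      ```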

        Activity

        srus Silvius Rus added a comment -

        While this is a correctness issue, it is not considered a release blocker due to reasonably low likelihood. Also, it has a workaround. It remains a critical issue though.

        alex.behm Alexander Behm added a comment -

        Smaller repro:

        create table impala_error_a (stream string, total int);
        insert into table impala_error_a values ('test 1', 230), ('test 2', 400), ('test 1', 300);
        create table impala_error_b (stream string, description string);
        insert into table impala_error_b values ('test 1', 'Hockey'), ('test 2', 'Hand Ball');
        
        SELECT v.d,
               v.s
        FROM   (SELECT description d,
                       Sum(total)  s
                FROM   impala_error_a a
                       LEFT OUTER JOIN (SELECT b.stream,
                                               b.description
                                        FROM   impala_error_b b) bb
                                    ON a.stream = bb.stream
                GROUP  BY d) v
        WHERE  v.d = 'Hand Ball';
        
        alex.behm Alexander Behm added a comment -

        commit f8377543778b654336c978a4bb97efa3c1847441
        Author: Alex Behm <alex.behm@cloudera.com>
        Date: Fri Nov 4 10:41:25 2016 -0700

        IMPALA-3167: Fix assignment of WHERE conjunct through grouping agg + OJ.

        Background: We generally allow the assignment of predicates below the
        nullable side of a left/right outer join, explained as follows using an
        example:

        SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id
        WHERE t2.int_col < 10

        The scan of 't2' picks up 't2.int_col < 10' via
        Analyzer.getBoundPredicates() and recognizes that the predicate must
        also be evaluated by a join later, so the predicate is not marked as
        assigned. The join then picks up the unassigned predicate via
        Analyzer.getUnassignedConjuncts().

        The bug was that our logic for detecting whether a bound predicate must
        also be evaluated at a join node was flawed because it only considered
        whether the tuples of the source or destination predicate were outer
        joined (plus other conditions).
        The underlying assumption is that either the source or destination tuple
        are bound by a tuple produced by a TableRef, but in the buggy query the
        source predicate is bound by an aggregation tuple, so we incorrectly
        marked the bound predicate as assigned in Analyzer.getBoundPredicates().

        The fix is to conservatively not mark bound predicates as assigned if
        the slots referenced by the predicate have equivalent slots that
        belong to an outer-joined tuple. As a result, a plan node may pick up
        the same predicate multiple times, once via
        Analyzer.getBoundPredicates() and another time via
        Analyzer.getUnassignedConjuncts(). Those are deduped now.

        The following example explains the duplicate predicate assignment:

        SELECT * FROM (SELECT * FROM t t1) a LEFT OUTER JOIN t b ON a.id = b.id
        WHERE a.id < 10

        1. The predicate 'a.id < 10' gets migrated into the inline view.
        'a.id < 10' is marked as assigned but is still registered as
        a single-tid conjunct in the Analyzer for potential propagation
        2. The scan node of 't1' calls Analyzer.getBoundPredicates() and
        generates 't1.id < 10' based on the source predicate 'a.id < 10'.
        3. The scan node of 't1' picks up the migrated conjunct 't1.id < 10'
        via Analyzer.getUnassignedConjuncts().

        Change-Id: I774d13a13ad1e8fe82512df98dc29983bdd232eb
        Reviewed-on: http://gerrit.cloudera.org:8080/4960
        Reviewed-by: Alex Behm <alex.behm@cloudera.com>
        Tested-by: Internal Jenkins
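
        The background case in the commit message can be illustrated with a small sketch that shows why the predicate has to be evaluated both at the scan and at the join. The table contents are hypothetical and chosen only for illustration, and the inline-view alias t2f is likewise an assumption made for this example.

        ```sql
        -- Hypothetical data, not taken from the issue.
        CREATE TABLE t1 (id INT);
        CREATE TABLE t2 (id INT, int_col INT);
        INSERT INTO t1 VALUES (1), (2);
        INSERT INTO t2 VALUES (1, 5), (2, 20);

        -- Predicate evaluated only below the join (at the scan of t2):
        -- t1.id = 2 loses its match and comes back NULL-extended.
        SELECT *
        FROM t1 LEFT OUTER JOIN (SELECT * FROM t2 WHERE int_col < 10) t2f
          ON t1.id = t2f.id;
        -- rows (order not guaranteed): (1, 1, 5), (2, NULL, NULL)

        -- Predicate evaluated above the join as well, as the WHERE clause requires:
        -- the NULL-extended row is rejected because NULL < 10 is not true.
        SELECT *
        FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id
        WHERE t2.int_col < 10;
        -- rows: (1, 1, 5)
        ```

        Filtering at the scan is still useful because it reduces the rows fed into the join, but it is only correct if the same predicate is applied again after the join.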


          People

          • Assignee:
            alex.behm Alexander Behm
            Reporter:
            alex.behm Alexander Behm