IMPALA-5251: DecimalAvgFinalize() gets the wrong arg type

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: Impala 2.8.0
    • Fix Version/s: Impala 2.9.0
    • Component/s: Frontend
    • Labels:

      Description

      We mean to pass in the agg fn's logical input type to the UDAs, but appear to be passing in the physical input type (i.e. intermediate type) in this case.

      Steps to repro:

      use tpcds;
      select avg(ss_list_price) B1_LP
                  ,count(ss_list_price) B1_CNT
                  ,count(distinct ss_list_price) B1_CNTD
            from store_sales
            where ss_quantity between 0 and 5
              and (ss_list_price between 8 and 8+10
                   or ss_coupon_amt between 459 and 459+1000
                   or ss_wholesale_cost between 57 and 57+20);
      

      Results in this assert:

      #3  0x00007fd78a7fdca2 in __GI___assert_fail (assertion=0x2edc118 "arg_types[i].type == FunctionContext::TYPE_DECIMAL", file=0x2edbb80 "/home/dhecht/src/Impala/be/src/udf/udf.cc", line=592,
          function=0x2edc7c0 <impala::FunctionContextImpl::GetConstFnAttr(impala::RuntimeState const*, impala_udf::FunctionContext::TypeDesc const&, std::vector<impala_udf::FunctionContext::TypeDesc, std::allocator<impala_udf::FunctionContext::TypeDesc> > const&, impala::FunctionContextImpl::ConstFnAttr, int)::__PRETTY_FUNCTION__> "static int impala::FunctionContextImpl::GetConstFnAttr(const impala::RuntimeState*, const impala_udf::FunctionContext::TypeDesc&, const std::vector<impala_udf::FunctionContext::TypeDesc>&, impala::Fun"...) at assert.c:101
      #4  0x0000000001a98aa1 in impala::FunctionContextImpl::GetConstFnAttr (state=0xb3bdc00, return_type=..., arg_types=std::vector of length 1, capacity 1 = {...},
          t=impala::FunctionContextImpl::ARG_TYPE_SCALE, i=0) at /home/dhecht/src/Impala/be/src/udf/udf.cc:592
      #5  0x0000000001a987a2 in impala::FunctionContextImpl::GetConstFnAttr (this=0x6984640, t=impala::FunctionContextImpl::ARG_TYPE_SCALE, i=0) at /home/dhecht/src/Impala/be/src/udf/udf.cc:561
      #6  0x00000000018a88a0 in impala::AggregateFunctions::DecimalAvgGetValue (ctx=0x4944d78, src=...) at /home/dhecht/src/Impala/be/src/exprs/aggregate-functions-ir.cc:445
      #7  0x00000000018a89f2 in impala::AggregateFunctions::DecimalAvgFinalize (ctx=0x4944d78, src=...) at /home/dhecht/src/Impala/be/src/exprs/aggregate-functions-ir.cc:462
      #8  0x00000000018a3d5a in impala::AggFnEvaluator::SerializeOrFinalize (this=0xcf2f000, agg_fn_ctx=0x4944d78, src=0xb11d000, dst_slot_desc=0xce0be60, dst=0xd02a000,
          fn=0x18a89aa <impala::AggregateFunctions::DecimalAvgFinalize(impala_udf::FunctionContext*, impala_udf::StringVal const&)>) at /home/dhecht/src/Impala/be/src/exprs/agg-fn-evaluator.cc:499
      #9  0x00000000017b2b3e in impala::AggFnEvaluator::Finalize (this=0xcf2f000, agg_fn_ctx=0x4944d78, src=0xb11d000, dst=0xd02a000) at /home/dhecht/src/Impala/be/src/exprs/agg-fn-evaluator.h:266
      #10 0x00000000017b2d76 in impala::AggFnEvaluator::Finalize (evaluators=std::vector of length 3, capacity 4 = {...}, fn_ctxs=std::vector of length 3, capacity 4 = {...}, src=0xb11d000, dst=0xd02a000)
          at /home/dhecht/src/Impala/be/src/exprs/agg-fn-evaluator.h:312
      #11 0x00000000017a8976 in impala::PartitionedAggregationNode::GetOutputTuple (this=0xd714780, agg_fn_ctxs=std::vector of length 3, capacity 4 = {...}, tuple=0xb11d000, pool=0xca844a8)
          at /home/dhecht/src/Impala/be/src/exec/partitioned-aggregation-node.cc:1109
      #12 0x00000000017a2866 in impala::PartitionedAggregationNode::GetSingletonOutput (this=0xd714780, row_batch=0xca84480) at /home/dhecht/src/Impala/be/src/exec/partitioned-aggregation-node.cc:466
      #13 0x00000000017a2186 in impala::PartitionedAggregationNode::GetNextInternal (this=0xd714780, state=0xb3bdc00, row_batch=0xca84480, eos=0x7fd7000b9ebf)
          at /home/dhecht/src/Impala/be/src/exec/partitioned-aggregation-node.cc:442
      #14 0x00000000017a170b in impala::PartitionedAggregationNode::GetNext (this=0xd714780, state=0xb3bdc00, row_batch=0xca84480, eos=0x7fd7000b9ebf)
          at /home/dhecht/src/Impala/be/src/exec/partitioned-aggregation-node.cc:376
      #15 0x0000000001a738b8 in impala::PlanFragmentExecutor::ExecInternal (this=0xb3b89d0) at /home/dhecht/src/Impala/be/src/runtime/plan-fragment-executor.cc:353
      #16 0x0000000001a735f6 in impala::PlanFragmentExecutor::Exec (this=0xb3b89d0) at /home/dhecht/src/Impala/be/src/runtime/plan-fragment-executor.cc:337
      #17 0x0000000001a6cd95 in impala::FragmentInstanceState::Exec (this=0xb3b8700) at /home/dhecht/src/Impala/be/src/runtime/fragment-instance-state.cc:68
      #18 0x0000000001a78367 in impala::QueryExecMgr::ExecFInstance (this=0xbc6ad20, fis=0xb3b8700) at /home/dhecht/src/Impala/be/src/runtime/query-exec-mgr.cc:110
      

          Activity

          kwho Michael Ho added a comment -

          Taking a look now. It appears that the FunctionContext wasn't set up correctly to have the logical type as Dan said.

          (gdb) pvector arg_types
          elem[0]: $1 = {
            type = impala_udf::FunctionContext::TYPE_STRING,
            precision = 32537,
            scale = -347404976,
            len = 32537
          }
          Vector size = 1
          Vector capacity = 1
          Element type = std::_Vector_base<impala_udf::FunctionContext::TypeDesc, std::allocator<impala_udf::FunctionContext::TypeDesc> >::pointer
          
          kwho Michael Ho added a comment (edited) -

          This appears to be a frontend issue. When the select list contains any distinct aggregation, the plan uses two-phase aggregation. The aggregate expressions generated for the second phase take the intermediate result type of the first phase as their input argument, and the types of the aggregate function's original arguments are not preserved anywhere when the second-phase merging aggregation is generated. As a result, the wrong input types are propagated to the second-phase merging aggregation. Some logging output confirms this:

              // map all the remaining agg fns
              for (int i = 0; i < aggregateExprs_.size(); ++i) {
                FunctionCallExpr inputExpr = aggregateExprs_.get(i);
                Preconditions.checkState(inputExpr.isAggregateFunction());
                // we're aggregating an intermediate slot of the 1st agg phase
                Expr aggExprParam =
                    new SlotRef(inputDesc.getSlots().get(i + getGroupingExprs().size()));
                LOG.info("XXX second phase: " + inputExpr.getFnName() + " type: " + aggExprParam.getType());
                FunctionCallExpr aggExpr = FunctionCallExpr.createMergeAggCall(
                    inputExpr, Lists.newArrayList(aggExprParam));
                secondPhaseAggExprs.add(aggExpr);
              }
              Preconditions.checkState(
                  secondPhaseAggExprs.size() == aggregateExprs_.size() + distinctAggExprs.size());
          
              for (FunctionCallExpr aggExpr: secondPhaseAggExprs) {
                aggExpr.analyzeNoThrow(analyzer);
                Preconditions.checkState(aggExpr.isAggregateFunction());
              }
          
              ArrayList<Expr> substGroupingExprs =
                  Expr.substituteList(origGroupingExprs, intermediateTupleSmap_, analyzer, false);
              secondPhaseDistinctAggInfo_ =
                  new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND);
              secondPhaseDistinctAggInfo_.createTupleDescs(analyzer);
              secondPhaseDistinctAggInfo_.createSecondPhaseAggSMap(this, distinctAggExprs);
              secondPhaseDistinctAggInfo_.createMergeAggInfo(analyzer);
            }
          

          Debugging output:

          I0424 13:55:46.040596 26589 FunctionCallExpr.java:84] XXX mergeAgg avg
          I0424 13:55:46.040648 26589 FunctionCallExpr.java:87] 0: DECIMAL(7,2)
          I0424 13:55:46.040740 26589 FunctionCallExpr.java:84] XXX mergeAgg count
          I0424 13:55:46.040779 26589 FunctionCallExpr.java:87] 0: DECIMAL(7,2)
          I0424 13:55:46.040891 26589 AggregateInfo.java:487] XXX second phase: avg type: STRING
          I0424 13:55:46.040935 26589 FunctionCallExpr.java:84] XXX mergeAgg avg
          I0424 13:55:46.042296 26589 FunctionCallExpr.java:87] 0: DECIMAL(7,2)
          I0424 13:55:46.042379 26589 AggregateInfo.java:487] XXX second phase: count type: BIGINT
          I0424 13:55:46.042424 26589 FunctionCallExpr.java:84] XXX mergeAgg count
          I0424 13:55:46.042461 26589 FunctionCallExpr.java:87] 0: DECIMAL(7,2)
          I0424 13:55:46.042665 26589 FunctionCallExpr.java:84] XXX mergeAgg count
          I0424 13:55:46.042726 26589 FunctionCallExpr.java:87] 0: DECIMAL(7,2)
          I0424 13:55:46.042798 26589 FunctionCallExpr.java:84] XXX mergeAgg avg
          I0424 13:55:46.042834 26589 FunctionCallExpr.java:87] 0: STRING
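
The pattern in the log (the merge call for avg first sees DECIMAL(7,2), then STRING) can be reproduced in a toy model. The sketch below is not Impala code: `AggCall` and `createMergeCallBuggy` are hypothetical names, and the model only assumes what the comment above describes, namely that a merge call takes its logical input type from the argument type of whatever call it is built from.

```java
// Toy model of the behaviour described above; not Impala source.
public class TwoPhaseAggSketch {
  // Hypothetical stand-in for an aggregate call with a single argument.
  public static final class AggCall {
    public final String fn;                // function name, e.g. "avg"
    public final String argType;           // type of the value actually passed in
    public final String logicalInputType;  // type reported to the backend

    public AggCall(String fn, String argType, String logicalInputType) {
      this.fn = fn;
      this.argType = argType;
      this.logicalInputType = logicalInputType;
    }
  }

  // Models the pre-fix behaviour: the new merge call reads an intermediate
  // slot, and copies its logical input type from the input call's argument
  // type without checking whether that input is itself a merge call.
  public static AggCall createMergeCallBuggy(AggCall input, String intermediateSlotType) {
    return new AggCall(input.fn, intermediateSlotType, input.argType);
  }

  public static void main(String[] args) {
    AggCall original = new AggCall("avg", "DECIMAL(7,2)", "DECIMAL(7,2)");
    // Phase one merge: built from the original call, so the type is right.
    AggCall phaseOneMerge = createMergeCallBuggy(original, "STRING");
    // Phase two merge: built from a merge call whose argument is the
    // STRING intermediate, so the logical type is lost.
    AggCall phaseTwoMerge = createMergeCallBuggy(phaseOneMerge, "STRING");
    System.out.println(phaseOneMerge.logicalInputType);  // DECIMAL(7,2)
    System.out.println(phaseTwoMerge.logicalInputType);  // STRING
  }
}
```

In this model the second merge call ends up advertising STRING as its input type, which corresponds to the TYPE_STRING entry seen in arg_types in the gdb output earlier in this thread.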
          
          kwho Michael Ho added a comment -

          https://github.com/apache/incubator-impala/commit/42ca45e8307ba4c831ad7ac8da86bbbd957fe4cd

          IMPALA-5251: Fix propagation of input exprs' types in 2-phase agg
          Since commit d2d3f4c (on asf-master), TAggregateExpr contains
          the logical input types of the Aggregate Expr. The reason they
          are included is that merging aggregate expressions will have
          input types of the intermediate values which aren't necessarily
          the same as the input types. For instance, NDV() uses a binary
          blob as its intermediate value and it's passed to its merge
          aggregate expressions as a StringVal but the input type of NDV()
          in the query could be DecimalVal. In this case, we consider
          DecimalVal as the logical input type while StringVal is the
          intermediate type. The logical input types are accessed by the
          BE via GetConstFnAttr() during interpretation and constant
          propagation during codegen.

          To handle distinct aggregate expressions (e.g. select count(distinct)),
          the FE uses 2-phase aggregation by introducing an extra phase of
          split/merge aggregation in which the distinct aggregate expressions'
          inputs are converted and added to the group-by expressions in the first
          phase while the non-distinct aggregate expressions go through the normal
          split/merge treatment.

          The bug is that the existing code incorrectly propagates the intermediate
          types of the non-grouping aggregate expressions as the logical input types
          to the merging aggregate expressions in the second phase of aggregation.
          The input aggregate expressions for the non-distinct aggregate expressions
          in the second phase aggregation are already merging aggregate expressions
          (from phase one) in which case we should not treat their input types as
          logical input types.

          This change fixes the problem above by checking if the input aggregate
          expression passed to FunctionCallExpr.createMergeAggCall() is already
          a merging aggregate expression. If so, it will use the logical input
          types recorded in its 'mergeAggInputFn_' as references for its logical
          input types instead of the aggregate expression input types themselves.

          Change-Id: I158303b20d1afdff23c67f3338b9c4af2ad80691
          Reviewed-on: http://gerrit.cloudera.org:8080/6724
          Reviewed-by: Alex Behm <alex.behm@cloudera.com>
          Tested-by: Impala Public Jenkins
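
The check described in the commit message can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the actual patch: `AggCall` is a hypothetical class, and `createMergeAggCall` and `mergeAggInputFn` here only borrow the names mentioned in the commit message to show the idea that a merge call built from another merge call keeps referring to the original aggregate call for its logical input type.

```java
// Simplified model of the fix described in the commit message; not Impala source.
public class MergeAggFixSketch {
  // Hypothetical stand-in for an aggregate call with a single argument.
  public static final class AggCall {
    public final String fn;                // function name, e.g. "avg"
    public final String argType;           // type of the value actually passed in
    public final AggCall mergeAggInputFn;  // original call; null unless this is a merge call

    public AggCall(String fn, String argType, AggCall mergeAggInputFn) {
      this.fn = fn;
      this.argType = argType;
      this.mergeAggInputFn = mergeAggInputFn;
    }

    public boolean isMergeAggFn() {
      return mergeAggInputFn != null;
    }

    // For a merge call the logical input type comes from the original call,
    // not from the intermediate value the merge call actually receives.
    public String logicalInputType() {
      return isMergeAggFn() ? mergeAggInputFn.argType : argType;
    }
  }

  // If the input is already a merge call, reuse the original call it
  // recorded instead of treating the merge call's own argument type
  // (the intermediate type) as the logical input type.
  public static AggCall createMergeAggCall(AggCall input, String intermediateSlotType) {
    AggCall origin = input.isMergeAggFn() ? input.mergeAggInputFn : input;
    return new AggCall(input.fn, intermediateSlotType, origin);
  }

  public static void main(String[] args) {
    AggCall original = new AggCall("avg", "DECIMAL(7,2)", null);
    AggCall phaseOneMerge = createMergeAggCall(original, "STRING");
    AggCall phaseTwoMerge = createMergeAggCall(phaseOneMerge, "STRING");
    System.out.println(phaseTwoMerge.argType);             // STRING
    System.out.println(phaseTwoMerge.logicalInputType());  // DECIMAL(7,2)
  }
}
```

With this check in place, the second-phase merge call in the model still receives a STRING intermediate value but reports DECIMAL(7,2) as its logical input type, which is the type the assertion in GetConstFnAttr() expects for a decimal avg().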


            People

            • Assignee:
              kwho Michael Ho
              Reporter:
              dhecht Dan Hecht