diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 231dc9f364..f7345677c1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1663,6 +1663,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
         "and follow-up operators in the query plan and merges them if they meet some preconditions."),
+    HIVE_REMOVE_SQ_COUNT_CHECK("hive.optimize.remove.sq_count_check", false,
+        "Whether to remove an extra join with sq_count_check for scalar subqueries "
+            + "with constant group by keys."),
 
     // CTE
     HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRemoveSqCountCheck.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRemoveSqCountCheck.java
new file mode 100644
index 0000000000..7f3a4a4d89
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRemoveSqCountCheck.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+import java.util.List;
+
+/**
+ * Planner rule that removes the join with the sq_count_check filter from a
+ * plan if all group by keys of the scalar subquery's aggregate are constant
+ * and the aggregate has no grouping sets.
+ */
+public class HiveRemoveSqCountCheck extends RelOptRule {
+
+  public static final HiveRemoveSqCountCheck INSTANCE =
+      new HiveRemoveSqCountCheck();
+
+  // Matches Join(Project(Join(any, Filter)), Project(Aggregate)): a filter (the
+  // sq_count_check) that is the right input of a join which, below a project, is
+  // the left input of another join.
+  public HiveRemoveSqCountCheck() {
+    super(operand(Join.class,
+        some(
+            operand(Project.class,
+                operand(Join.class,
+                    some(
+                        operand(RelNode.class, any()),
+                        operand(Filter.class, any())))
+            ),
+            operand(Project.class,
+                operand(Aggregate.class,
+                    any()))
+        )
+    ), HiveRelFactories.HIVE_BUILDER, "HiveRemoveSqCountCheck");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    // rel(4) is the Filter operand; the rule only applies to the sq_count_check filter
+    final RelNode filter = call.rel(4);
+    return filter instanceof HiveFilter && isSqCountCheck((HiveFilter) filter);
+  }
+
+  /**
+   * Returns true if the filter condition is {@code sq_count_check(..) <= ..}, the
+   * form in which HiveSubQueryRemoveRule creates the check for scalar subqueries.
+   */
+  private boolean isSqCountCheck(final HiveFilter filter) {
+    if (!(filter.getCondition() instanceof RexCall)) {
+      return false;
+    }
+    final RexCall condition = (RexCall) filter.getCondition();
+    if (condition.getKind() != SqlKind.LESS_THAN_OR_EQUAL) {
+      return false;
+    }
+    final List<RexNode> operands = condition.getOperands();
+    if (!(operands.get(0) instanceof RexCall)) {
+      return false;
+    }
+    final RexCall op = (RexCall) operands.get(0);
+    return op.getOperator().getName().equals("sq_count_check");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Join topJoin = call.rel(0);
+    final Project project = call.rel(1);
+    final Join join = call.rel(2);
+    final Aggregate aggregate = call.rel(6);
+
+    // in presence of grouping sets we can't remove sq_count_check
+    if (aggregate.indicator) {
+      return;
+    }
+
+    // No group by keys: there is nothing to prove constant, nothing to do.
+    if (aggregate.getGroupCount() == 0) {
+      return;
+    }
+
+    // The project is dropped along with the inner join, so it must be a plain
+    // pass-through of the inner join's left input. Otherwise topJoin's condition
+    // and row type would refer to columns the replacement does not produce.
+    final RelNode left = join.getLeft();
+    if (!RexUtil.isIdentity(project.getProjects(), left.getRowType())) {
+      return;
+    }
+
+    final RelMetadataQuery mq = RelMetadataQuery.instance();
+    final RelOptPredicateList predicates =
+        mq.getPulledUpPredicates(aggregate.getInput());
+    if (predicates == null) {
+      return;
+    }
+
+    // Every group by key must be a known constant; bail out on the first that is not.
+    final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
+    for (int key : aggregate.getGroupSet()) {
+      final RexInputRef ref =
+          rexBuilder.makeInputRef(aggregate.getInput(), key);
+      if (!predicates.constantMap.containsKey(ref)) {
+        return;
+      }
+    }
+
+    // join(join.getLeft(), topJoin.getRight()): bypasses the project and inner join
+    final RelNode newJoin = HiveJoin.getJoin(topJoin.getCluster(), left, topJoin.getRight(),
+        topJoin.getCondition(), topJoin.getJoinType());
+    call.transformTo(newJoin);
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java
index 83d3f7436d..2dca6a25ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java
@@ -54,6 +54,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveSubQRemoveRelBuilder;
 import org.apache.hadoop.hive.ql.optimizer.calcite.SubqueryConf;
@@ -74,75 +76,78 @@
  * the rewrite, and the product of the rewrite will be a {@link Correlate}.
  * The Correlate can be removed using {@link RelDecorrelator}.
*/ -public abstract class HiveSubQueryRemoveRule extends RelOptRule{ - - public static final HiveSubQueryRemoveRule REL_NODE = - new HiveSubQueryRemoveRule( - operand(RelNode.class, null, HiveSubQueryFinder.RELNODE_PREDICATE, - any()), - HiveRelFactories.HIVE_BUILDER, "SubQueryRemoveRule:Filter") { - public void onMatch(RelOptRuleCall call) { - final RelNode relNode = call.rel(0); - //TODO: replace HiveSubQRemoveRelBuilder with calcite's once calcite 1.11.0 is released - final HiveSubQRemoveRelBuilder builder = new HiveSubQRemoveRelBuilder(null, call.rel(0).getCluster(), null); - - // if subquery is in FILTER - if(relNode instanceof Filter) { - final Filter filter = call.rel(0); - final RexSubQuery e = - RexUtil.SubQueryFinder.find(filter.getCondition()); - assert e != null; - - final RelOptUtil.Logic logic = - LogicVisitor.find(RelOptUtil.Logic.TRUE, - ImmutableList.of(filter.getCondition()), e); - builder.push(filter.getInput()); - final int fieldCount = builder.peek().getRowType().getFieldCount(); - - assert(filter instanceof HiveFilter); - SubqueryConf subqueryConfig = filter.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); - boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); - boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); - - final RexNode target = apply(e, HiveFilter.getVariablesSet(e), logic, - builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); - final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); - builder.filter(shuttle.apply(filter.getCondition())); - builder.project(fields(builder, filter.getRowType().getFieldCount())); - call.transformTo(builder.build()); - } - // if subquery is in PROJECT - else if(relNode instanceof Project) { - final Project project = call.rel(0); - final RexSubQuery e = - RexUtil.SubQueryFinder.find(project.getProjects()); - assert e != null; - - final RelOptUtil.Logic logic = - 
LogicVisitor.find(RelOptUtil.Logic.TRUE_FALSE_UNKNOWN, - project.getProjects(), e); - builder.push(project.getInput()); - final int fieldCount = builder.peek().getRowType().getFieldCount(); - - SubqueryConf subqueryConfig = project.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); - boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); - boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); - - final RexNode target = apply(e, HiveFilter.getVariablesSet(e), - logic, builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); - final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); - builder.project(shuttle.apply(project.getProjects()), - project.getRowType().getFieldNames()); - call.transformTo(builder.build()); - } - } - }; +public class HiveSubQueryRemoveRule extends RelOptRule{ + + private HiveConf conf; + + public HiveSubQueryRemoveRule(HiveConf conf) { + super(operand(RelNode.class, null, HiveSubQueryFinder.RELNODE_PREDICATE, + any()), + HiveRelFactories.HIVE_BUILDER, "SubQueryRemoveRule:Filter") ; + this.conf = conf; + + } + public void onMatch(RelOptRuleCall call) { + final RelNode relNode = call.rel(0); + //TODO: replace HiveSubQRemoveRelBuilder with calcite's once calcite 1.11.0 is released + final HiveSubQRemoveRelBuilder builder = new HiveSubQRemoveRelBuilder(null, call.rel(0).getCluster(), null); + + // if subquery is in FILTER + if(relNode instanceof Filter) { + final Filter filter = call.rel(0); + final RexSubQuery e = + RexUtil.SubQueryFinder.find(filter.getCondition()); + assert e != null; + + final RelOptUtil.Logic logic = + LogicVisitor.find(RelOptUtil.Logic.TRUE, + ImmutableList.of(filter.getCondition()), e); + builder.push(filter.getInput()); + final int fieldCount = builder.peek().getRowType().getFieldCount(); + + assert(filter instanceof HiveFilter); + SubqueryConf subqueryConfig = 
filter.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); + boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); + boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); + + final RexNode target = apply(e, HiveFilter.getVariablesSet(e), logic, + builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); + final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); + builder.filter(shuttle.apply(filter.getCondition())); + builder.project(fields(builder, filter.getRowType().getFieldCount())); + call.transformTo(builder.build()); + } + // if subquery is in PROJECT + else if(relNode instanceof Project) { + final Project project = call.rel(0); + final RexSubQuery e = + RexUtil.SubQueryFinder.find(project.getProjects()); + assert e != null; + + final RelOptUtil.Logic logic = + LogicVisitor.find(RelOptUtil.Logic.TRUE_FALSE_UNKNOWN, + project.getProjects(), e); + builder.push(project.getInput()); + final int fieldCount = builder.peek().getRowType().getFieldCount(); + + SubqueryConf subqueryConfig = project.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); + boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); + boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); + + final RexNode target = apply(e, HiveFilter.getVariablesSet(e), + logic, builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); + final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); + builder.project(shuttle.apply(project.getProjects()), + project.getRowType().getFieldNames()); + call.transformTo(builder.build()); + } + } - private HiveSubQueryRemoveRule(RelOptRuleOperand operand, + /*private HiveSubQueryRemoveRule(RelOptRuleOperand operand, RelBuilderFactory relBuilderFactory, String description) { super(operand, relBuilderFactory, description); - } + } */ // given a subquery it 
checks to see what is the aggegate function /// if COUNT returns true since COUNT produces 0 on empty result set @@ -166,264 +171,277 @@ private SqlTypeName getAggTypeForScalarSub(RexSubQuery e) { } protected RexNode apply(RexSubQuery e, Set variablesSet, - RelOptUtil.Logic logic, - HiveSubQRemoveRelBuilder builder, int inputCount, int offset, - boolean isCorrScalarAgg, - boolean hasNoWindowingAndNoGby ) { + RelOptUtil.Logic logic, + HiveSubQRemoveRelBuilder builder, int inputCount, int offset, + boolean isCorrScalarAgg, + boolean hasNoWindowingAndNoGby ) { switch (e.getKind()) { - case SCALAR_QUERY: - // if scalar query has aggregate and no windowing and no gby avoid adding sq_count_check - // since it is guaranteed to produce at most one row - if(!hasNoWindowingAndNoGby) { - builder.push(e.rel); - // returns single row/column - builder.aggregate(builder.groupKey(), builder.count(false, "cnt")); - - SqlFunction countCheck = - new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, - InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); - - // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer - // ends up getting rid of Project since it is not used further up the tree - builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, - builder.call(countCheck, builder.field("cnt")), builder.literal(1))); - if (!variablesSet.isEmpty()) { - builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - } else - builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - - offset++; - } - if(isCorrScalarAgg) { - // Transformation : - // Outer Query Left Join (inner query) on correlated predicate and preserve rows only from left side. 
- builder.push(e.rel); - final List parentQueryFields = new ArrayList<>(); + case SCALAR_QUERY: + // if scalar query has aggregate and no windowing and no gby avoid adding sq_count_check + // since it is guaranteed to produce at most one row + if(!hasNoWindowingAndNoGby) { + final List parentQueryFields = new ArrayList<>(); + if (conf.getBoolVar(ConfVars.HIVE_REMOVE_SQ_COUNT_CHECK)) { + // we want to have project after join since sq_count_check's count() expression wouldn't + // be needed further up parentQueryFields.addAll(builder.fields()); + } - // id is appended since there could be multiple scalar subqueries and FILTER - // is created using field name - String indicator = "alwaysTrue" + e.rel.getId(); - parentQueryFields.add(builder.alias(builder.literal(true), indicator)); - builder.project(parentQueryFields); + builder.push(e.rel); + // returns single row/column + builder.aggregate(builder.groupKey(), builder.count(false, "cnt")); + + SqlFunction countCheck = + new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, + InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); + + // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer + // ends up getting rid of Project since it is not used further up the tree + builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + builder.call(countCheck, builder.field("cnt")), builder.literal(1))); + if (!variablesSet.isEmpty()) { builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + } else + builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - final ImmutableList.Builder operands = ImmutableList.builder(); - RexNode literal; - if(isAggZeroOnEmpty(e)) { - // since count has a return type of BIG INT we need to make a literal of type big int - // relbuilder's literal doesn't allow this - literal = e.rel.getCluster().getRexBuilder().makeBigintLiteral(new BigDecimal(0)); - } - else { - 
literal = e.rel.getCluster().getRexBuilder().makeNullLiteral(getAggTypeForScalarSub(e)); - } - operands.add((builder.isNull(builder.field(indicator))), literal); - operands.add(field(builder, 1, builder.fields().size()-2)); - return builder.call(SqlStdOperatorTable.CASE, operands.build()); + if (conf.getBoolVar(ConfVars.HIVE_REMOVE_SQ_COUNT_CHECK)) { + builder.project(parentQueryFields); + } + else { + offset++; } - //Transformation is to left join for correlated predicates and inner join otherwise, - // but do a count on inner side before that to make sure it generates atmost 1 row. + } + if(isCorrScalarAgg) { + // Transformation : + // Outer Query Left Join (inner query) on correlated predicate and preserve rows only from left side. builder.push(e.rel); + final List parentQueryFields = new ArrayList<>(); + parentQueryFields.addAll(builder.fields()); + + // id is appended since there could be multiple scalar subqueries and FILTER + // is created using field name + String indicator = "alwaysTrue" + e.rel.getId(); + parentQueryFields.add(builder.alias(builder.literal(true), indicator)); + builder.project(parentQueryFields); builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - return field(builder, inputCount, offset); - case IN: - case EXISTS: - // Most general case, where the left and right keys might have nulls, and - // caller requires 3-valued logic return. 
- // - // select e.deptno, e.deptno in (select deptno from emp) - // - // becomes - // - // select e.deptno, - // case - // when ct.c = 0 then false - // when dt.i is not null then true - // when e.deptno is null then null - // when ct.ck < ct.c then null - // else false - // end - // from e - // left join ( - // (select count(*) as c, count(deptno) as ck from emp) as ct - // cross join (select distinct deptno, true as i from emp)) as dt - // on e.deptno = dt.deptno - // - // If keys are not null we can remove "ct" and simplify to - // - // select e.deptno, - // case - // when dt.i is not null then true - // else false - // end - // from e - // left join (select distinct deptno, true as i from emp) as dt - // on e.deptno = dt.deptno - // - // We could further simplify to - // - // select e.deptno, - // dt.i is not null - // from e - // left join (select distinct deptno, true as i from emp) as dt - // on e.deptno = dt.deptno - // - // but have not yet. - // - // If the logic is TRUE we can just kill the record if the condition - // evaluates to FALSE or UNKNOWN. 
Thus the query simplifies to an inner - // join: - // - // select e.deptno, - // true - // from e - // inner join (select distinct deptno from emp) as dt - // on e.deptno = dt.deptno - // + final ImmutableList.Builder operands = ImmutableList.builder(); + RexNode literal; + if(isAggZeroOnEmpty(e)) { + // since count has a return type of BIG INT we need to make a literal of type big int + // relbuilder's literal doesn't allow this + literal = e.rel.getCluster().getRexBuilder().makeBigintLiteral(new BigDecimal(0)); + } + else { + literal = e.rel.getCluster().getRexBuilder().makeNullLiteral(getAggTypeForScalarSub(e)); + } + operands.add((builder.isNull(builder.field(indicator))), literal); + operands.add(field(builder, 1, builder.fields().size()-2)); + return builder.call(SqlStdOperatorTable.CASE, operands.build()); + } - builder.push(e.rel); - final List fields = new ArrayList<>(); - switch (e.getKind()) { - case IN: - fields.addAll(builder.fields()); - // Transformation: sq_count_check(count(*), true) FILTER is generated on top - // of subquery which is then joined (LEFT or INNER) with outer query - // This transformation is done to add run time check using sq_count_check to - // throw an error if subquery is producing zero row, since with aggregate this - // will produce wrong results (because we further rewrite such queries into JOIN) - if(isCorrScalarAgg) { - // returns single row/column - builder.aggregate(builder.groupKey(), - builder.count(false, "cnt_in")); - - if (!variablesSet.isEmpty()) { - builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - } else { - builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - } + //Transformation is to left join for correlated predicates and inner join otherwise, + // but do a count on inner side before that to make sure it generates at most 1 row. 
+ builder.push(e.rel); + builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + return field(builder, inputCount, offset); + + case IN: + case EXISTS: + // Most general case, where the left and right keys might have nulls, and + // caller requires 3-valued logic return. + // + // select e.deptno, e.deptno in (select deptno from emp) + // + // becomes + // + // select e.deptno, + // case + // when ct.c = 0 then false + // when dt.i is not null then true + // when e.deptno is null then null + // when ct.ck < ct.c then null + // else false + // end + // from e + // left join ( + // (select count(*) as c, count(deptno) as ck from emp) as ct + // cross join (select distinct deptno, true as i from emp)) as dt + // on e.deptno = dt.deptno + // + // If keys are not null we can remove "ct" and simplify to + // + // select e.deptno, + // case + // when dt.i is not null then true + // else false + // end + // from e + // left join (select distinct deptno, true as i from emp) as dt + // on e.deptno = dt.deptno + // + // We could further simplify to + // + // select e.deptno, + // dt.i is not null + // from e + // left join (select distinct deptno, true as i from emp) as dt + // on e.deptno = dt.deptno + // + // but have not yet. + // + // If the logic is TRUE we can just kill the record if the condition + // evaluates to FALSE or UNKNOWN. 
Thus the query simplifies to an inner + // join: + // + // select e.deptno, + // true + // from e + // inner join (select distinct deptno from emp) as dt + // on e.deptno = dt.deptno + // + + builder.push(e.rel); + final List fields = new ArrayList<>(); + switch (e.getKind()) { + case IN: + fields.addAll(builder.fields()); + // Transformation: sq_count_check(count(*), true) FILTER is generated on top + // of subquery which is then joined (LEFT or INNER) with outer query + // This transformation is done to add run time check using sq_count_check to + // throw an error if subquery is producing zero row, since with aggregate this + // will produce wrong results (because we further rewrite such queries into JOIN) + if(isCorrScalarAgg) { + // returns single row/column + builder.aggregate(builder.groupKey(), + builder.count(false, "cnt_in")); - SqlFunction inCountCheck = new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, - InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); - - // we create FILTER (sq_count_check(count()) > 0) instead of PROJECT because RelFieldTrimmer - // ends up getting rid of Project since it is not used further up the tree - builder.filter(builder.call(SqlStdOperatorTable.GREATER_THAN, - //true here indicates that sq_count_check is for IN/NOT IN subqueries - builder.call(inCountCheck, builder.field("cnt_in"), builder.literal(true)), - builder.literal(0))); - offset = offset + 1; - builder.push(e.rel); - } - } + if (!variablesSet.isEmpty()) { + builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + } else { + builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); + } - // First, the cross join - switch (logic) { - case TRUE_FALSE_UNKNOWN: - case UNKNOWN_AS_TRUE: - // Since EXISTS/NOT EXISTS are not affected by presence of - // null keys we do not need to generate count(*), count(c) - if (e.getKind() == SqlKind.EXISTS) { - logic = 
RelOptUtil.Logic.TRUE_FALSE; - break; - } - builder.aggregate(builder.groupKey(), - builder.count(false, "c"), - builder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, "ck", - builder.fields())); - builder.as("ct"); - if( !variablesSet.isEmpty()) - { - //builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - } - else - builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); + SqlFunction inCountCheck = new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, + InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); - offset += 2; - builder.push(e.rel); - break; + // we create FILTER (sq_count_check(count()) > 0) instead of PROJECT because RelFieldTrimmer + // ends up getting rid of Project since it is not used further up the tree + builder.filter(builder.call(SqlStdOperatorTable.GREATER_THAN, + //true here indicates that sq_count_check is for IN/NOT IN subqueries + builder.call(inCountCheck, builder.field("cnt_in"), builder.literal(true)), + builder.literal(0))); + offset = offset + 1; + builder.push(e.rel); } + } - // Now the left join - switch (logic) { - case TRUE: - if (fields.isEmpty()) { - builder.project(builder.alias(builder.literal(true), "i" + e.rel.getId())); - builder.aggregate(builder.groupKey(0)); - } else { - builder.aggregate(builder.groupKey(fields)); - } - break; - default: - fields.add(builder.alias(builder.literal(true), "i" + e.rel.getId())); - builder.project(fields); - builder.distinct(); - } - builder.as("dt"); - final List conditions = new ArrayList<>(); - for (Pair pair - : Pair.zip(e.getOperands(), builder.fields())) { - conditions.add( - builder.equals(pair.left, RexUtil.shift(pair.right, offset))); + // First, the cross join + switch (logic) { + case TRUE_FALSE_UNKNOWN: + case UNKNOWN_AS_TRUE: + // Since EXISTS/NOT EXISTS are not affected by presence of + // null keys we do 
not need to generate count(*), count(c) + if (e.getKind() == SqlKind.EXISTS) { + logic = RelOptUtil.Logic.TRUE_FALSE; + break; } - switch (logic) { - case TRUE: - builder.join(JoinRelType.INNER, builder.and(conditions), variablesSet); - return builder.literal(true); + builder.aggregate(builder.groupKey(), + builder.count(false, "c"), + builder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, "ck", + builder.fields())); + builder.as("ct"); + if( !variablesSet.isEmpty()) + { + //builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); + builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); } - builder.join(JoinRelType.LEFT, builder.and(conditions), variablesSet); + else + builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); - final List keyIsNulls = new ArrayList<>(); - for (RexNode operand : e.getOperands()) { - if (operand.getType().isNullable()) { - keyIsNulls.add(builder.isNull(operand)); - } - } - final ImmutableList.Builder operands = ImmutableList.builder(); - switch (logic) { - case TRUE_FALSE_UNKNOWN: - case UNKNOWN_AS_TRUE: - operands.add( - builder.equals(builder.field("ct", "c"), builder.literal(0)), - builder.literal(false)); - //now that we are using LEFT OUTER JOIN to join inner count, count(*) - // with outer table, we wouldn't be able to tell if count is zero - // for inner table since inner join with correlated values will get rid - // of all values where join cond is not true (i.e where actual inner table - // will produce zero result). 
To handle this case we need to check both - // count is zero or count is null - operands.add((builder.isNull(builder.field("ct", "c"))), builder.literal(false)); - break; - } - operands.add(builder.isNotNull(builder.field("dt", "i" + e.rel.getId())), - builder.literal(true)); - if (!keyIsNulls.isEmpty()) { - //Calcite creates null literal with Null type here but because HIVE doesn't support null type - // it is appropriately typed boolean - operands.add(builder.or(keyIsNulls), e.rel.getCluster().getRexBuilder().makeNullLiteral(SqlTypeName.BOOLEAN)); - // we are creating filter here so should not be returning NULL. Not sure why Calcite return NULL - //operands.add(builder.or(keyIsNulls), builder.literal(false)); + offset += 2; + builder.push(e.rel); + break; + } + + // Now the left join + switch (logic) { + case TRUE: + if (fields.isEmpty()) { + builder.project(builder.alias(builder.literal(true), "i" + e.rel.getId())); + builder.aggregate(builder.groupKey(0)); + } else { + builder.aggregate(builder.groupKey(fields)); } - RexNode b = builder.literal(true); - switch (logic) { - case TRUE_FALSE_UNKNOWN: - b = e.rel.getCluster().getRexBuilder().makeNullLiteral(SqlTypeName.BOOLEAN); - // fall through - case UNKNOWN_AS_TRUE: - operands.add( - builder.call(SqlStdOperatorTable.LESS_THAN, - builder.field("ct", "ck"), builder.field("ct", "c")), - b); - break; + break; + default: + fields.add(builder.alias(builder.literal(true), "i" + e.rel.getId())); + builder.project(fields); + builder.distinct(); + } + builder.as("dt"); + final List conditions = new ArrayList<>(); + for (Pair pair + : Pair.zip(e.getOperands(), builder.fields())) { + conditions.add( + builder.equals(pair.left, RexUtil.shift(pair.right, offset))); + } + switch (logic) { + case TRUE: + builder.join(JoinRelType.INNER, builder.and(conditions), variablesSet); + return builder.literal(true); + } + builder.join(JoinRelType.LEFT, builder.and(conditions), variablesSet); + + final List keyIsNulls = new ArrayList<>(); 
+ for (RexNode operand : e.getOperands()) { + if (operand.getType().isNullable()) { + keyIsNulls.add(builder.isNull(operand)); } - operands.add(builder.literal(false)); - return builder.call(SqlStdOperatorTable.CASE, operands.build()); + } + final ImmutableList.Builder operands = ImmutableList.builder(); + switch (logic) { + case TRUE_FALSE_UNKNOWN: + case UNKNOWN_AS_TRUE: + operands.add( + builder.equals(builder.field("ct", "c"), builder.literal(0)), + builder.literal(false)); + //now that we are using LEFT OUTER JOIN to join inner count, count(*) + // with outer table, we wouldn't be able to tell if count is zero + // for inner table since inner join with correlated values will get rid + // of all values where join cond is not true (i.e where actual inner table + // will produce zero result). To handle this case we need to check both + // count is zero or count is null + operands.add((builder.isNull(builder.field("ct", "c"))), builder.literal(false)); + break; + } + operands.add(builder.isNotNull(builder.field("dt", "i" + e.rel.getId())), + builder.literal(true)); + if (!keyIsNulls.isEmpty()) { + //Calcite creates null literal with Null type here but because HIVE doesn't support null type + // it is appropriately typed boolean + operands.add(builder.or(keyIsNulls), e.rel.getCluster().getRexBuilder().makeNullLiteral(SqlTypeName.BOOLEAN)); + // we are creating filter here so should not be returning NULL. 
Not sure why Calcite return NULL + //operands.add(builder.or(keyIsNulls), builder.literal(false)); + } + RexNode b = builder.literal(true); + switch (logic) { + case TRUE_FALSE_UNKNOWN: + b = e.rel.getCluster().getRexBuilder().makeNullLiteral(SqlTypeName.BOOLEAN); + // fall through + case UNKNOWN_AS_TRUE: + operands.add( + builder.call(SqlStdOperatorTable.LESS_THAN, + builder.field("ct", "ck"), builder.field("ct", "c")), + b); + break; + } + operands.add(builder.literal(false)); + return builder.call(SqlStdOperatorTable.CASE, operands.build()); - default: - throw new AssertionError(e.getKind()); + default: + throw new AssertionError(e.getKind()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 88054e7379..268f333e08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -200,6 +200,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsWithStatsRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelDecorrelator; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRemoveSqCountCheck; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortJoinReduceRule; @@ -1379,7 +1380,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu //Remove subquery LOG.debug("Plan before removing subquery:\n" + RelOptUtil.toString(calciteGenPlan)); calciteGenPlan = hepPlan(calciteGenPlan, false, mdProvider.getMetadataProvider(), null, - HiveSubQueryRemoveRule.REL_NODE); + new HiveSubQueryRemoveRule(conf)); LOG.debug("Plan just after removing subquery:\n" + 
RelOptUtil.toString(calciteGenPlan)); calciteGenPlan = HiveRelDecorrelator.decorrelateQuery(calciteGenPlan); @@ -1529,6 +1530,17 @@ public RelOptMaterialization apply(RelOptMaterialization materialization) { perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Semijoin conversion"); } + // 8. Get rid of sq_count_check if group by key is constant (HIVE-) + if (conf.getBoolVar(ConfVars.HIVE_REMOVE_SQ_COUNT_CHECK)) { + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); + calciteOptimizedPlan = + hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, + HiveRemoveSqCountCheck.INSTANCE); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, + "Calcite: Removing sq_count_check UDF "); + } + + // 8. Run rule to fix windowing issue when it is done over // aggregation columns (HIVE-10627) diff --git a/ql/src/test/queries/clientpositive/subquery_scalar.q b/ql/src/test/queries/clientpositive/subquery_scalar.q index 876a1e98f8..7dc47931cc 100644 --- a/ql/src/test/queries/clientpositive/subquery_scalar.q +++ b/ql/src/test/queries/clientpositive/subquery_scalar.q @@ -214,3 +214,13 @@ explain select * from part where p_size > (select max(p_size) from part group b -- same as above, for correlated columns explain select * from part where p_size > (select max(p_size) from part p where p.p_type = part.p_type group by p_type); +-- following queries shouldn't have a join with sq_count_check +set hive.optimize.remove.sq_count_check = true; +explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 group by 4); + +explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 where s1.key = '90' group by s1.key ); + +set hive.optimize.remove.sq_count_check = false; + diff --git a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out index 
2a10adb226..aedfd7f2d3 100644 --- a/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out +++ b/ql/src/test/results/clientpositive/llap/subquery_scalar.q.out @@ -5934,3 +5934,249 @@ STAGE PLANS: Processor Tree: ListSink +Warning: Shuffle Join MERGEJOIN[18][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 group by 4) +PREHOOK: type: QUERY +POSTHOOK: query: explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 group by 4) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 5 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + 
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + keys: 4 (type: int) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1, _col2 + residual filter predicates: {(_col1 > _col2)} + Statistics: Num rows: 68 Data size: 7004 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 68 Data size: 6460 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 68 Data size: 6460 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + 
aggregations: count(VALUE._col0) + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Shuffle Join MERGEJOIN[20][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 where s1.key = '90' group by s1.key ) +PREHOOK: type: QUERY +POSTHOOK: query: explain select key, count(*) from src group by key having count(*) > + (select count(*) from src s1 where s1.key = '90' group by s1.key ) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE), Reducer 4 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key 
expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Filter Operator + predicate: (key = '90') (type: boolean) + Statistics: Num rows: 2 Data size: 174 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 2 Data size: 174 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + keys: '90' (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 94 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 94 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: string), _col1 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1, _col2 + residual filter predicates: {(_col1 > _col2)} + Statistics: Num rows: 68 Data size: 7004 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 68 Data size: 6460 Basic stats: COMPLETE Column stats: COMPLETE + File 
Output Operator + compressed: false + Statistics: Num rows: 68 Data size: 6460 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 94 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col1 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/ql/src/test/results/clientpositive/perf/query23.q.out b/ql/src/test/results/clientpositive/perf/query23.q.out index ebd2271108..0e34b90a1a 100644 --- a/ql/src/test/results/clientpositive/perf/query23.q.out +++ b/ql/src/test/results/clientpositive/perf/query23.q.out @@ -1,5 +1,5 @@ -Warning: Shuffle Join MERGEJOIN[367][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 25' is a cross product Warning: Shuffle Join MERGEJOIN[369][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 30' is a cross product +Warning: Shuffle Join MERGEJOIN[367][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 25' is a cross product PREHOOK: query: explain with frequent_ss_items as (select substr(i_item_desc,1,30) itemdesc,i_item_sk item_sk,d_date solddate,count(*) cnt