diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java index d0b1757a82..bdf995548f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java @@ -22,6 +22,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry; + import java.util.Set; @@ -29,17 +30,18 @@ private HiveAlgorithmsConf algoConfig; private HiveRulesRegistry registry; private CalciteConnectionConfig calciteConfig; - private Set corrScalarRexSQWithAgg; + private SubqueryConf subqueryConfig; public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveRulesRegistry registry, - CalciteConnectionConfig calciteConfig, Set corrScalarRexSQWithAgg) { + CalciteConnectionConfig calciteConfig, Set corrScalarRexSQWithAgg, + Set scalarAggNoGbyWindowing) { this.algoConfig = algoConfig; this.registry = registry; this.calciteConfig = calciteConfig; // this is to keep track if a subquery is correlated and contains aggregate // this is computed in CalcitePlanner while planning and is later required by subuery remove rule // hence this is passed using HivePlannerContext - this.corrScalarRexSQWithAgg = corrScalarRexSQWithAgg; + this.subqueryConfig = new SubqueryConf(corrScalarRexSQWithAgg, scalarAggNoGbyWindowing); } public T unwrap(Class clazz) { @@ -52,8 +54,8 @@ public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveRulesRegistry regis if (clazz.isInstance(calciteConfig)) { return clazz.cast(calciteConfig); } - if(clazz.isInstance(corrScalarRexSQWithAgg)) { - return clazz.cast(corrScalarRexSQWithAgg); + if(clazz.isInstance(subqueryConfig)) { + return clazz.cast(subqueryConfig); } return null; } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/SubqueryConf.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/SubqueryConf.java new file mode 100644 index 0000000000..ce608b0063 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/SubqueryConf.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite; + +import org.apache.calcite.rel.RelNode; + +import java.util.Set; + +public class SubqueryConf{ + + private Set corrScalarRexSQWithAgg; + private Set scalarAggWithoutGbyWindowing; + + + public SubqueryConf(Set corrScalarRexSQWithAgg, + Set scalarAggWithoutGbyWindowing) { + this.corrScalarRexSQWithAgg = corrScalarRexSQWithAgg; + this.scalarAggWithoutGbyWindowing = scalarAggWithoutGbyWindowing; + } + + public Set getCorrScalarRexSQWithAgg() { + return corrScalarRexSQWithAgg; + } + public Set getScalarAggWithoutGbyWindowing() { + return scalarAggWithoutGbyWindowing; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java index c692cc0d4c..83d3f7436d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSubQueryRemoveRule.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveSubQRemoveRelBuilder; +import org.apache.hadoop.hive.ql.optimizer.calcite.SubqueryConf; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; /** @@ -99,11 +100,12 @@ public void onMatch(RelOptRuleCall call) { final int fieldCount = builder.peek().getRowType().getFieldCount(); assert(filter instanceof HiveFilter); - Set corrScalarQueries = filter.getCluster().getPlanner().getContext().unwrap(Set.class); - boolean isCorrScalarQuery = corrScalarQueries.contains(e.rel); + SubqueryConf subqueryConfig = filter.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); + boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); + boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); final 
RexNode target = apply(e, HiveFilter.getVariablesSet(e), logic, - builder, 1, fieldCount, isCorrScalarQuery); + builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); builder.filter(shuttle.apply(filter.getCondition())); builder.project(fields(builder, filter.getRowType().getFieldCount())); @@ -122,11 +124,12 @@ else if(relNode instanceof Project) { builder.push(project.getInput()); final int fieldCount = builder.peek().getRowType().getFieldCount(); - Set corrScalarQueries = project.getCluster().getPlanner().getContext().unwrap(Set.class); - boolean isCorrScalarQuery = corrScalarQueries.contains(e.rel); + SubqueryConf subqueryConfig = project.getCluster().getPlanner().getContext().unwrap(SubqueryConf.class); + boolean isCorrScalarQuery = subqueryConfig.getCorrScalarRexSQWithAgg().contains(e.rel); + boolean hasNoWindowingAndNoGby = subqueryConfig.getScalarAggWithoutGbyWindowing().contains(e.rel); final RexNode target = apply(e, HiveFilter.getVariablesSet(e), - logic, builder, 1, fieldCount, isCorrScalarQuery); + logic, builder, 1, fieldCount, isCorrScalarQuery, hasNoWindowingAndNoGby); final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target); builder.project(shuttle.apply(project.getProjects()), project.getRowType().getFieldNames()); @@ -165,28 +168,32 @@ private SqlTypeName getAggTypeForScalarSub(RexSubQuery e) { protected RexNode apply(RexSubQuery e, Set variablesSet, RelOptUtil.Logic logic, HiveSubQRemoveRelBuilder builder, int inputCount, int offset, - boolean isCorrScalarAgg) { + boolean isCorrScalarAgg, + boolean hasNoWindowingAndNoGby ) { switch (e.getKind()) { case SCALAR_QUERY: - builder.push(e.rel); - // returns single row/column - builder.aggregate(builder.groupKey(), - builder.count(false, "cnt")); - - SqlFunction countCheck = new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, - InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, 
SqlFunctionCategory.USER_DEFINED_FUNCTION); - - // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer - // ends up getting rid of Project since it is not used further up the tree - builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, - builder.call(countCheck, builder.field("cnt")), - builder.literal(1))); - if( !variablesSet.isEmpty()) - { - builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + // if scalar query has aggregate and no windowing and no gby avoid adding sq_count_check + // since it is guaranteed to produce at most one row + if(!hasNoWindowingAndNoGby) { + builder.push(e.rel); + // returns single row/column + builder.aggregate(builder.groupKey(), builder.count(false, "cnt")); + + SqlFunction countCheck = + new SqlFunction("sq_count_check", SqlKind.OTHER_FUNCTION, ReturnTypes.BIGINT, + InferTypes.RETURN_TYPE, OperandTypes.NUMERIC, SqlFunctionCategory.USER_DEFINED_FUNCTION); + + // we create FILTER (sq_count_check(count()) <= 1) instead of PROJECT because RelFieldTrimmer + // ends up getting rid of Project since it is not used further up the tree + builder.filter(builder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + builder.call(countCheck, builder.field("cnt")), builder.literal(1))); + if (!variablesSet.isEmpty()) { + builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); + } else + builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); + + offset++; } - else - builder.join(JoinRelType.INNER, builder.literal(true), variablesSet); if(isCorrScalarAgg) { // Transformation : // Outer Query Left Join (inner query) on correlated predicate and preserve rows only from left side. @@ -218,10 +225,8 @@ protected RexNode apply(RexSubQuery e, Set variablesSet, //Transformation is to left join for correlated predicates and inner join otherwise, // but do a count on inner side before that to make sure it generates atmost 1 row. 
- builder.push(e.rel); builder.join(JoinRelType.LEFT, builder.literal(true), variablesSet); - offset++; return field(builder, inputCount, offset); case IN: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 5d640be914..fa96e94f64 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1299,6 +1299,7 @@ private RowResolver genRowResolver(Operator op, QB qb) { // this is to keep track if a subquery is correlated and contains aggregate // since this is special cased when it is rewritten in SubqueryRemoveRule Set corrScalarRexSQWithAgg = new HashSet(); + Set scalarAggNoGbyNoWin = new HashSet(); // TODO: Do we need to keep track of RR, ColNameToPosMap for every op or // just last one. @@ -1332,7 +1333,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu Boolean.FALSE.toString()); CalciteConnectionConfig calciteConfig = new CalciteConnectionConfigImpl(calciteConfigProperties); HivePlannerContext confContext = new HivePlannerContext(algorithmsConf, registry, calciteConfig, - corrScalarRexSQWithAgg); + corrScalarRexSQWithAgg, scalarAggNoGbyNoWin); RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext); final RexBuilder rexBuilder = cluster.getRexBuilder(); final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder); @@ -2425,8 +2426,8 @@ private RelNode genFilterRelNode(ASTNode filterExpr, RelNode srcRel, } private void subqueryRestrictionCheck(QB qb, ASTNode searchCond, RelNode srcRel, - boolean forHavingClause, - Set corrScalarQueries) throws SemanticException { + boolean forHavingClause, Set corrScalarQueries, + Set scalarQueriesWithAggNoWinNoGby) throws SemanticException { List subQueriesInOriginalTree = SubQueryUtils.findSubQueries(searchCond); ASTNode clonedSearchCond = (ASTNode) 
SubQueryUtils.adaptor.dupTree(searchCond); @@ -2461,18 +2462,25 @@ private void subqueryRestrictionCheck(QB qb, ASTNode searchCond, RelNode srcRel, String havingInputAlias = null; - boolean isCorrScalarWithAgg = subQuery.subqueryRestrictionsCheck(inputRR, forHavingClause, havingInputAlias); - if(isCorrScalarWithAgg) { + boolean [] subqueryConfig = {false, false}; + subQuery.subqueryRestrictionsCheck(inputRR, forHavingClause, + havingInputAlias, subqueryConfig); + if(subqueryConfig[0]) { corrScalarQueries.add(originalSubQueryAST); } + if(subqueryConfig[1]) { + scalarQueriesWithAggNoWinNoGby.add(originalSubQueryAST); + } } } private boolean genSubQueryRelNode(QB qb, ASTNode node, RelNode srcRel, boolean forHavingClause, Map subQueryToRelNode) throws SemanticException { Set corrScalarQueriesWithAgg = new HashSet(); + Set scalarQueriesWithAggNoWinNoGby= new HashSet(); //disallow subqueries which HIVE doesn't currently support - subqueryRestrictionCheck(qb, node, srcRel, forHavingClause, corrScalarQueriesWithAgg); + subqueryRestrictionCheck(qb, node, srcRel, forHavingClause, corrScalarQueriesWithAgg, + scalarQueriesWithAggNoWinNoGby); Deque stack = new ArrayDeque(); stack.push(node); @@ -2502,9 +2510,14 @@ private boolean genSubQueryRelNode(QB qb, ASTNode node, RelNode srcRel, boolean subQueryToRelNode.put(next, subQueryRelNode); //keep track of subqueries which are scalar, correlated and contains aggregate // subquery expression. 
This will later be special cased in Subquery remove rule + // for correlated scalar queries with aggregate we have to take care of the case where + // inner aggregate happens on empty result if(corrScalarQueriesWithAgg.contains(next)) { corrScalarRexSQWithAgg.add(subQueryRelNode); } + if(scalarQueriesWithAggNoWinNoGby.contains(next)) { + scalarAggNoGbyNoWin.add(subQueryRelNode); + } isSubQuery = true; break; default: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java index ec527410e6..845d316fd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java @@ -526,9 +526,9 @@ public ASTNode getOriginalSubQueryASTForRewrite() { * @return true if it is correlated scalar subquery with an aggregate * @throws SemanticException */ - boolean subqueryRestrictionsCheck(RowResolver parentQueryRR, + void subqueryRestrictionsCheck(RowResolver parentQueryRR, boolean forHavingClause, - String outerQueryAlias) + String outerQueryAlias, boolean [] subqueryConfig) throws SemanticException { ASTNode insertClause = getChildFromSubqueryAST("Insert", HiveParser.TOK_INSERT); @@ -568,31 +568,31 @@ boolean subqueryRestrictionsCheck(RowResolver parentQueryRR, hasCount = hasCount | ( r == 2 ); } - - - ASTNode whereClause = SubQueryUtils.subQueryWhere(insertClause); - - if ( whereClause == null ) { - return false; - } - ASTNode searchCond = (ASTNode) whereClause.getChild(0); - List conjuncts = new ArrayList(); - SubQueryUtils.extractConjuncts(searchCond, conjuncts); - - ConjunctAnalyzer conjunctAnalyzer = new ConjunctAnalyzer(parentQueryRR, - forHavingClause, outerQueryAlias); - + // figure out correlation and presence of non-equi join predicate boolean hasCorrelation = false; boolean hasNonEquiJoinPred = false; - for(ASTNode conjunctAST : conjuncts) { - Conjunct conjunct = conjunctAnalyzer.analyzeConjunct(conjunctAST); - 
if(conjunct.isCorrelated()){ - hasCorrelation = true; - } - if ( conjunct.eitherSideRefersBoth() && conjunctAST.getType() != HiveParser.EQUAL) { - hasNonEquiJoinPred = true; + + ASTNode whereClause = SubQueryUtils.subQueryWhere(insertClause); + if ( whereClause != null ) { + ASTNode searchCond = (ASTNode) whereClause.getChild(0); + List conjuncts = new ArrayList(); + SubQueryUtils.extractConjuncts(searchCond, conjuncts); + + ConjunctAnalyzer conjunctAnalyzer = + new ConjunctAnalyzer(parentQueryRR, forHavingClause, outerQueryAlias); + + for (ASTNode conjunctAST : conjuncts) { + Conjunct conjunct = conjunctAnalyzer.analyzeConjunct(conjunctAST); + if (conjunct.isCorrelated()) { + hasCorrelation = true; + } + if (conjunct.eitherSideRefersBoth() && conjunctAST.getType() != HiveParser.EQUAL) { + hasNonEquiJoinPred = true; + } } } + + // figure out if there is group by boolean noImplicityGby = true; if ( insertClause.getChild(1).getChildCount() > 3 && insertClause.getChild(1).getChild(3).getType() == HiveParser.TOK_GROUPBY ) { @@ -643,22 +643,24 @@ else if(operator.getType() == SubQueryType.SCALAR) { subQueryAST, "Scalar subqueries with aggregate cannot have non-equi join predicate")); } + if(!hasWindowing) { + subqueryConfig[1] = true; + } if(hasCorrelation) { - return true; + subqueryConfig[0] = true; } } else if(operator.getType() == SubQueryType.IN) { if(hasCount && hasCorrelation) { - return true; + subqueryConfig[0] = true; } } else if (operator.getType() == SubQueryType.NOT_IN) { if(hasCorrelation) { - return true; + subqueryConfig[0] = true; } } } - return false; } void validateAndRewriteAST(RowResolver outerQueryRR, diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java index 4823950e9f..884e034731 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/TestCBORuleFiredOnlyOnce.java @@ -61,7 +61,8 @@ public void testRuleFiredOnlyOnce() { // Create rules registry to not trigger a rule more than once HiveRulesRegistry registry = new HiveRulesRegistry(); - HivePlannerContext context = new HivePlannerContext(null, registry, null, null); + HivePlannerContext context = new HivePlannerContext(null, registry, null, + null, null); HepPlanner planner = new HepPlanner(programBuilder.build(), context); // Cluster