diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
index 02216de..54c4c2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 import java.util.List;
 import java.util.Stack;
@@ -164,30 +166,40 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         canApplyCtx.setGbyKeysFetchException(true);
       }
       for (ExprNodeDesc expr : keyList) {
-        checkExpression(canApplyCtx, expr);
+        checkExpression(canApplyCtx, expr, operator, topOp);
       }
     }
     return null;
   }
 
-  private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){
-    if(expr instanceof ExprNodeColumnDesc){
-      //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later
-      //if all keys are from index columns
-      canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
-    }else if(expr instanceof ExprNodeGenericFuncDesc){
-      ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc)expr;
-      List childExprs = funcExpr.getChildren();
-      for (ExprNodeDesc childExpr : childExprs) {
-        if(childExpr instanceof ExprNodeColumnDesc){
-          canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
-          canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
-        }else if(childExpr instanceof ExprNodeGenericFuncDesc){
-          checkExpression(canApplyCtx, childExpr);
-        }
-      }
-    }
-  }
+  private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr,
+      Operator operator, Operator topOp)
+      throws SemanticException {
+    if (expr instanceof ExprNodeColumnDesc) {
+      // Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to
+      // check later
+      // if all keys are from index columns
+      expr = ExprNodeDescUtils.backtrack(expr, operator, topOp);
+      if (expr instanceof ExprNodeColumnDesc) {
+        canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+      } else {
+        checkExpression(canApplyCtx, expr, operator, topOp);
+      }
+    } else if (expr instanceof ExprNodeConstantDesc) {
+      canApplyCtx.getGbKeyNameList().add(((ExprNodeConstantDesc) (expr)).getFoldedFromCol());
+    } else if (expr instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) expr;
+      List childExprs = funcExpr.getChildren();
+      for (ExprNodeDesc childExpr : childExprs) {
+        if (childExpr instanceof ExprNodeColumnDesc) {
+          canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+          canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
+        } else {
+          checkExpression(canApplyCtx, childExpr, operator, topOp);
+        }
+      }
+    }
+  }
 }
 
 public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
index 0f06ec9..40fa847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
@@ -378,7 +378,7 @@ private void rewriteOriginalQuery() throws SemanticException {
     RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx =
         RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb,
             canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(),
-            canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction());
+            canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction(), canApplyCtx.getGbKeyNameList());
     rewriteQueryCtx.invokeRewriteQueryProc(topOp);
     parseContext = rewriteQueryCtx.getParseContext();
     parseContext.setOpParseCtx((LinkedHashMap,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
index 74614f3..0efb710 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
@@ -74,18 +74,27 @@
   private RewriteQueryUsingAggregateIndex() {
     //this prevents the class from getting instantiated
   }
 
+  // for SEL1-SEL2-GRY-...-SEL3
+  // we need to modify SelectOperator which precedes the GroupByOperator, e.g., SEL1, SEL2
+  // and keep SelectOperator which comes after the GroupByOperator, e.g., SEL3
+  private static boolean precedeGroupbyOp(Stack stack) {
+    for (Node node : stack) {
+      if (node instanceof GroupByOperator)
+        return false;
+    }
+    return true;
+  }
+
   private static class NewQuerySelectSchemaProc implements NodeProcessor {
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs)
         throws SemanticException {
       SelectOperator operator = (SelectOperator)nd;
       RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
-      List> childOps = operator.getChildOperators();
-      Operator childOp = childOps.iterator().next();
 
       //we need to set the colList, outputColumnNames, colExprMap,
       // rowSchema for only that SelectOperator which precedes the GroupByOperator
       // count(indexed_key_column) needs to be replaced by sum(`_count_of_indexed_key_column`)
-      if (childOp instanceof GroupByOperator){
+      if (precedeGroupbyOp(stack)) {
         List selColList = operator.getConf().getColList();
         selColList.add(rewriteQueryCtx.getAggrExprNode());
@@ -94,6 +103,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
            operator.getConf().getOutputColumnNames();
        selOutputColNames.add(rewriteQueryCtx.getAggrExprNode().getColumn());
 
+       operator.getColumnExprMap().put(rewriteQueryCtx.getAggrExprNode().getColumn(),
+           rewriteQueryCtx.getAggrExprNode());
+
        RowSchema selRS = operator.getSchema();
        List selRSSignature = selRS.getSignature();
 
@@ -229,22 +241,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
       //We need to replace the GroupByOperator which is in
       //groupOpToInputTables map with the new GroupByOperator
-      if(rewriteQueryCtx.getParseContext().getGroupOpToInputTables().containsKey(operator)){
-        List gbyKeyList = operator.getConf().getKeys();
-        String gbyKeys = null;
-        Iterator gbyKeyListItr = gbyKeyList.iterator();
-        while(gbyKeyListItr.hasNext()){
-          ExprNodeDesc expr = gbyKeyListItr.next().clone();
-          if(expr instanceof ExprNodeColumnDesc){
-            ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc)expr;
-            gbyKeys = colExpr.getColumn();
-            if(gbyKeyListItr.hasNext()){
-              gbyKeys = gbyKeys + ",";
-            }
-          }
+      if (rewriteQueryCtx.getParseContext().getGroupOpToInputTables().containsKey(operator)) {
+        Iterator gbyKeyListItr = rewriteQueryCtx.getGbKeyNameList().iterator();
+        String gbyKeys = gbyKeyListItr.next();
+        while (gbyKeyListItr.hasNext()) {
+          gbyKeys = gbyKeys + "," + gbyKeyListItr.next();
         }
-
         //the query contains the sum aggregation GenericUDAF
         String selReplacementCommand = "select sum(`" + rewriteQueryCtx.getAggregateFunction() + "`)"
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
index d699308..77b3ce5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
@@ -55,7 +55,8 @@ public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorCtx {
   private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
-      String indexTableName, String alias, Set columns, String aggregateFunction) {
+      String indexTableName, String alias, Set columns, String aggregateFunction,
+      Set gbKeyNameList) {
     this.parseContext = parseContext;
     this.hiveDb = hiveDb;
     this.indexTableName = indexTableName;
@@ -63,13 +64,14 @@ private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveD
     this.aggregateFunction = aggregateFunction;
     this.columns = columns;
     this.opc = parseContext.getOpParseCtx();
+    this.gbKeyNameList = gbKeyNameList;
   }
 
   public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
       Hive hiveDb, String indexTableName, String alias,
-      Set columns, String aggregateFunction) {
+      Set columns, String aggregateFunction, Set gbKeyNameList) {
     return new RewriteQueryUsingAggregateIndexCtx(
-        parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction);
+        parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction, gbKeyNameList);
   }
 
@@ -84,6 +86,7 @@ public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseC
   private final String aggregateFunction;
   private final Set columns;
   private ExprNodeColumnDesc aggrExprNode = null;
+  private final Set gbKeyNameList;
 
   public Map, OpParseContext> getOpc() {
     return opc;
@@ -116,6 +119,10 @@ public void setAggrExprNode(ExprNodeColumnDesc aggrExprNode) {
   public ExprNodeColumnDesc getAggrExprNode() {
     return aggrExprNode;
   }
+
+  public Set getGbKeyNameList() {
+    return gbKeyNameList;
+  }
 
   /**
    * Walk the original operator tree using the {@link DefaultGraphWalker} using the rules.