Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (revision b9f634a50405df496eef3a5cd241531c6df8deb9) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (revision ) @@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.exec.ExtractOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.ForwardOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -51,8 +53,11 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; @@ -63,7 +68,7 @@ */ public class ReduceSinkDeDuplication implements Transform{ - protected ParseContext pGraphContext; + private ParseContext pGraphContext; @Override public ParseContext transform(ParseContext pctx) throws SemanticException { @@ -73,7 +78,9 @@ ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext); Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", "RS%.*RS%"), ReduceSinkDeduplicateProcFactory + opRules.put(new RuleRegExp("R1", "RS%.*RS%GBY%"), ReduceSinkDeduplicateProcFactory + .getReducerGbyReducerGbyProc()); + opRules.put(new RuleRegExp("R2", "RS%.*RS%"), ReduceSinkDeduplicateProcFactory .getReducerReducerProc()); // The dispatcher fires the processor corresponding to the closest matching @@ -120,6 +127,9 @@ static class ReduceSinkDeduplicateProcFactory { + public static NodeProcessor getReducerGbyReducerGbyProc() { + return new ReducerReducerGbyProc(); + } public static NodeProcessor getReducerReducerProc() { return new ReducerReducerProc(); @@ -152,7 +162,13 @@ if(ctx.contains(childReduceSink)) { return null; } - + + List> childOp = childReduceSink.getChildOperators(); + if (childOp != null && childOp.size() == 1 && childOp.get(0) instanceof GroupByOperator) { + ctx.addRejectedReduceSinkOperator(childReduceSink); + return null; + } + ParseContext pGraphContext = ctx.getPctx(); HashMap childColumnMapping = getPartitionAndKeyColumnMapping(childReduceSink); ReduceSinkOperator parentRS = null; @@ -433,7 +449,7 @@ if (!allowed) { return null; } - + if ((start instanceof ScriptOperator) && !HiveConf.getBoolVar(pGraphContext.getConf(), HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) { @@ -448,6 +464,241 @@ return null; } } - + + static class ReducerReducerGbyProc implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx; + ParseContext pGraphContext = ctx.getPctx(); + + if (pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + // not yet + return false; - } + } + boolean trustScript = pGraphContext.getConf().getBoolVar(HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST); + + GroupByOperator childGroupBy = (GroupByOperator)nd; + for (AggregationDesc aggr : childGroupBy.getConf().getAggregators()) { + if (aggr.getDistinct()) { + return false; -} + } + } + ReduceSinkOperator childRS = (ReduceSinkOperator) getSingleParent(childGroupBy); + Operator parent = getSingleParent(childRS); + + GroupByOperator parentGroupBy = null; + if (parent instanceof GroupByOperator) { + // map aggregation group by + parentGroupBy = (GroupByOperator) parent; + parent = getSingleParent(parent); + } + + ReduceSinkOperator[] possibleRS = findPossibleReduceSinks(parent, trustScript); + if (possibleRS == null) { + return false; + } + + for (int tag = 0; tag < possibleRS.length; tag++) { + ReduceSinkOperator parentRS = possibleRS[tag]; + boolean[] result = checkStatus(childRS, parentRS, tag); + if (result == null) { + continue; + } + if(result[0]) { + parentRS.getConf().setOrder(childRS.getConf().getOrder()); + } + if(result[1]) { + parentRS.getConf().setNumReducers(childRS.getConf().getNumReducers()); + } + if (parentGroupBy != null) { + removeOperator(childGroupBy); + removeOperator(childRS); + return true; + } + Map mapping = childRS.getColumnExprMap(); + for (AggregationDesc aggr : childGroupBy.getConf().getAggregators()) { + aggr.setParameters(backtrack(aggr.getParameters(), mapping)); + } + childGroupBy.getConf().setKeys(backtrack(childGroupBy.getConf().getKeys(), mapping)); + + for (String output : childGroupBy.getConf().getOutputColumnNames()) { + ExprNodeDesc source = backtrack(childGroupBy.getColumnExprMap().get(output), mapping); + if (source != null) { + childGroupBy.getColumnExprMap().put(output, source); + } else { + childGroupBy.getColumnExprMap().remove(output); + } + } + childGroupBy.getConf().setMode(GroupByDesc.Mode.HASH); + removeOperator(childRS); + return true; + } + return false; + } + + private boolean[] checkStatus(ReduceSinkOperator childRS, ReduceSinkOperator parentRS, int tag) { + List ckeys = childRS.getConf().getKeyCols(); + List pkeys = parentRS.getConf().getKeyCols(); + if (ckeys != null && !ckeys.isEmpty() && !sameKeys(ckeys, pkeys, childRS, parentRS, tag)) { + return null; + } + List cpars = childRS.getConf().getPartitionCols(); + List ppars = parentRS.getConf().getPartitionCols(); + if (cpars != null && !cpars.isEmpty() && !sameKeys(cpars, ppars, childRS, parentRS, tag)) { + return null; + } + boolean moveChildRSOrderToParent = false; + String corder = childRS.getConf().getOrder(); + String porder = parentRS.getConf().getOrder(); + if (corder != null && !corder.trim().equals("")) { + if (porder == null || !corder.trim().equals(porder.trim())) { + return null; + } + } else if (porder == null || porder.trim().equals("")) { + moveChildRSOrderToParent = true; + } + boolean moveChildReducerNumToParent = false; + int creduce = childRS.getConf().getNumReducers(); + int preduce = parentRS.getConf().getNumReducers(); + if (creduce != preduce) { + if (creduce >= 0 && preduce >= 0) { + return null; + } + if (preduce == -1) { + moveChildReducerNumToParent = true; + } + } + return new boolean[] {moveChildRSOrderToParent, moveChildReducerNumToParent}; + } + + private boolean sameKeys(List einits, List eterms, Operator init, Operator terminal, int tag) { + if (eterms == null || eterms.size() != einits.size()) { + return false; + } + for (int i = 0; i < einits.size(); i++) { + System.out.println(backtrack(einits.get(i), init, terminal, tag) + " --> " + eterms.get(i)); + if (!backtrack(einits.get(i), init, terminal, tag).isSame(eterms.get(i))) { + return false; + } + } + return true; + } + + private ArrayList backtrack(List exprs, Map mapping) { + ArrayList result = new ArrayList(); + for (ExprNodeDesc expr : exprs) { + result.add(backtrack(expr, mapping)); + } + return result; + } + + private ExprNodeDesc backtrack(ExprNodeDesc source, Map mapping) { + if (source instanceof ExprNodeColumnDesc) { + ExprNodeColumnDesc column = (ExprNodeColumnDesc) source; + return mapping.get(column.getColumn()); + } + if (source instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc function = (ExprNodeGenericFuncDesc) source; + function.setChildExprs(backtrack(function.getChildren(), mapping)); + } + return source; + } + + private ExprNodeDesc backtrack(ExprNodeDesc source, Operator current, Operator terminal, int joinTag) { + if (source instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc function = (ExprNodeGenericFuncDesc) source.clone(); + ArrayList params = new ArrayList(); + for (ExprNodeDesc param : function.getChildren()) { + params.add(backtrack(param, current, terminal, joinTag)); + } + function.setChildExprs(params); + return function; + } + Operator parent = getSingleParent(current, joinTag); + if (parent == null) { + return source; + } + Map mapping = parent.getColumnExprMap(); + if (mapping != null && source instanceof ExprNodeColumnDesc) { + ExprNodeColumnDesc column = (ExprNodeColumnDesc) source; + ExprNodeDesc resolved = mapping.get(column.getColumn()); + source = resolved == null ? source : resolved; + } + return parent == terminal ? source : backtrack(source, parent, terminal, joinTag); + } + + private void removeOperator(Operator operator) { + List> parents = operator.getParentOperators(); + List> children = operator.getChildOperators(); + + if (parents == null || parents.size() != 1 || children == null || children.size() != 1) { + throw new IllegalStateException("never"); + } + + children.get(0).replaceParent(operator, parents.get(0)); + parents.get(0).replaceChild(operator, children.get(0)); + + operator.setParentOperators(null); + operator.setChildOperators(null); + } + + private Operator getSingleParent(Operator operator) { + if (operator.getParentOperators() != null && operator.getParentOperators().size() == 1) { + return operator.getParentOperators().get(0); + } + return null; + } + + private Operator getSingleParent(Operator operator, int joinTag) { + if (operator instanceof JoinOperator) { + return operator.getParentOperators().get(joinTag); + } + return getSingleParent(operator); + } + + private ReduceSinkOperator[] findPossibleReduceSinks(Operator start, boolean trustScript) { + Operator cursor = getSingleParent(start); + for (; cursor != null; cursor = getSingleParent(cursor)) { + if (cursor instanceof ReduceSinkOperator) { + return new ReduceSinkOperator[] {(ReduceSinkOperator) cursor}; + } + if (cursor instanceof JoinOperator) { + return getReduceSinkOperators((JoinOperator) cursor); + } + boolean allowed = false; + if ((cursor instanceof SelectOperator) + || (cursor instanceof FilterOperator) + || (cursor instanceof ExtractOperator) + || (cursor instanceof ForwardOperator) + || (cursor instanceof ScriptOperator && trustScript)) { + allowed = true; + } + if (!allowed) { + return null; + } + } + return null; + } + + private ReduceSinkOperator[] getReduceSinkOperators(JoinOperator join) { + List> parents = join.getParentOperators(); + ReduceSinkOperator[] result = new ReduceSinkOperator[parents.size()]; + for (int i = 0; i < result.length; i++) { + Operator cursor = parents.get(i); + for (; cursor != null; cursor = getSingleParent(cursor)) { + if (cursor instanceof ReduceSinkOperator) { + result[i] = (ReduceSinkOperator) cursor; + break; + } + } + if (result[i] == null) { + throw new IllegalStateException("failed to find RS from " + join + " on tag " + i); + } + } + return result; + } + } + } +}