Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 927179)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -226,6 +226,7 @@
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
+    HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), // merge reduce sinks that share partition/sort keys
     ;
 
     public final String varname;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(revision 927179)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(working copy)
@@ -64,6 +64,9 @@
     }
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+      transformations.add(new ReduceSinkDeDuplication());
+    }
   }
 
   /**
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java	(revision 0)
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * If two reduce sink operators share the same partition/sort columns, we
+ * should merge them. This must happen after map join optimization, because
+ * map join optimization removes reduce sink operators.
+ */
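+// Illustrative example (editor's note, not part of the original patch): in a
+// query such as
+//
+//   SELECT key, count(1) FROM (SELECT key, value FROM src CLUSTER BY key) t
+//   GROUP BY key;
+//
+// the inner CLUSTER BY and the outer GROUP BY both partition and sort on
+// 'key', so the child reduce sink is redundant and the two map-reduce jobs
+// can be collapsed into one.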
+public class ReduceSinkDeDuplication implements Transform {
+
+  protected ParseContext pGraphContext;
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+
+    // set up the walker context and the rules that drive the deduplication
+    ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", "RS%.*RS%"),
+        ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    opRules.put(new RuleRegExp("R2", "GBY%.*RS%"),
+        ReduceSinkDeduplicateProcFactory.getRejectedReduceSinkProc());
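+
+    // Editor's gloss (not part of the original patch): the patterns match
+    // against the walker's operator stack. "RS%.*RS%" fires on a reduce sink
+    // that has another reduce sink earlier on the same operator path, which
+    // is the candidate pair for merging; "GBY%.*RS%" fires on a reduce sink
+    // that follows a group-by, which is recorded as rejected so that pair is
+    // never merged.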
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(
+        ReduceSinkDeduplicateProcFactory.getDefaultProc(), opRules, cppCtx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // create a list of top operator nodes and start walking
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pGraphContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pGraphContext;
+  }
+
+  static class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx {
+    ParseContext pctx;
+    List<ReduceSinkOperator> rejectedRSList;
+
+    public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
+      rejectedRSList = new ArrayList<ReduceSinkOperator>();
+      this.pctx = pctx;
+    }
+
+    public boolean contains(ReduceSinkOperator rsOp) {
+      return rejectedRSList.contains(rsOp);
+    }
+
+    public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
+      if (!rejectedRSList.contains(rsOp)) {
+        rejectedRSList.add(rsOp);
+      }
+    }
+
+    public ParseContext getPctx() {
+      return pctx;
+    }
+
+    public void setPctx(ParseContext pctx) {
+      this.pctx = pctx;
+    }
+  }
+
+  static class ReduceSinkDeduplicateProcFactory {
+
+    public static NodeProcessor getReducerReducerProc() {
+      return new ReducerReducerProc();
+    }
+
+    public static NodeProcessor getDefaultProc() {
+      return new DefaultProc();
+    }
+
+    public static NodeProcessor getRejectedReduceSinkProc() {
+      return new RejectedReduceSinkProc();
+    }
+
+    /*
+     * Does nothing.
+     */
+    static class DefaultProc implements NodeProcessor {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    }
+
+    /*
+     * Adds the reduce sink operator to the rejected list (it should not be
+     * merged with its parent reduce sink operator).
+     */
+    static class RejectedReduceSinkProc implements NodeProcessor {
+
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
+        ctx.addRejectedReduceSinkOperator((ReduceSinkOperator) nd);
+        return null;
+      }
+    }
+
+    static class ReducerReducerProc implements NodeProcessor {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
+        ReduceSinkOperator childReduceSink = (ReduceSinkOperator) nd;
+
+        if (ctx.contains(childReduceSink)) {
+          return null;
+        }
+
+        ParseContext pGraphContext = ctx.getPctx();
+        HashMap<String, String> childColumnMapping =
+            getPartitionAndKeyColumnMapping(childReduceSink);
+        ReduceSinkOperator parentRS = findSingleParentReduceSink(childReduceSink);
+        if (parentRS == null) {
+          return null;
+        }
+        HashMap<String, String> parentColumnMapping =
+            getPartitionAndKeyColumnMapping(parentRS);
+        Operator<? extends Serializable> stopBacktrackFlagOp = null;
+        if (parentRS.getParentOperators() == null) {
+          stopBacktrackFlagOp = parentRS;
+        } else if (parentRS.getParentOperators().size() != 1) {
+          return null;
+        } else {
+          stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
+        }
+
+        boolean succeed = backTrackColumnNames(childColumnMapping,
+            childReduceSink, stopBacktrackFlagOp, pGraphContext);
+        if (!succeed) {
+          return null;
+        }
+        succeed = backTrackColumnNames(parentColumnMapping, parentRS,
+            stopBacktrackFlagOp, pGraphContext);
+        if (!succeed) {
+          return null;
+        }
+
+        boolean same = compareReduceSink(childReduceSink, parentRS,
+            childColumnMapping, parentColumnMapping);
+        if (!same) {
+          return null;
+        }
+        replaceReduceSinkWithSelectOperator(childReduceSink);
+        return null;
+      }
+
+      private void replaceReduceSinkWithSelectOperator(
+          ReduceSinkOperator childReduceSink) {
+        // Not implemented yet in this revision of the patch: the intent is to
+        // replace the redundant child reduce sink with a select operator that
+        // simply forwards its columns, so that the extra shuffle goes away.
+      }
+
+      private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
+          ReduceSinkOperator parentRS,
+          HashMap<String, String> childColumnMapping,
+          HashMap<String, String> parentColumnMapping) {
+        ArrayList<ExprNodeDesc> childPartitionCols =
+            childReduceSink.getConf().getPartitionCols();
+        ArrayList<ExprNodeDesc> parentPartitionCols =
+            parentRS.getConf().getPartitionCols();
+
+        boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
+            childPartitionCols, parentPartitionCols);
+        if (!ret) {
+          return false;
+        }
+
+        ArrayList<ExprNodeDesc> childReduceKeyCols =
+            childReduceSink.getConf().getKeyCols();
+        ArrayList<ExprNodeDesc> parentReduceKeyCols =
+            parentRS.getConf().getKeyCols();
+        return compareExprNodes(childColumnMapping, parentColumnMapping,
+            childReduceKeyCols, parentReduceKeyCols);
+      }
+
+      private boolean compareExprNodes(
+          HashMap<String, String> childColumnMapping,
+          HashMap<String, String> parentColumnMapping,
+          ArrayList<ExprNodeDesc> childColExprs,
+          ArrayList<ExprNodeDesc> parentColExprs) {
+
+        boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
+        boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+
+        if (childEmpty) { // nothing to compare on the child side
+          return true;
+        }
+
+        // child not empty here
+        if (parentEmpty) { // child not empty, but parent empty
+          return false;
+        }
+
+        if (childColExprs.size() != parentColExprs.size()) {
+          return false;
+        }
+        for (int i = 0; i < childColExprs.size(); i++) {
+          ExprNodeDesc childExpr = childColExprs.get(i);
+          ExprNodeDesc parentExpr = parentColExprs.get(i);
+
+          if ((childExpr instanceof ExprNodeColumnDesc)
+              && (parentExpr instanceof ExprNodeColumnDesc)) {
+            String childCol = childColumnMapping
+                .get(((ExprNodeColumnDesc) childExpr).getColumn());
+            String parentCol = parentColumnMapping
+                .get(((ExprNodeColumnDesc) parentExpr).getColumn());
+
+            if (childCol == null || !childCol.equals(parentCol)) {
+              return false;
+            }
+          } else {
+            return false;
+          }
+        }
+        return true;
+      }
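+
+      // Illustrative walk-through (editor's note, not part of the original
+      // patch): suppose the child reduce sink keys on the internal column
+      // _col0 and a select operator between the two reduce sinks has the
+      // column expression map {_col0 -> key}. Back tracking rewrites the
+      // mapping value from _col0 to the source column 'key', so child and
+      // parent keys are compared on their original columns.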
+
+      /*
+       * Back track column names to find their corresponding original column
+       * names. Only simple operators such as 'select column' or filter are
+       * allowed on the way; any operator that computes a new expression
+       * aborts the merge.
+       */
+      private boolean backTrackColumnNames(
+          HashMap<String, String> columnMapping,
+          ReduceSinkOperator reduceSink,
+          Operator<? extends Serializable> stopBacktrackFlagOp,
+          ParseContext pGraphContext) {
+        Operator<? extends Serializable> startOperator = reduceSink;
+        while (startOperator != null && startOperator != stopBacktrackFlagOp) {
+          startOperator = startOperator.getParentOperators().get(0);
+          Map<String, ExprNodeDesc> colExprMap = startOperator.getColumnExprMap();
+          if (colExprMap == null || colExprMap.size() == 0) {
+            continue;
+          }
+          Iterator<String> keyIter = columnMapping.keySet().iterator();
+          while (keyIter.hasNext()) {
+            String key = keyIter.next();
+            String oldCol = columnMapping.get(key);
+            ExprNodeDesc exprNode = colExprMap.get(oldCol);
+            if (exprNode instanceof ExprNodeColumnDesc) {
+              String col = ((ExprNodeColumnDesc) exprNode).getColumn();
+              columnMapping.put(key, col);
+            } else {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+
+      private HashMap<String, String> getPartitionAndKeyColumnMapping(
+          ReduceSinkOperator reduceSink) {
+        HashMap<String, String> columnMapping = new HashMap<String, String>();
+        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
+        ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
+        ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
+        // initially every column maps to itself; back tracking rewrites the
+        // values to the original source columns
+        if (partitionCols != null) {
+          for (ExprNodeDesc desc : partitionCols) {
+            List<String> cols = desc.getCols();
+            for (String col : cols) {
+              columnMapping.put(col, col);
+            }
+          }
+        }
+        if (reduceKeyCols != null) {
+          for (ExprNodeDesc desc : reduceKeyCols) {
+            List<String> cols = desc.getCols();
+            for (String col : cols) {
+              columnMapping.put(col, col);
+            }
+          }
+        }
+        return columnMapping;
+      }
+
+      private ReduceSinkOperator findSingleParentReduceSink(
+          ReduceSinkOperator childReduceSink) {
+        Operator<? extends Serializable> start = childReduceSink;
+        while (start != null) {
+          if (start.getParentOperators() == null
+              || start.getParentOperators().size() != 1) {
+            // more than one parent: this is potentially a join operator, so
+            // stop looking for a mergeable parent reduce sink
+            return null;
+          }
+          start = start.getParentOperators().get(0);
+          if (start instanceof ReduceSinkOperator) {
+            return (ReduceSinkOperator) start;
+          }
+        }
+        return null;
+      }
+    }
+  }
+}