diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6e9c4cd..3a2f711 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -69,6 +69,7 @@
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -168,7 +169,7 @@ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
     case BROADCAST_EDGE:
       dataMovementType = DataMovementType.BROADCAST;
       logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = MRInput.class;
+      logicalInputClass = ShuffledUnorderedKVInput.class;
       break;
 
     case SIMPLE_EDGE:
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 6a83d37..9463b63 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -4,6 +4,8 @@
 import java.util.List;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -20,6 +22,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
 
+  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+
   /* (non-Javadoc)
    * This processor addresses the RS-MJ case that occurs in tez on the small/hash
    * table side of things. The connection between the work that RS will be a part of
@@ -37,9 +41,16 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
     context.currentRootOperator = null;
 
     MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
+    Operator<? extends OperatorDesc> childOp = mapJoinOp.getChildOperators().get(0);
 
-    Operator<? extends OperatorDesc> childOp = mapJoinOp.getChildOperators().get(0);
     ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+
+    // remember the original parent list before we start modifying it.
+    if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
+      List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
+      context.mapJoinParentMap.put(mapJoinOp, parents);
+    }
+
     while (childOp != null) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
         /*
@@ -54,6 +65,15 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
 
         BaseWork myWork = context.operatorWorkMap.get(childOp);
         BaseWork parentWork = context.operatorWorkMap.get(parentRS);
+
+        // set the link between mapjoin and parent vertex
+        int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
+        if (pos == -1) {
+          throw new SemanticException("Cannot find position of parent in mapjoin");
+        }
+        LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
+        mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
+
         if (myWork != null) {
           // link the work with the work associated with the reduce sink that triggered this rule
           TezWork tezWork = context.currentTask.getWork();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index a53bd5a..088fe79 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,6 +90,9 @@
 
   // a map that maintains operator (file-sink or reduce-sink) to work mapping
   public final Map<Operator<?>, BaseWork> operatorWorkMap;
+  // we need to keep the original list of operators in the map join to know
+  // what position in the mapjoin the different parent work items will have.
+  public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
 
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
@@ -106,5 +110,6 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
     this.rootOperators = rootOperators;
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index e609633..cf0ca57 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -47,6 +47,9 @@
 
   private transient String bigTableAlias;
 
+  // for tez. used to remember which position maps to which logical input
+  private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
+
   // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
   private Map<String, Integer> bigTableBucketNumMapping;
@@ -74,6 +77,7 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.bigTableBucketNumMapping = clone.bigTableBucketNumMapping;
     this.bigTablePartSpecToFileMapping = clone.bigTablePartSpecToFileMapping;
     this.dumpFilePrefix = clone.dumpFilePrefix;
+    this.parentToInput = clone.parentToInput;
   }
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -106,6 +110,14 @@ private void initRetainExprList() {
     }
   }
 
+  public Map<Integer, String> getParentToInput() {
+    return parentToInput;
+  }
+
+  public void setParentToInput(Map<Integer, String> parentToInput) {
+    this.parentToInput = parentToInput;
+  }
+
   public Map<Byte, List<Integer>> getRetainList() {
     return retainList;
   }
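
Two notes on what the patch is doing, each with a small sketch. First, the DagUtils change: a broadcast edge previously paired OnFileUnorderedKVOutput with MRInput, but MRInput reads input splits rather than shuffled/broadcast data, so the small table's output could never reach the join. ShuffledUnorderedKVInput is the matching consumer for OnFileUnorderedKVOutput. Under the Tez 0.2-era API this branch builds against, the resulting edge would be wired roughly as below; the EdgeProperty/Edge constructor shapes here are recalled from that API generation and should be treated as an assumption, not as the actual body of createEdge.

```java
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;

public class BroadcastEdgeSketch {
  // Sketch of the edge the patched createEdge should now produce for
  // BROADCAST_EDGE. Constructor signatures are assumed from the Tez
  // 0.2-era API; later Tez versions replaced them with create() factories.
  public static Edge broadcastEdge(Vertex smallTableVertex, Vertex joinVertex) {
    EdgeProperty property = new EdgeProperty(
        DataMovementType.BROADCAST,   // every join task sees the full output
        DataSourceType.PERSISTED,
        SchedulingType.SEQUENTIAL,
        new OutputDescriptor(OnFileUnorderedKVOutput.class.getName()),
        new InputDescriptor(ShuffledUnorderedKVInput.class.getName()));
    return new Edge(smallTableVertex, joinVertex, property);
  }
}
```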
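Second, the plumbing across ReduceSinkMapJoinProc, GenTezProcContext, and MapJoinDesc: the map join's original parent list is snapshotted into mapJoinParentMap before later rules start rewriting the tree, so each parent ReduceSinkOperator keeps a stable position; that position is then mapped in parentToInput to the name of the parent BaseWork, i.e. the Tez vertex that will broadcast that small table. A minimal standalone sketch of the bookkeeping, using plain collections; the class and method names (ParentPositionTracker, recordParentWork, inputNameForPosition) are invented for illustration and are not Hive APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParentPositionTracker {
  // snapshot of the join's original parents, taken before the optimizer
  // starts replacing them (cf. GenTezProcContext.mapJoinParentMap)
  private final List<String> originalParents;

  // position in the map join -> name of the work/vertex feeding that
  // position (cf. MapJoinDesc.parentToInput)
  private final Map<Integer, String> parentToInput =
      new HashMap<Integer, String>();

  public ParentPositionTracker(List<String> parents) {
    this.originalParents = new ArrayList<String>(parents);
  }

  // mirrors the indexOf/put logic in ReduceSinkMapJoinProc.process
  public void recordParentWork(String parentOperator, String workName) {
    int pos = originalParents.indexOf(parentOperator);
    if (pos == -1) {
      throw new IllegalStateException("Cannot find position of parent in mapjoin");
    }
    parentToInput.put(pos, workName);
  }

  // at runtime the join can ask which logical input feeds a given position
  public String inputNameForPosition(int pos) {
    return parentToInput.get(pos);
  }

  public static void main(String[] args) {
    // a join with one big table (RS_0) and one broadcast small table (RS_1)
    ParentPositionTracker tracker =
        new ParentPositionTracker(Arrays.asList("RS_0", "RS_1"));
    tracker.recordParentWork("RS_1", "Map 2");
    System.out.println(tracker.inputNameForPosition(1)); // prints: Map 2
  }
}
```

Snapshotting up front is the load-bearing design choice: once the optimizer rewires the operator tree, an indexOf against the live parent list would return -1 or the wrong slot, which is exactly the case the SemanticException in the patch guards against.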