diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index 1070d16..9caf79e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -21,13 +21,18 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.mapred.OutputCollector; public class OperatorUtils { + private static final Log LOG = LogFactory.getLog(OperatorUtils.class); + public static Set findOperators(Operator start, Class clazz) { return findOperators(start, clazz, new HashSet()); } @@ -63,11 +68,29 @@ public static void setChildrenCollector(List> c return; } for (Operator op : childOperators) { - if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO: + if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { ((ReduceSinkOperator)op).setOutputCollector(out); } else { setChildrenCollector(op.getChildOperators(), out); } } } + + public static void setChildrenCollector(List> childOperators, Map outMap) { + if (childOperators == null) { + return; + } + for (Operator op : childOperators) { + if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { + ReduceSinkOperator rs = ((ReduceSinkOperator)op); + if (outMap.containsKey(rs.getConf().getOutputName())) { + LOG.info("Setting output collector: " + rs + " --> " + + rs.getConf().getOutputName()); + rs.setOutputCollector(outMap.get(rs.getConf().getOutputName())); + } + } else { + setChildrenCollector(op.getChildOperators(), outMap); + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index da3503c..e498b8f 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -64,9 +64,9 @@ @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, - OutputCollector out){ + Map outMap){ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); - super.init(jconf, mrReporter, inputs, out); + super.init(jconf, mrReporter, inputs, outMap); //Update JobConf using MRInput, info like filename comes via this MRInputLegacy mrInput = getMRInput(inputs); @@ -124,7 +124,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in } } - OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), out); + OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), outMap); mapOp.setReporter(reporter); MapredContext.get().setReporter(reporter); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index e9dc16d..d40b557 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -39,7 +39,7 @@ protected JobConf jconf; protected Map inputs; - protected OutputCollector out; + protected Map outMap; public static final Log l4j = LogFactory.getLog(RecordProcessor.class); @@ -63,11 +63,11 @@ * @param out */ void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, - OutputCollector out){ + Map outMap){ this.jconf = jconf; this.reporter = mrReporter; this.inputs = inputs; - this.out = out; + this.outMap = outMap; // Allocate the bean at the beginning - memoryMXBean = ManagementFactory.getMemoryMXBean(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 9fe6ac1..a650d64 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -90,9 +90,9 @@ @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, - OutputCollector out){ + Map outMap){ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); - super.init(jconf, mrReporter, inputs, out); + super.init(jconf, mrReporter, inputs, outMap); ObjectCache cache = ObjectCacheFactory.getCache(jconf); @@ -162,8 +162,8 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in if (dummyOps != null) { children.addAll(dummyOps); } - OperatorUtils.setChildrenCollector(children, out); - + OperatorUtils.setChildrenCollector(children, outMap); + reducer.setReporter(reporter); MapredContext.get().setReporter(reporter); @@ -329,10 +329,6 @@ void close(){ if (!abort) { abort = execContext.getIoCxt().getIOExceptions(); } - // No row was processed - if (out == null) { - l4j.trace("Close called no row"); - } try { if (groupKey != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 90826db..b1e22d5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; @@ -89,17 +91,14 @@ public void run(Map inputs, Map out LOG.info("Running map: " + processorContext.getUniqueIdentifier()); - if(outputs.size() > 1) { - throw new IOException("Cannot handle more than one output" - + ", 
outputCount=" + outputs.size()); - } - - LogicalOutput out = outputs.values().iterator().next(); - + Map outMap = new HashMap(); - - KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter(); - OutputCollector collector = new KVOutputCollector(kvWriter); + for (String outputName: outputs.keySet()) { + LOG.info("Handling output: " + outputName); + KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter(); + OutputCollector collector = new KVOutputCollector(kvWriter); + outMap.put(outputName, collector); + } if(isMap){ rproc = new MapRecordProcessor(); @@ -109,7 +108,7 @@ public void run(Map inputs, Map out } MRTaskReporter mrReporter = new MRTaskReporter(processorContext); - rproc.init(jobConf, mrReporter, inputs, collector); + rproc.init(jobConf, mrReporter, inputs, outMap); rproc.run(); //done - output does not need to be committed as hive does not use outputcommitter diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 5f3a7ca..7c2324f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -88,6 +88,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, // link the work with the work associated with the reduce sink that triggered this rule TezWork tezWork = context.currentTask.getWork(); tezWork.connect(parentWork, myWork, EdgeType.BROADCAST_EDGE); + + // remember the output name of the reduce sink + parentRS.getConf().setOutputName(myWork.getName()); + } else { List linkWorkList = context.linkOpWithWorkMap.get(childOp); if (linkWorkList == null) { @@ -95,6 +99,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } linkWorkList.add(parentWork); context.linkOpWithWorkMap.put(childOp, linkWorkList); + + List reduceSinks + = 
context.linkWorkWithReduceSinkMap.get(parentWork); + if (reduceSinks == null) { + reduceSinks = new ArrayList(); + } + reduceSinks.add(parentRS); + context.linkWorkWithReduceSinkMap.put(parentWork, reduceSinks); } break; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 1f07594..3c3dcc0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -24,10 +24,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Stack; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -89,6 +91,10 @@ // traversing an operator tree public final Map, List> linkOpWithWorkMap; + // a map to keep track of what reduce sinks have to be hooked up to + // map join work + public final Map> linkWorkWithReduceSinkMap; + // a map that maintains operator (file-sink or reduce-sink) to work mapping public final Map, BaseWork> operatorWorkMap; @@ -102,6 +108,15 @@ // used to group dependent tasks for multi table inserts public final DependencyCollectionTask dependencyTask; + // root of last multi child operator encountered + public Stack> lastRootOfMultiChildOperator; + + // branches of current multi-child operator + public Stack currentBranchCount; + + // work generated for last multi-child operator + public Stack lastWorkForMultiChildOperator; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, List> rootTasks, @@ -117,10 +132,14 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, 
this.leafOperatorToFollowingWork = new HashMap, BaseWork>(); this.rootOperators = rootOperators; this.linkOpWithWorkMap = new HashMap, List>(); + this.linkWorkWithReduceSinkMap = new HashMap>(); this.operatorWorkMap = new HashMap, BaseWork>(); this.mapJoinParentMap = new HashMap>>(); this.linkChildOpWithDummyOp = new HashMap, List>>(); this.dependencyTask = (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf); + this.lastRootOfMultiChildOperator = new Stack>(); + this.currentBranchCount = new Stack(); + this.lastWorkForMultiChildOperator = new Stack(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index fe7359e..724ed8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -71,17 +71,18 @@ public Object process(Node nd, Stack stack, // packing into a vertex, typically a table scan, union or join Operator root = context.currentRootOperator; if (root == null) { - // if there are no more rootOperators we're dealing with multiple - // file sinks off of the same table scan. Bail. - if (context.rootOperators.isEmpty()) { - return null; - } - // null means that we're starting with a new table scan // the graph walker walks the rootOperators in the same // order so we can just take the next context.preceedingWork = null; - root = context.rootOperators.pop(); + + // if there are branches remaining we can't pop the next + // root operator yet. 
+ if (context.currentBranchCount.isEmpty() + || (!context.lastWorkForMultiChildOperator.isEmpty() + && context.lastWorkForMultiChildOperator.peek() == null)) { + root = context.rootOperators.pop(); + } } LOG.debug("Root operator: " + root); @@ -93,18 +94,51 @@ public Object process(Node nd, Stack stack, // a reduce vertex BaseWork work; if (context.preceedingWork == null) { - assert root.getParentOperators().isEmpty(); - MapWork mapWork = new MapWork("Map "+ (++sequenceNumber)); - LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root); - - // map work starts with table scan operators - assert root instanceof TableScanOperator; - String alias = ((TableScanOperator)root).getConf().getAlias(); - - GenMapRedUtils.setMapWork(mapWork, context.parseContext, - context.inputs, null, root, alias, context.conf, false); - tezWork.add(mapWork); - work = mapWork; + if (root == null) { + // this is the multi-insert case. we need to reuse the last + // table scan work. + root = context.lastRootOfMultiChildOperator.peek(); + work = context.lastWorkForMultiChildOperator.peek(); + LOG.debug("Visiting additional branch in: "+root); + + } else { + assert root.getParentOperators().isEmpty(); + MapWork mapWork = new MapWork("Map "+ (++sequenceNumber)); + LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root); + + // map work starts with table scan operators + assert root instanceof TableScanOperator; + String alias = ((TableScanOperator)root).getConf().getAlias(); + + GenMapRedUtils.setMapWork(mapWork, context.parseContext, + context.inputs, null, root, alias, context.conf, false); + tezWork.add(mapWork); + work = mapWork; + + // remember this table scan and work item. 
this is needed for multiple + // insert statements where multiple operator pipelines hang off a single + // table scan + if (!context.lastWorkForMultiChildOperator.isEmpty() + && context.lastWorkForMultiChildOperator.peek() == null) { + LOG.debug("Capturing current work for 'multiple branches' case"); + context.lastWorkForMultiChildOperator.pop(); + context.lastWorkForMultiChildOperator.push(work); + } + } + + if (!context.currentBranchCount.isEmpty()) { + // we've handled one branch. Adjust the counts. + int branches = context.currentBranchCount.pop(); + if (--branches != 0) { + LOG.debug("Remaining branches: "+branches); + context.currentBranchCount.push(branches); + } else { + LOG.debug("No more remaining branches."); + context.lastRootOfMultiChildOperator.pop(); + context.lastWorkForMultiChildOperator.pop(); + } + } + } else { assert !root.getParentOperators().isEmpty(); ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber)); @@ -119,6 +153,8 @@ public Object process(Node nd, Stack stack, assert context.parentOfRoot instanceof ReduceSinkOperator; ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot; + LOG.debug("Setting up reduce sink: " + reduceSink); + reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers()); // need to fill in information about the key and value in the reducer @@ -128,12 +164,25 @@ public Object process(Node nd, Stack stack, reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), context.preceedingWork.getName()); + // remember the output name of the reduce sink + reduceSink.getConf().setOutputName(reduceWork.getName()); + tezWork.add(reduceWork); tezWork.connect( context.preceedingWork, reduceWork, EdgeType.SIMPLE_EDGE); work = reduceWork; + + // remember this work item.
this is needed for multiple + // insert statements where multiple operator pipelines hang off a forward + // operator + if (!context.lastWorkForMultiChildOperator.isEmpty() + && context.lastWorkForMultiChildOperator.peek() == null) { + LOG.debug("Capturing current work for 'multiple branches' case"); + context.lastWorkForMultiChildOperator.pop(); + context.lastWorkForMultiChildOperator.push(work); + } } // We're scanning the operator from table scan to final file sink. @@ -162,9 +211,11 @@ public Object process(Node nd, Stack stack, // remember which parent belongs to which tag rWork.getTagToInput().put(rs.getConf().getTag(), work.getName()); + // remember the output name of the reduce sink + rs.getConf().setOutputName(rWork.getName()); + // add dependency between the two work items - tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), - EdgeType.SIMPLE_EDGE); + tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE); } // This is where we cut the tree as described above.
We also remember that @@ -183,6 +234,7 @@ public Object process(Node nd, Stack stack, context.currentRootOperator = operator.getChildOperators().get(0); context.preceedingWork = work; } else { + LOG.debug("Leaf operator - resetting context: " + context.currentRootOperator); context.parentOfRoot = null; context.currentRootOperator = null; context.preceedingWork = null; @@ -214,6 +266,13 @@ public Object process(Node nd, Stack stack, } for (BaseWork parentWork : linkWorkList) { tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE); + + // need to set up output name for reduce sink now that we know the name + // of the downstream work + for (ReduceSinkOperator r: + context.linkWorkWithReduceSinkMap.get(parentWork)) { + r.getConf().setOutputName(work.getName()); + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ee90940..5e586aa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -34,10 +34,12 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.ForwardOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.tez.TezTask; @@ -153,6 +155,40 @@ public Object process(Node n, Stack s, } }); + opRules.put(new RuleRegExp("Setup table scan", + TableScanOperator.getOperatorName() + "%"), new NodeProcessor() + { + @Override + public Object process(Node n, Stack s, + NodeProcessorCtx procCtx, Object...
os) throws SemanticException { + GenTezProcContext context = (GenTezProcContext) procCtx; + TableScanOperator tableScan = (TableScanOperator) n; + LOG.debug("TableScan operator ("+tableScan + +"). Number of branches: "+tableScan.getNumChild()); + context.lastRootOfMultiChildOperator.push(tableScan); + context.currentBranchCount.push(tableScan.getNumChild()); + context.lastWorkForMultiChildOperator.push(null); + return null; + } + }); + + opRules.put(new RuleRegExp("Handle Forward opertor", + ForwardOperator.getOperatorName() + "%"), new NodeProcessor() + { + @Override + public Object process(Node n, Stack s, + NodeProcessorCtx procCtx, Object... os) throws SemanticException { + GenTezProcContext context = (GenTezProcContext) procCtx; + ForwardOperator forward = (ForwardOperator) n; + LOG.debug("Forward operator ("+forward+ + "). Number of branches: "+forward.getNumChild()); + context.lastRootOfMultiChildOperator.push(context.currentRootOperator); + context.currentBranchCount.push(forward.getNumChild()); + context.lastWorkForMultiChildOperator.push(null); + return null; + } + }); + // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 5837fac..e193734 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -60,6 +60,12 @@ private int numDistributionKeys; /** + * Used in tez. Holds the name of the output + * that this reduce sink is writing to. + */ + private String outputName; + + /** * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language). * Partition columns decide the reducer that the current row goes to. * Partition columns are not passed to reducer. 
@@ -273,4 +279,12 @@ public void setDistinctColumnIndices( List> distinctColumnIndices) { this.distinctColumnIndices = distinctColumnIndices; } + + public String getOutputName() { + return outputName; + } + + public void setOutputName(String outputName) { + this.outputName = outputName; + } }