diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 9fc7afa..3b9a653 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; /** @@ -209,7 +208,6 @@ public RowSchema getSchema() { // non-bean .. protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>(); - protected transient OutputCollector out; protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled(); protected transient String alias; @@ -255,19 +253,6 @@ public void setReporter(Reporter rep) { } } - public void setOutputCollector(OutputCollector out) { - this.out = out; - - // the collector is same across all operators - if (childOperators == null) { - return; - } - - for (Operator<? extends OperatorDesc> op : childOperators) { - op.setOutputCollector(out); - } - } - /** * Store the alias this operator is working on behalf of. 
*/ @@ -330,7 +315,6 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) } this.configuration = hconf; - this.out = null; if (!areAllParentsInitialized()) { return; } @@ -613,8 +597,6 @@ public void close(boolean abort) throws HiveException { op.close(abort); } - out = null; - LOG.info(id + " Close done"); } catch (HiveException e) { e.printStackTrace(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index 42b10a8..1070d16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -20,8 +20,12 @@ import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.mapred.OutputCollector; + public class OperatorUtils { public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) { @@ -53,4 +57,17 @@ } return found; } + + public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, OutputCollector out) { + if (childOperators == null) { + return; + } + for (Operator<? extends OperatorDesc> op : childOperators) { + if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO: + ((ReduceSinkOperator)op).setOutputCollector(out); + } else { + setChildrenCollector(op.getChildOperators(), out); + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 9bf2140..d1405c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; /** * Reduce Sink Operator sends output to the reduce stage. 
@@ -54,6 +55,7 @@ implements Serializable, TopNHash.BinaryCollector { private static final long serialVersionUID = 1L; + protected transient OutputCollector out; /** * The evaluators for the key columns. Key columns decide the sort order on @@ -91,6 +93,10 @@ public String getInputAlias() { return inputAlias; } + public void setOutputCollector(OutputCollector _out) { + this.out = _out; + } + // picks topN K:V pairs from input. protected transient TopNHash reducerHash = new TopNHash(); @Override @@ -366,6 +372,7 @@ protected void closeOp(boolean abort) throws HiveException { reducerHash.flush(); } super.closeOp(abort); + out = null; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index e69aaa6..8e44720 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner; import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.PartitionKeySampler; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; @@ -81,6 +82,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.RunningJob; import org.apache.log4j.Appender; @@ -543,7 +545,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf) FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts); try { ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()}); - ts.setOutputCollector(sampler); + 
OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler); while (fetcher.pushRow()) { } } finally { fetcher.clearFetchContext(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index c1b5508..ffae204 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -146,7 +147,7 @@ public void map(Object key, Object value, OutputCollector output, if (oc == null) { oc = output; rp = reporter; - mo.setOutputCollector(oc); + OperatorUtils.setChildrenCollector(mo.getChildOperators(), output); mo.setReporter(rp); MapredContext.get().setReporter(reporter); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index 2578907..12be0d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -177,7 +177,6 @@ public void reduce(Object key, Iterator values, OutputCollector output, // propagate reporter and output collector to all operators oc = output; rp = reporter; - reducer.setOutputCollector(oc); reducer.setReporter(rp); MapredContext.get().setReporter(reporter); }