diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index ff21a52..310c233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -52,7 +52,7 @@ public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
     }
 
     HiveMapFunctionResultList result = new HiveMapFunctionResultList(it, mapRecordHandler);
-    mapRecordHandler.init(jobConf, result, sparkReporter);
+    mapRecordHandler.init(jobConf, sparkReporter);
 
     return result;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
index 4767cd5..69ae027 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
@@ -45,7 +45,7 @@ public HiveMapFunctionResultList(
 
   @Override
   protected void processNextRecord(Tuple2 inputRecord) throws IOException {
-    recordHandler.processRow(inputRecord._1(), inputRecord._2());
+    recordHandler.processRow(inputRecord._1(), inputRecord._2(), this);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index 2b6e2de..7f0b800 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -43,7 +43,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
     HiveReduceFunctionResultList result =
         new HiveReduceFunctionResultList(it, reducerRecordhandler);
-    reducerRecordhandler.init(jobConf, result, sparkReporter);
+    reducerRecordhandler.init(jobConf, sparkReporter);
 
     return result;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index 1f1517d..26f871d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -51,9 +51,9 @@ protected void processNextRecord(Tuple2 inputRecord)
     if (value instanceof Iterable) {
       @SuppressWarnings("unchecked")
       Iterable values = (Iterable)value;
-      reduceRecordHandler.processRow(key, values.iterator());
+      reduceRecordHandler.processRow(key, values.iterator(), this);
     } else {
-      reduceRecordHandler.processRow(key, value);
+      reduceRecordHandler.processRow(key, value, this);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 8333cf5..8db3708 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -61,9 +61,9 @@
   private ExecMapperContext execContext;
 
   @Override
-  public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
+  public void init(JobConf job, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
-    super.init(job, output, reporter);
+    super.init(job, reporter);
 
     try {
       jc = job;
@@ -94,7 +94,6 @@
       mo.initializeLocalWork(jc);
       mo.initializeMapOperator(jc);
 
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
 
       if (localWork == null) {
@@ -123,7 +122,11 @@
   }
 
   @Override
-  public void processRow(Object key, Object value) throws IOException {
+  public void processRow(Object key, Object value, OutputCollector output) throws IOException {
+    if (oc == null) {
+      oc = output;
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+    }
     // reset the execContext for each new row
     execContext.resetRow();
 
@@ -149,7 +152,8 @@ public void processRow(Object key, Object value) throws IOException {
   }
 
   @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public void processRow(Object key, Iterator values, OutputCollector output)
+      throws IOException {
     throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
index 4af372a..02f03f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
@@ -53,8 +53,8 @@
 
   @SuppressWarnings("unchecked")
   @Override
-  public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
-    super.init(job, output, reporter);
+  public void init(JobConf job, Reporter reporter) throws Exception {
+    super.init(job, reporter);
 
     try {
       jc = job;
@@ -89,7 +89,10 @@
   }
 
   @Override
-  public void processRow(Object key, Object value) throws IOException {
+  public void processRow(Object key, Object value, OutputCollector output) throws IOException {
+    if (oc == null) {
+      oc = output;
+    }
     row[0] = key;
     row[1] = value;
     try {
@@ -101,7 +104,8 @@ public void processRow(Object key, Object value) throws IOException {
   }
 
   @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public void processRow(Object key, Iterator values, OutputCollector output)
+      throws IOException {
     throw new UnsupportedOperationException("Do not support this method in "
         + this.getClass().getSimpleName());
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index 2421885..61f9e17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -49,12 +49,11 @@
   private long rowNumber = 0;
   private long nextLogThreshold = 1;
 
-  public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
+  public void init(JobConf job, Reporter reporter) throws Exception {
     jc = job;
     MapredContext.init(false, new JobConf(jc));
     MapredContext.get().setReporter(reporter);
 
-    oc = output;
     rp = reporter;
 
     LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -73,12 +72,14 @@
   /**
    * Process row with key and single value.
    */
-  public abstract void processRow(Object key, Object value) throws IOException;
+  public abstract void processRow(Object key, Object value, OutputCollector output)
+      throws IOException;
 
   /**
    * Process row with key and value collection.
   */
-  public abstract void processRow(Object key, Iterator values) throws IOException;
+  public abstract void processRow(Object key, Iterator values, OutputCollector output)
+      throws IOException;
 
   /**
   * Logger processed row number and used memory info.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7c1164b..19148bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -120,9 +121,9 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
+  public void init(JobConf job, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
-    super.init(job, output, reporter);
+    super.init(job, reporter);
 
     rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
@@ -231,8 +232,6 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
 
     reducer.passExecContext(execContext);
     reducer.setReporter(rp);
-    OperatorUtils.setChildrenCollector(
-        Arrays.<Operator<? extends OperatorDesc>>asList(reducer), output);
 
     // initialize reduce operator tree
     try {
@@ -294,19 +293,28 @@ public void remove() {
    * Process one row using a dummy iterator. Or, add row to vector batch.
    */
   @Override
-  public void processRow(Object key, final Object value) throws IOException {
+  public void processRow(Object key, final Object value, OutputCollector output)
+      throws IOException {
+    if (oc == null) {
+      oc = output;
+      OperatorUtils.setChildrenCollector(Collections.singletonList(reducer), output);
+    }
     if (vectorized) {
       processVectorRow(key, value);
     } else {
       dummyIterator.setValue(value);
-      processRow(key, dummyIterator);
+      processRow(key, dummyIterator, output);
     }
   }
-
-  @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public void processRow(Object key, Iterator values, OutputCollector output)
+      throws IOException {
+    if (oc == null) {
+      oc = output;
+      OperatorUtils.setChildrenCollector(Collections.singletonList(reducer), output);
+    }
+
     if (vectorized) {
       processVectorRows(key, values);
       return;