diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 8333cf5..c177a43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -94,7 +94,6 @@
       mo.initializeLocalWork(jc);
       mo.initializeMapOperator(jc);
 
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
 
       if (localWork == null) {
@@ -124,6 +123,10 @@
 
   @Override
   public void processRow(Object key, Object value) throws IOException {
+    if (!anyRow) {
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), oc);
+      anyRow = true;
+    }
     // reset the execContext for each new row
     execContext.resetRow();
 
@@ -156,7 +159,7 @@ public void processRow(Object key, Object value) throws IOException {
   @Override
   public void close() {
     // No row was processed
-    if (oc == null) {
+    if (!anyRow) {
       LOG.trace("Close called. no row processed by map.");
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index 2421885..d7488b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -49,6 +49,8 @@
   private long rowNumber = 0;
   private long nextLogThreshold = 1;
 
+  protected boolean anyRow = false;
+
   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     jc = job;
     MapredContext.init(false, new JobConf(jc));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7c1164b..06b0fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -37,8 +37,6 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -295,6 +293,9 @@ public void remove() {
    */
   @Override
   public void processRow(Object key, final Object value) throws IOException {
+    if (!anyRow) {
+      anyRow = true;
+    }
     if (vectorized) {
       processVectorRow(key, value);
     } else {
@@ -307,6 +308,9 @@ public void processRow(Object key, final Object value) throws IOException {
 
   @Override
   public void processRow(Object key, Iterator values) throws IOException {
+    if (!anyRow) {
+      anyRow = true;
+    }
     if (vectorized) {
       processVectorRows(key, values);
       return;
     }
@@ -577,7 +581,7 @@ private Object deserializeValue(BytesWritable valueWritable, byte tag) throws Hi
   public void close() {
     // No row was processed
-    if (oc == null) {
+    if (!anyRow) {
       LOG.trace("Close called without any rows processed");
     }
 
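
For reference, a minimal sketch of the pattern this patch introduces (class and method names below are illustrative, not the actual Hive classes): wiring of the output collector into the operator tree is deferred from init() to the first call to processRow(), and the new anyRow flag replaces the old oc == null test so close() can still tell whether any input arrived.

```java
// Hypothetical, simplified sketch of the lazy-initialization pattern above.
// RecordHandlerSketch and wireCollector() stand in for the Hive handlers and
// OperatorUtils.setChildrenCollector(...); only the control flow is shown.
abstract class RecordHandlerSketch {
  // Set once the first row is seen; mirrors the protected anyRow field
  // added to SparkRecordHandler in the patch.
  protected boolean anyRow = false;

  // Stand-in for attaching the collector to the child operators.
  abstract void wireCollector();

  public void processRow(Object key, Object value) {
    if (!anyRow) {
      wireCollector(); // deferred from init() to the first row
      anyRow = true;
    }
    // ... per-row processing ...
  }

  public void close() {
    if (!anyRow) {
      // No row was processed; corresponds to the LOG.trace branch that the
      // patch rewrites from "oc == null" to "!anyRow".
      System.out.println("Close called without any rows processed");
    }
    // ... normal cleanup ...
  }
}
```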