diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index d892391..d964eb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -207,6 +207,9 @@ void close(){ // detecting failed executions by exceptions thrown by the operator tree try { + if (mapOp == null || mapWork == null) { + return; + } mapOp.close(abort); // Need to close the dummyOps as well. The operator pipeline @@ -236,5 +239,4 @@ void close(){ MapredContext.close(); } } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 4fa6701..e7d22da 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -45,8 +46,8 @@ */ public class TezProcessor implements LogicalIOProcessor { - - + + private static final Log LOG = LogFactory.getLog(TezProcessor.class); private boolean isMap = false; @@ -74,7 +75,7 @@ public TezProcessor(boolean isMap) { @Override public void close() throws IOException { - // we have to close in the processor's run method, because tez closes inputs + // we have to close in the processor's run method, because tez closes inputs // before calling close (TEZ-955) and we might need to read inputs // when we flush the pipeline. } @@ -124,16 +125,16 @@ private void setupMRLegacyConfigs(TezProcessorContext processorContext) { @Override public void run(Map inputs, Map outputs) throws Exception { - - Exception processingException = null; - + + Throwable originalThrowable = null; + try{ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR); // in case of broadcast-join read the broadcast edge inputs // (possibly asynchronously) - + LOG.info("Running task: " + processorContext.getUniqueIdentifier()); - + if (isMap) { rproc = new MapRecordProcessor(); MRInputLegacy mrInput = getMRInput(inputs); @@ -156,29 +157,34 @@ public void run(Map inputs, Map out LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start"); } } - + // Outputs will be started later by the individual Processors. - + MRTaskReporter mrReporter = new MRTaskReporter(processorContext); rproc.init(jobConf, processorContext, mrReporter, inputs, outputs); rproc.run(); //done - output does not need to be committed as hive does not use outputcommitter perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR); - } catch (Exception e) { - processingException = e; + } catch (Throwable t) { + originalThrowable = t; } finally { + if (originalThrowable != null && originalThrowable instanceof Error) { + throw new RuntimeException(originalThrowable); + } + try { if(rproc != null){ rproc.close(); } - } catch (Exception e) { - if (processingException == null) { - processingException = e; + } catch (Throwable t) { + if (originalThrowable == null) { + originalThrowable = t; } } - if (processingException != null) { - throw processingException; + if (originalThrowable != null) { + LOG.error(StringUtils.stringifyException(originalThrowable)); + throw new RuntimeException(originalThrowable); } } } @@ -186,7 +192,7 @@ public void run(Map inputs, Map out /** * KVOutputCollector. OutputCollector that writes using KVWriter. * Must be initialized before it is used. - * + * */ static class TezKVOutputCollector implements OutputCollector { private KeyValueWriter writer;