# Patch summary (TezProcessor.java, against revision 1579403):
#   * close(): no longer calls rproc.close(); its body becomes an explanatory comment only.
#   * run(): the existing body is wrapped in try/finally, and rproc.close() (guarded by a
#     null check) now runs in the finally block, i.e. after the try body returns or throws.
# Rationale, as stated in the comment this patch adds to close(): Tez closes the inputs
# before it calls close() (TEZ-955), and flushing the pipeline may still need to read them.
#
# NOTE(review): rproc.close() runs in a finally block, so if close() itself throws while an
#   exception from the try body is propagating, the close() exception replaces the original
#   one -- confirm this is acceptable, or log/suppress the secondary failure.
# NOTE(review): PerfLogEnd(TEZ_RUN_PROCESSOR) is the last statement of the try body, so it
#   is skipped whenever init()/run() throws -- confirm that is intended.
# NOTE(review): line breaks were restored from the hunk headers (9/9 and 42/48 lines); the
#   leading whitespace of hunk lines is assumed to be 2-space indentation -- verify against
#   the target revision, or apply with `patch -l`.
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (revision 1579403)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (working copy)
@@ -74,9 +74,9 @@
 
   @Override
   public void close() throws IOException {
-    if(rproc != null){
-      rproc.close();
-    }
+    // we have to close in the processor's run method, because tez closes inputs
+    // before calling close (TEZ-955) and we might need to read inputs
+    // when we flush the pipeline.
   }
 
   @Override
@@ -123,42 +123,48 @@
 
   @Override
   public void run(Map inputs, Map outputs) throws Exception {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
-    // in case of broadcast-join read the broadcast edge inputs
-    // (possibly asynchronously)
+    try{
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+      // in case of broadcast-join read the broadcast edge inputs
+      // (possibly asynchronously)
+
+      LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+
+      if (isMap) {
+        rproc = new MapRecordProcessor();
+        MRInputLegacy mrInput = getMRInput(inputs);
+        try {
+          mrInput.init();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed while initializing MRInput", e);
+        }
+      } else {
+        rproc = new ReduceRecordProcessor();
+      }
 
-    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-
-    if (isMap) {
-      rproc = new MapRecordProcessor();
-      MRInputLegacy mrInput = getMRInput(inputs);
-      try {
-        mrInput.init();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed while initializing MRInput", e);
+      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+      // Start the actual Inputs. After MRInput initialization.
+      for (Entry inputEntry : inputs.entrySet()) {
+        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+          inputEntry.getValue().start();
+        } else {
+          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+        }
       }
-    } else {
-      rproc = new ReduceRecordProcessor();
-    }
+
+      // Outputs will be started later by the individual Processors.
+
+      MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
+      rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+      rproc.run();
 
-    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-    // Start the actual Inputs. After MRInput initialization.
-    for (Entry inputEntry : inputs.entrySet()) {
-      if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-        inputEntry.getValue().start();
-      } else {
-        LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+      //done - output does not need to be committed as hive does not use outputcommitter
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+    } finally {
+      if(rproc != null){
+        rproc.close();
       }
     }
-
-    // Outputs will be started later by the individual Processors.
-
-    MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
-    rproc.run();
-
-    //done - output does not need to be committed as hive does not use outputcommitter
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
   }
 
   /**