Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (revision 1579403) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (working copy) @@ -154,14 +154,18 @@ MRInputLegacy in = TezProcessor.getMRInput(inputs); KeyValueReader reader = in.getReader(); - //process records until done - while(reader.next()){ - //ignore the key for maps - reader.getCurrentKey(); - Object value = reader.getCurrentValue(); - boolean needMore = processRow(value); - if(!needMore){ - break; + try { + //process records until done + while(reader.next()){ + //ignore the key for maps - reader.getCurrentKey(); + Object value = reader.getCurrentValue(); + boolean needMore = processRow(value); + if(!needMore){ + break; + } } + } finally { + closeInternal(); } } @@ -199,7 +203,13 @@ } @Override - void close(){ + void close(){ + // we have to close in the processor, because tez closes inputs + // before calling close (TEZ-955). We might need to read inputs + // when we flush the pipeline though. + } + + void closeInternal(){ // check if there are IOExceptions if (!abort) { abort = execContext.getIoCxt().getIOExceptions(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (revision 1579403) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (working copy) @@ -216,15 +216,18 @@ throw new IOException(e); } - while(kvsReader.next()){ - Object key = kvsReader.getCurrentKey(); - Iterable values = kvsReader.getCurrentValues(); - boolean needMore = processKeyValues(key, values); - if(!needMore){ - break; + try { + while(kvsReader.next()){ + Object key = kvsReader.getCurrentKey(); + Iterable values = kvsReader.getCurrentValues(); + boolean needMore = processKeyValues(key, values); + if(!needMore){ + break; + } } + } finally { + closeInternal(); } - } /** @@ -347,7 +350,13 @@ } @Override - void close(){ + void close(){ + // we have to close in the processor, because tez closes inputs + // before calling close (TEZ-955). We might need to read inputs + // when we flush the pipeline though. + } + + void closeInternal() { // check if there are IOExceptions if (!abort) { abort = execContext.getIoCxt().getIOExceptions();