Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -102,8 +102,8 @@ } fetchOperators = new HashMap(); - - Map fetchOpJobConfMap = new HashMap(); + + Map fetchOpJobConfMap = new HashMap(); // create map local operators for (Map.Entry entry : localWork.getAliasToFetchWork() .entrySet()) { @@ -118,7 +118,7 @@ setColumnsNeeded = true; } } - + if (!setColumnsNeeded) { ColumnProjectionUtils.setFullyReadColumns(jobClone); } @@ -219,6 +219,11 @@ l4j.trace("Close called. no row processed by map."); } + // check if there are IOExceptions + if (!abort) { + abort = execContext.getIoCxt().getIOExceptions(); + } + // detecting failed executions by exceptions thrown by the operator tree // ideally hadoop should let us know whether map execution failed or not try { Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (working copy) @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.RecordReader; public abstract class HiveContextAwareRecordReader implements RecordReader { - + private boolean initDone = false; - - /** + + /** * Reads the next key/value pair from the input for processing. * * @param key the key to read data into @@ -40,32 +40,37 @@ * @return true if a key/value was read, false if at EOF */ public abstract boolean doNext(K key, V value) throws IOException; - - /** + + /** * Close this {@link InputSplit} to future operations. - * + * * @throws IOException - */ + */ public abstract void doClose() throws IOException; - + private IOContext ioCxtRef = null; - + @Override public void close() throws IOException { doClose(); initDone = false; ioCxtRef = null; } - + @Override public boolean next(K key, V value) throws IOException { if(!initDone) { throw new IOException("Hive IOContext is not inited."); } updateIOContext(); - return doNext(key, value); + try { + return doNext(key, value); + } catch (IOException e) { + ioCxtRef.setIOExceptions(true); + throw e; + } } - + protected void updateIOContext() throws IOException { long pointerPos = this.getPos(); @@ -85,11 +90,11 @@ ioCxtRef.nextBlockStart = pointerPos; } } - + public IOContext getIOContext() { return IOContext.get(); } - + public void initIOContext(long startPos, boolean isBlockPointer, String inputFile) { ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; @@ -101,7 +106,7 @@ public void initIOContext(FileSplit split, JobConf job, Class inputFormatClass) throws IOException { boolean blockPointer = false; - long blockStart = -1; + long blockStart = -1; FileSplit fileSplit = (FileSplit) split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(job); Index: ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (working copy) @@ -28,14 +28,14 @@ * nextBlockStart refers the end of current row and beginning of next row. */ public class IOContext { - + private static ThreadLocal threadLocal = new ThreadLocal(); static { if (threadLocal.get() == null) { - threadLocal.set(new IOContext()); + threadLocal.set(new IOContext()); } } - + public static IOContext get() { return IOContext.threadLocal.get(); } @@ -43,13 +43,15 @@ long currentBlockStart; long nextBlockStart; boolean isBlockPointer; - + boolean ioExceptions; + String inputFile; - + public IOContext() { this.currentBlockStart = 0; this.nextBlockStart = -1; this.isBlockPointer = true; + this.ioExceptions = false; } public long getCurrentBlockStart() { @@ -75,7 +77,7 @@ public void setBlockPointer(boolean isBlockPointer) { this.isBlockPointer = isBlockPointer; } - + public String getInputFile() { return inputFile; } @@ -83,4 +85,12 @@ public void setInputFile(String inputFile) { this.inputFile = inputFile; } + + public void setIOExceptions(boolean ioe) { + this.ioExceptions = ioe; + } + + public boolean getIOExceptions() { + return ioExceptions; + } }