Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -53,7 +53,7 @@ private Map fetchOperators; private OutputCollector oc; private JobConf jc; - private boolean abort = false; + private static boolean abort = false; private Reporter rp; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -102,8 +102,8 @@ } fetchOperators = new HashMap(); - - Map fetchOpJobConfMap = new HashMap(); + + Map fetchOpJobConfMap = new HashMap(); // create map local operators for (Map.Entry entry : localWork.getAliasToFetchWork() .entrySet()) { @@ -118,7 +118,7 @@ setColumnsNeeded = true; } } - + if (!setColumnsNeeded) { ColumnProjectionUtils.setFullyReadColumns(jobClone); } @@ -240,6 +240,9 @@ reportStats rps = new reportStats(rp); mo.preorderMap(rps); + + // reset abort flag so that ExecMapper instance can be potentially reused + setAbort(false); return; } catch (Exception e) { if (!abort) { @@ -258,8 +261,8 @@ return abort; } - public void setAbort(boolean abort) { - this.abort = abort; + public static void setAbort(boolean abrt) { + abort = abrt; } public static void setDone(boolean done) { Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy) @@ -69,6 +69,7 @@ this.initIOContext(fsplit, job, inputFormatClass); } + @Override public void doClose() throws IOException { recordReader.close(); } @@ -89,10 +90,16 @@ return recordReader.getProgress(); } + @Override public boolean doNext(K key, V value) throws IOException { if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy) @@ -38,6 +38,7 @@ this.recordReader = recordReader; } + @Override public void doClose() throws IOException { recordReader.close(); } @@ -63,7 +64,12 @@ if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + // should set the abort flag in ExecMapper before throwing the exception up + ExecMapper.setAbort(true); + throw e; + } } - }