Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 986162) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -51,7 +51,7 @@ private Map fetchOperators; private OutputCollector oc; private JobConf jc; - private boolean abort = false; + private static boolean abort = false; private Reporter rp; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -213,6 +213,9 @@ reportStats rps = new reportStats(rp); mo.preorderMap(rps); + + // reset abort flag so that ExecMapper instance can be potentially reused + setAbort(false); return; } catch (Exception e) { if (!abort) { @@ -231,8 +234,8 @@ return abort; } - public void setAbort(boolean abort) { - this.abort = abort; + public static void setAbort(boolean abrt) { + abort = abrt; } public static void setDone(boolean done) { Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 986162) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy) @@ -92,6 +92,11 @@ if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 986162) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy) @@ -28,7 +28,7 @@ /** * HiveRecordReader is a simple wrapper on RecordReader. * It allows us to stop reading the data when some global flag - * ExecMapper.getDone() is set. + * ExecMapper.getDone() is set. */ public class HiveRecordReader implements RecordReader { @@ -63,6 +63,11 @@ if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 986162) +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (working copy) @@ -24,11 +24,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; /** * BucketizedHiveRecordReader is a wrapper on a list of RecordReader. It behaves @@ -87,12 +85,17 @@ } public boolean next(K key, V value) throws IOException { - while ((curReader == null) || !curReader.next(key, value)) { - if (!initNextRecordReader()) { - return false; + try { + while ((curReader == null) || !curReader.next(key, value)) { + if (!initNextRecordReader()) { + return false; + } } + return true; + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; } - return true; } /**