Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (working copy) @@ -53,7 +53,7 @@ private Map fetchOperators; private OutputCollector oc; private JobConf jc; - private boolean abort = false; + private static boolean abort = false; private Reporter rp; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -102,8 +102,8 @@ } fetchOperators = new HashMap(); - - Map fetchOpJobConfMap = new HashMap(); + + Map fetchOpJobConfMap = new HashMap(); // create map local operators for (Map.Entry entry : localWork.getAliasToFetchWork() .entrySet()) { @@ -118,7 +118,7 @@ setColumnsNeeded = true; } } - + if (!setColumnsNeeded) { ColumnProjectionUtils.setFullyReadColumns(jobClone); } @@ -240,6 +240,9 @@ reportStats rps = new reportStats(rp); mo.preorderMap(rps); + + // reset abort flag so that ExecMapper instance can be potentially reused + setAbort(false); return; } catch (Exception e) { if (!abort) { @@ -258,8 +261,8 @@ return abort; } - public void setAbort(boolean abort) { - this.abort = abort; + public static void setAbort(boolean abrt) { + abort = abrt; } public static void setDone(boolean done) { Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy) @@ -69,6 +69,7 @@ this.initIOContext(fsplit, job, inputFormatClass); } + @Override public void doClose() throws IOException { recordReader.close(); } @@ -89,10 +90,16 @@ return recordReader.getProgress(); } + @Override public boolean doNext(K key, V value) throws IOException { if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy) @@ -38,6 +38,7 @@ this.recordReader = recordReader; } + @Override public void doClose() throws IOException { recordReader.close(); } @@ -63,7 +64,12 @@ if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + try { + return recordReader.next(key, value); + } catch (IOException e) { + // should set the abort flag in ExecMapper before throwing the exception up + ExecMapper.setAbort(true); + throw e; + } } - } Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 986112) +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (working copy) @@ -20,14 +20,13 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.exec.ExecMapper; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; /** * BucketizedHiveRecordReader is a wrapper on a list of RecordReader. It behaves @@ -54,6 +53,7 @@ initNextRecordReader(); } + @Override public void doClose() throws IOException { if (curReader != null) { curReader.close(); @@ -85,13 +85,19 @@ / (float) (split.getLength())); } + @Override public boolean doNext(K key, V value) throws IOException { - while ((curReader == null) || !curReader.next(key, value)) { - if (!initNextRecordReader()) { - return false; + try { + while ((curReader == null) || !curReader.next(key, value)) { + if (!initNextRecordReader()) { + return false; + } } + return true; + } catch (IOException e) { + ExecMapper.setAbort(true); + throw e; } - return true; } /**