Index: src/test/org/apache/hadoop/mapred/TestSortedRanges.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestSortedRanges.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestSortedRanges.java	(revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+
+public class TestSortedRanges extends TestCase {
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestSortedRanges");
+  
+  public void testAdd() {
+    SortedRanges sr = new SortedRanges();
+    sr.add(new SortedRanges.Range(3,10));
+    assertEquals(8, sr.getIndicesCount());
+    
+    sr.add(new Range(2,7));
+    assertEquals(9, sr.getIndicesCount());
+    
+    sr.add(new Range(7,15));
+    assertEquals(14, sr.getIndicesCount());
+    
+    sr.add(new Range(31,40));
+    sr.add(new Range(51,60));
+    sr.add(new Range(66,75));
+    assertEquals(44, sr.getIndicesCount());
+    
+    sr.add(new Range(21,70));
+    assertEquals(69, sr.getIndicesCount());
+    
+    LOG.debug(sr);
+  }
+
+}
Index: src/test/org/apache/hadoop/mapred/TestBadRecords.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 0)
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestBadRecords extends ClusterMapReduceTestCase {
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestBadRecords");
+  
+  protected static final List<String> MAPPER_BAD_RECORDS = 
+    Arrays.asList("hello1","hello4","hello5");
+  
+  protected static final List<String> REDUCER_BAD_RECORDS = 
+    Arrays.asList("hello8","hello10");
+  
+  protected List<String> input;
+  
+  public static final int TOTAL_COUNT = 10;
+  
+  public TestBadRecords() {
+    input = new ArrayList<String>();
+    for(int i=1;i<=TOTAL_COUNT;i++) {
+      input.add("hello"+i);
+    }
+  }
+  
+  private void runMapReduce(JobConf conf, 
+      List<String> mapperBadRecords, List<String> redBadRecords) throws Exception {
+    createInput();
+    conf.setJobName("mr");
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setInt("mapred.task.timeout", 30*1000);
+    
+    //the no of attempts to successfully complete the task depends 
+    //on the no of bad records.
+    conf.setMaxMapAttempts(conf.getAttemptsToEnableSkipMode()+1+mapperBadRecords.size());
+    conf.setMaxReduceAttempts(conf.getAttemptsToEnableSkipMode()+1+redBadRecords.size());
+    
+    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    JobClient.runJob(conf);
+    validateOutput(conf, mapperBadRecords, redBadRecords);
+  }
+  
+  
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    for(String inp : input) {
+      wr.write(inp+"\n");
+    }wr.close();
+  }
+  
+  private void validateOutput(JobConf conf, 
+      List<String> mapperBadRecords, List<String> redBadRecords) 
+    throws Exception{
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+    
+    List<String> mapperOutput=getProcessed(input, mapperBadRecords);
+    LOG.debug("mapperOutput " + mapperOutput.size());
+    List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
+    LOG.debug("reducerOutput " + reducerOutput.size());
+    
+   if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String key = tokeniz.nextToken();
+        String value = tokeniz.nextToken();
+        LOG.debug("Output: key:"+key + "  value:"+value);
+        assertTrue(value.contains("hello"));
+        
+        
+        assertTrue(reducerOutput.contains(value));
+        line = reader.readLine();
+      }
+      reader.close();
+      assertEquals(reducerOutput.size(), counter);
+    }
+  }
+  
+  private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
+    List<String> processed = new ArrayList<String>();
+    for(String input : inputs) {
+      if(!badRecs.contains(input)) {
+        processed.add(input);
+      }
+    }
+    return processed;
+  }
+  
+  public void testBadMapper() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(BadMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    runMapReduce(conf, MAPPER_BAD_RECORDS, new ArrayList<String>());
+  }
+  
+  public void testBadReducer() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(BadReducer.class);
+    runMapReduce(conf, new ArrayList<String>(),REDUCER_BAD_RECORDS);
+  }
+  
+  public void testBadMapRed() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(BadMapper.class);
+    conf.setReducerClass(BadReducer.class);
+    runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
+  }
+  
+    
+  static class BadMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+    
+    public void map(LongWritable key, Text val,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      String str = val.toString();
+      LOG.debug("MAP key:" +key +"  value:" + str);
+      if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        throw new RuntimeException("Bad record "+str);
+      }
+      else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
+        try {
+          LOG.warn("MAP Encountered BAD record");
+          Thread.sleep(15*60*1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      output.collect(key, val);
+    }
+  }
+  
+  static class BadReducer extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> {
+    
+    public void reduce(LongWritable key, Iterator<Text> values,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      while(values.hasNext()) {
+        Text value = values.next();
+        LOG.debug("REDUCE key:" +key +"  value:" + value);
+        if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
+          LOG.warn("REDUCE Encountered BAD record");
+          System.exit(-1);
+        }
+        else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
+          try {
+            LOG.warn("REDUCE Encountered BAD record");
+            Thread.sleep(15*60*1000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        output.collect(key, value);
+      }
+      
+    }
+  }
+  
+
+}
Index: src/mapred/org/apache/hadoop/mapred/Reporter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Reporter.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/Reporter.java	(working copy)
@@ -53,6 +53,8 @@
       public InputSplit getInputSplit() throws UnsupportedOperationException {
         throw new UnsupportedOperationException("NULL reporter has no input");
       }
+      public void recordsProcessed(long amount) {
+      }
     };
 
   /**
@@ -92,4 +94,9 @@
    */
   public abstract InputSplit getInputSplit() 
     throws UnsupportedOperationException;
+  
+  /**
+   * Reports the amount of records processed since last report
+   */
+  public void recordsProcessed(long amount);
 }
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -64,13 +64,15 @@
   protected static enum Counter { 
     MAP_INPUT_RECORDS, 
     MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_RECORDS
   }
   
   /**
@@ -108,6 +110,10 @@
   private int partition;                          // id within job
   TaskStatus taskStatus; 										      // current status of the task
   private Path taskOutputPath;                    // task-specific output dir
+  private SortedRanges recordsToSkip = new SortedRanges(); //failed ranges from previous attempts
+  private boolean skipMode = false;
+  private long currentRecStartIndex; //currently processing record start index
+  private Iterator<Long> currentRecIndexIterator;
   
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
@@ -175,7 +181,23 @@
   protected synchronized void setPhase(TaskStatus.Phase phase){
     this.taskStatus.setPhase(phase); 
   }
+  
+  public SortedRanges getRecordsToSkip() {
+    return recordsToSkip;
+  }
 
+  public void setRecordsToSkip(SortedRanges recordsToSkip) {
+    this.recordsToSkip = recordsToSkip;
+  }
+
+  public boolean isSkipMode() {
+    return skipMode;
+  }
+
+  public void setSkipMode(boolean skipMode) {
+    this.skipMode = skipMode;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -190,6 +212,8 @@
       Text.writeString(out, "");
     }
     taskStatus.write(out);
+    recordsToSkip.write(out);
+    out.writeBoolean(skipMode);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
@@ -203,6 +227,10 @@
     }
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
+    recordsToSkip.readFields(in);
+    currentRecIndexIterator = recordsToSkip.skipRangeIterator();
+    currentRecStartIndex = currentRecIndexIterator.next();
+    skipMode = in.readBoolean();
   }
 
   @Override
@@ -372,11 +400,24 @@
           }
           setProgressFlag();
         }
+        public void recordsProcessed(long amount) {
+          for(int i=0;i<amount;i++) {
+            currentRecStartIndex = currentRecIndexIterator.next();
+          }
+        }
         public InputSplit getInputSplit() throws UnsupportedOperationException {
           return Task.this.getInputSplit();
         }
       };
   }
+  
+  protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical, 
+      long nextRecIndex) throws IOException{
+    SortedRanges.Range range = new SortedRanges.Range(currentRecStartIndex, nextRecIndex);
+    taskStatus.setNextRecordRange(range);
+    LOG.debug("sending reportNextRecordRange " + range);
+    umbilical.reportNextRecordRange(taskId, range);
+  }
 
   public void setProgress(float progress) {
     taskProgress.set(progress);
Index: src/mapred/org/apache/hadoop/mapred/MapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapRunner.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/MapRunner.java	(working copy)
@@ -45,6 +45,7 @@
       while (input.next(key, value)) {
         // map pair to output
         mapper.map(key, value, output, reporter);
+        reporter.recordsProcessed(1);
       }
     } finally {
       mapper.close();
Index: src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(working copy)
@@ -277,6 +277,11 @@
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
+    
+    public void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
+    throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
 
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
Index: src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(working copy)
@@ -43,9 +43,10 @@
    * Version 8 changes {job|tip|task}id's to use their corresponding 
    * objects rather than strings.
    * Version 9 changes the counter representation for HADOOP-1915
+   * Version 10 added reportNextRecordRange for HADOOP-153
    * */
 
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -68,6 +69,9 @@
    *  @param trace the text to report
    */
   void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+  
+  void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
+    throws IOException;
 
   /** Periodically called by child to check if parent is still alive. 
    * @return True if the task is known
Index: src/mapred/org/apache/hadoop/mapred/TaskStatus.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(working copy)
@@ -56,6 +56,7 @@
   private Phase phase = Phase.STARTING; 
   private Counters counters;
   private boolean includeCounters;
+  private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
 
   public TaskStatus() {}
 
@@ -89,6 +90,15 @@
   }
   public String getStateString() { return stateString; }
   public void setStateString(String stateString) { this.stateString = stateString; }
+  
+  public SortedRanges.Range getNextRecordRange() {
+    return nextRecordRange;
+  }
+
+  public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
+    this.nextRecordRange = nextRecordRange;
+  }
+  
   /**
    * Get task finish time. if shuffleFinishTime and sortFinishTime 
    * are not set before, these are set to finishTime. It takes care of 
@@ -247,6 +257,7 @@
     this.progress = status.getProgress();
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
+    this.nextRecordRange = status.getNextRecordRange();
 
     setDiagnosticInfo(status.getDiagnosticInfo());
     
@@ -297,6 +308,7 @@
     if (includeCounters) {
       counters.write(out);
     }
+    nextRecordRange.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -313,6 +325,7 @@
     if (includeCounters) {
       counters.readFields(in);
     }
+    nextRecordRange.readFields(in);
   }
   
   //////////////////////////////////////////////////////////////////////////////
Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -1585,6 +1585,10 @@
     public synchronized void reportDiagnosticInfo(String info) {
       this.diagnosticInfo.append(info);
     }
+    
+    public synchronized void reportNextRecordRange(SortedRanges.Range range) {
+      this.taskStatus.setNextRecordRange(range);
+    }
 
     /**
      * The task is reporting that it's done running
@@ -1988,6 +1992,16 @@
       LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
     }
   }
+  
+  public synchronized void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
+    throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      tip.reportNextRecordRange(range);
+    } else {
+      LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". Ignored.");
+    }
+  }
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
Index: src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(working copy)
@@ -77,6 +77,8 @@
   private int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
+  private volatile SortedRanges failedRecords = new SortedRanges();
+  private boolean skipMode = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -485,6 +487,12 @@
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
       machinesWhereFailed.add(trackerHostName);
+      LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
+      failedRecords.add(status.getNextRecordRange());
+      if(conf.getSkipModeEnabled() && 
+          numTaskFailures>=conf.getAttemptsToEnableSkipMode()) {
+        skipMode = true;
+      }
     } else {
       numKilledTasks++;
     }
@@ -709,12 +717,15 @@
     }
 
     if (isMapTask()) {
+      LOG.debug("attemdpt "+  numTaskFailures   +" sending skippedRecords "+failedRecords.getIndicesCount());
       t = new MapTask(jobFile, taskid, partition, 
           rawSplit.getClassName(), rawSplit.getBytes());
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     t.setConf(conf);
+    t.setRecordsToSkip(failedRecords);
+    t.setSkipMode(skipMode);
     tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
Index: src/mapred/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobConf.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -236,6 +236,21 @@
     set("user.name", user);
   }
 
+  public boolean getSkipModeEnabled() {
+    return getBoolean("skip.mode.enabled", true);
+  }
+  
+  public void setSkipModeEnabled(boolean skipModeEnabled) {
+    setBoolean("skip.mode.enabled", skipModeEnabled);
+  }
+  
+  public int getAttemptsToEnableSkipMode() {
+    return getInt("attempts.to.enable.skip.mode", 2);
+  }
+  
+  public void setAttemptsToEnableSkipMode(int attemptsToEnableSkipMode) {
+    setInt("attempts.to.enable.skip.mode", attemptsToEnableSkipMode);
+  }
 
   
   /**
Index: src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java	(working copy)
@@ -222,6 +222,7 @@
       try {
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
+        reporter.recordsProcessed(1);
       } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance
         // variable of the MultithreadedMapRunner from where it will be
Index: src/mapred/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -232,6 +232,40 @@
       reporter.progress();
     }
   }
+  
+  private class SkippingReduceValuesIterator<KEY,VALUE> 
+    extends ReduceValuesIterator<KEY,VALUE> {
+    private Iterator<Long> skipFailedRecIndexIterator;
+    private TaskUmbilicalProtocol umbilical;
+    private long recIndex = -1;
+    
+    public SkippingReduceValuesIterator(RawKeyValueIterator in,
+        RawComparator<KEY> comparator, Class<KEY> keyClass,
+        Class<VALUE> valClass, Configuration conf, Progressable reporter,
+        TaskUmbilicalProtocol umbilical) throws IOException {
+      super(in, comparator, keyClass, valClass, conf, reporter);
+      this.umbilical = umbilical;
+      skipFailedRecIndexIterator = getRecordsToSkip().skipRangeIterator();
+      mayBeSkip();
+    }
+    
+    void nextKey() throws IOException {
+      super.nextKey();
+      mayBeSkip();
+    }
+    
+    private void mayBeSkip() throws IOException {
+      recIndex++;
+      long nextRecIndex = skipFailedRecIndexIterator.next();
+      long skip = nextRecIndex - recIndex;
+      for(int i=0;i<skip && super.more();i++) {
+        super.nextKey();
+        recIndex++;
+      }
+      getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+      reportNextRecordRange(umbilical, nextRecIndex);
+    }
+  }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -309,13 +343,18 @@
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       
-      ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
+      ReduceValuesIterator values = isSkipMode() ? 
+          new SkippingReduceValuesIterator(rIter, 
+              job.getOutputValueGroupingComparator(), keyClass, valClass, 
+              job, reporter, umbilical) :
+          new ReduceValuesIterator(rIter, 
           job.getOutputValueGroupingComparator(), keyClass, valClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
+        reporter.recordsProcessed(1);
         values.nextKey();
         values.informReduceProgress();
       }
Index: src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(working copy)
@@ -89,6 +89,11 @@
                                                         int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
+
+    public void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
+        throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
   }
   
   private static ClassLoader makeClassLoader(JobConf conf, 
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -31,6 +31,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -142,11 +143,19 @@
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private Iterator<Long> skipFailedRecIndexIterator;
+    private TaskUmbilicalProtocol umbilical;
+    private long recIndex = -1;
+    private long beforePos = -1;
+    private long afterPos = -1;
     
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) {
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters, 
+        TaskUmbilicalProtocol umbilical) {
       rawIn = raw;
+      this.umbilical = umbilical;
       inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
       inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+      skipFailedRecIndexIterator = getRecordsToSkip().skipRangeIterator();
     }
 
     public K createKey() {
@@ -158,17 +167,39 @@
     }
      
     public synchronized boolean next(K key, V value)
-      throws IOException {
-
-      setProgress(getProgress());
-      long beforePos = getPos();
-      boolean ret = rawIn.next(key, value);
+    throws IOException {
+      boolean ret = moveToNext(key, value);
+      if(isSkipMode() && ret) {
+        long nextRecIndex = skipFailedRecIndexIterator.next();
+        long skip = nextRecIndex - recIndex;
+        for(int i=0;i<skip && ret;i++) {
+          ret = moveToNext(key, value);
+        }
+        getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
+        reportNextRecordRange(umbilical, nextRecIndex);
+      }
       if (ret) {
         inputRecordCounter.increment(1);
-        inputByteCounter.increment(getPos() - beforePos);
+        inputByteCounter.increment(afterPos - beforePos);
       }
       return ret;
     }
+     
+    protected boolean moveToNext(K key, V value)
+      throws IOException {
+      setProgress(getProgress());
+      beforePos = getPos();
+      boolean ret = rawIn.next(key, value);
+      recIndex++;
+      afterPos = getPos();
+      return ret;
+    }
+    
+    protected void incrementCounters() {
+      inputRecordCounter.increment(1);
+      inputByteCounter.increment(afterPos - beforePos);
+    }
+    
     public long getPos() throws IOException { return rawIn.getPos(); }
     public void close() throws IOException { rawIn.close(); }
     public float getProgress() throws IOException {
@@ -218,7 +249,7 @@
       
     RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = new TrackedRecordReader(rawIn, getCounters());
+    RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
 
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
Index: src/mapred/org/apache/hadoop/mapred/SortedRanges.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/SortedRanges.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/SortedRanges.java	(revision 0)
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Keeps the Ranges sorted by startIndex.
+ * The added ranges are always ensured to be non-overlapping.
+ * Provides the SkipRangeIterator, which skips the Ranges 
+ * stored in this object.
+ */
+public class SortedRanges implements Writable{
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.SortedRanges");
+  
+  private SortedSet<Range> ranges = new TreeSet<Range>();
+  private int indicesCount;
+  
+  public Iterator<Long> skipRangeIterator(){
+    return new SkipRangeIterator();
+  }
+  
+  public int getIndicesCount() {
+    return indicesCount;
+  }
+  
+  public void add(Range range){
+    if(range.isEmpty()) {
+      return;
+    }
+    synchronized(ranges) {
+      long startIndex = range.getStartRecIndex();
+      long endIndex = range.getEndRecIndex();
+      //make sure that there are no overlapping ranges
+      SortedSet<Range> headSet = ranges.headSet(range);
+      if(headSet.size()>0) {
+        Range previousRange = headSet.last();
+        LOG.debug("previousRange "+previousRange);
+        if(startIndex<=previousRange.getEndRecIndex()) {
+          //previousRange overlaps this range
+          //remove the previousRange
+          if(ranges.remove(previousRange)) {
+            indicesCount-=previousRange.getIndicesCount();
+          }
+          //expand this range
+          startIndex = previousRange.getStartRecIndex();
+        }
+      }
+      
+      Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+      while(tailSetIt.hasNext()) {
+        Range nextRange = tailSetIt.next();
+        LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+"  endIndex:"+endIndex);
+        if(endIndex>=nextRange.getStartRecIndex()) {
+          //nextRange overlaps this range
+          //remove the nextRange
+          tailSetIt.remove();
+          indicesCount-=nextRange.getIndicesCount();
+          if(endIndex<nextRange.getEndRecIndex()) {
+            //expand this range
+            endIndex = nextRange.getEndRecIndex();
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+      if(endIndex>=startIndex) {
+        Range recRange = new Range(startIndex,endIndex);
+        ranges.add(recRange);
+        indicesCount+=recRange.getIndicesCount();
+        LOG.debug("added "+recRange +"  count "+indicesCount);
+      }
+    }
+  }
+  
+  
+  public void readFields(DataInput in) throws IOException {
+    indicesCount = in.readInt();
+    ranges = new TreeSet<Range>();
+    int size = in.readInt();
+    for(int i=0;i<size;i++) {
+      Range range = new Range();
+      range.readFields(in);
+      ranges.add(range);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(indicesCount);
+    out.writeInt(ranges.size());
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      range.write(out);
+    }
+  }
+  
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      sb.append(range.toString()+"\n");
+    }
+    return sb.toString();
+  }
+  
+  static class Range implements Comparable<Range>, Writable{
+    public static final int NULL_INDEX = -1;
+    
+    private long startRecIndex;//inclusive
+    private long endRecIndex;//inclusive
+        
+    public Range(long startRecIndex, long endRecIndex) {
+      if(endRecIndex<startRecIndex) {
+        throw new RuntimeException("Invalid range. EndRecIndex can't be " +
+            "less than startRecIndex");
+      }
+      this.startRecIndex = startRecIndex;
+      this.endRecIndex = endRecIndex;
+    }
+    
+    public Range() {
+      this(NULL_INDEX,NULL_INDEX);
+    }
+    
+    public long getStartRecIndex() {
+      return startRecIndex;
+    }
+    
+    public long getEndRecIndex() {
+      return endRecIndex;
+    }
+    
+    public long getIndicesCount() {
+      //start and end index are inclusive
+      return endRecIndex-startRecIndex+1;
+    }
+    
+    public boolean isEmpty() {
+      return startRecIndex==NULL_INDEX &&
+      endRecIndex==NULL_INDEX;
+    }
+    
+    public boolean equals(Range o) {
+      return startRecIndex==o.startRecIndex &&
+        endRecIndex==o.endRecIndex;
+    }
+    
+    public int compareTo(Range o) {
+      if(this.startRecIndex == o.startRecIndex) {
+        return 0;
+      }
+      return (this.startRecIndex > o.startRecIndex) ? 1:-1;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      startRecIndex = in.readLong();
+      endRecIndex = in.readLong();
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(startRecIndex);
+      out.writeLong(endRecIndex);
+    }
+    
+    public String toString() {
+      return startRecIndex +":" +
+        endRecIndex;
+    }    
+  }
+  
+  /**
+   * Index Iterator which skips the ranges stored 
+   *
+   */
+  private class SkipRangeIterator implements Iterator<Long> {
+    Iterator<Range> rangeIterator = ranges.iterator();
+    Range range = new Range();
+    long currentIndex = -1;
+    
+    public boolean hasNext() {
+      return true;
+    }
+    
+    
+    public Long next() {
+      currentIndex++;
+      LOG.debug("currentIndex "+currentIndex +"   "+range);
+      skipIfInRange();
+      while(currentIndex>range.getEndRecIndex() && rangeIterator.hasNext()) {
+        range = rangeIterator.next();
+        skipIfInRange();
+      }
+      return currentIndex;
+    }
+    
+    private void skipIfInRange() {
+      if(currentIndex>=range.getStartRecIndex() && currentIndex<=range.getEndRecIndex()) {
+        //need to skip the range
+        LOG.debug("skipping " + currentIndex +"-" + range.getEndRecIndex());
+        currentIndex = range.getEndRecIndex() + 1;
+        
+      }
+    }
+    
+    public void remove() {
+      throw new UnsupportedOperationException("remove not supported.");
+    }
+    
+  }
+
+}
Index: src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(revision 677561)
+++ src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(working copy)
@@ -6,10 +6,12 @@
 MAP_INPUT_BYTES.name=          Map input bytes
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
+MAP_SKIPPED_RECORDS.name=      Map skipped records
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records
 REDUCE_INPUT_GROUPS.name=      Reduce input groups
 REDUCE_INPUT_RECORDS.name=     Reduce input records
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
+REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 
 
