Index: src/test/org/apache/hadoop/mapred/TestSortedRanges.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestSortedRanges.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestSortedRanges.java	(revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+
+public class TestSortedRanges extends TestCase {
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestSortedRanges");
+  
+  public void testAdd() {
+    SortedRanges sr = new SortedRanges();
+    sr.add(new SortedRanges.Range(3,10));
+    assertEquals(8, sr.getIndicesCount());
+    
+    sr.add(new Range(2,7));
+    assertEquals(9, sr.getIndicesCount());
+    
+    sr.add(new Range(7,15));
+    assertEquals(14, sr.getIndicesCount());
+    
+    sr.add(new Range(31,40));
+    sr.add(new Range(51,60));
+    sr.add(new Range(66,75));
+    assertEquals(44, sr.getIndicesCount());
+    
+    sr.add(new Range(21,70));
+    assertEquals(69, sr.getIndicesCount());
+    
+    LOG.debug(sr);
+  }
+
+}
Index: src/test/org/apache/hadoop/mapred/TestBadRecords.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 0)
+++ src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 0)
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestBadRecords extends ClusterMapReduceTestCase {
+  
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestBadRecords");
+  
+  private static final List<String> MAPPER_BAD_RECORDS = 
+    Arrays.asList("hello01","hello04","hello05");
+  
+  private static final List<String> REDUCER_BAD_RECORDS = 
+    Arrays.asList("hello08","hello10");
+  
+  private List<String> input;
+  
+  public TestBadRecords() {
+    input = new ArrayList<String>();
+    for(int i=1;i<=10;i++) {
+      String str = ""+i;
+      int zerosToPrepend = 2 - str.length();
+      for(int j=0;j<zerosToPrepend;j++){
+        str = "0"+str;
+      }
+      input.add("hello"+str);
+    }
+  }
+  
+  private void runMapReduce(JobConf conf, 
+      List<String> mapperBadRecords, List<String> redBadRecords) throws Exception {
+    createInput();
+    conf.setJobName("mr");
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setInt("mapred.task.timeout", 30*1000);
+    
+    //the no of attempts to successfully complete the task depends 
+    //on the no of bad records.
+    conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToKickOffSkipMode(conf)+1+mapperBadRecords.size());
+    conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToKickOffSkipMode(conf)+1+redBadRecords.size());
+    
+    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    RunningJob runningJob = JobClient.runJob(conf);
+    validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
+  }
+  
+  
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    for(String inp : input) {
+      wr.write(inp+"\n");
+    }wr.close();
+  }
+  
+  private void validateOutput(JobConf conf, RunningJob runningJob, 
+      List<String> mapperBadRecords, List<String> redBadRecords) 
+    throws Exception{
+    LOG.info(runningJob.getCounters().toString());
+    assertTrue(runningJob.isSuccessful());
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+    
+    List<String> mapperOutput=getProcessed(input, mapperBadRecords);
+    LOG.debug("mapperOutput " + mapperOutput.size());
+    List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
+    LOG.debug("reducerOutput " + reducerOutput.size());
+    
+   if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String key = tokeniz.nextToken();
+        String value = tokeniz.nextToken();
+        LOG.debug("Output: key:"+key + "  value:"+value);
+        assertTrue(value.contains("hello"));
+        
+        
+        assertTrue(reducerOutput.contains(value));
+        line = reader.readLine();
+      }
+      reader.close();
+      assertEquals(reducerOutput.size(), counter);
+    }
+  }
+  
+  private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
+    List<String> processed = new ArrayList<String>();
+    for(String input : inputs) {
+      if(!badRecs.contains(input)) {
+        processed.add(input);
+      }
+    }
+    return processed;
+  }
+  
+  public void testBadMapper() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(BadMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    runMapReduce(conf, MAPPER_BAD_RECORDS, new ArrayList<String>());
+  }
+  
+  public void testBadReducer() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(BadReducer.class);
+    runMapReduce(conf, new ArrayList<String>(),REDUCER_BAD_RECORDS);
+  }
+  
+  public void testBadMapRed() throws Exception {
+    JobConf conf = createJobConf();
+    conf.setMapperClass(BadMapper.class);
+    conf.setReducerClass(BadReducer.class);
+    runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
+  }
+  
+    
+  static class BadMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+    
+    public void map(LongWritable key, Text val,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      String str = val.toString();
+      LOG.debug("MAP key:" +key +"  value:" + str);
+      if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
+        LOG.warn("MAP Encountered BAD record");
+        throw new RuntimeException("Bad record "+str);
+      }
+      else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
+        try {
+          LOG.warn("MAP Encountered BAD record");
+          Thread.sleep(15*60*1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      output.collect(key, val);
+    }
+  }
+  
+  static class BadReducer extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> {
+    
+    public void reduce(LongWritable key, Iterator<Text> values,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      while(values.hasNext()) {
+        Text value = values.next();
+        LOG.debug("REDUCE key:" +key +"  value:" + value);
+        if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
+          LOG.warn("REDUCE Encountered BAD record");
+          System.exit(-1);
+        }
+        else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
+          try {
+            LOG.warn("REDUCE Encountered BAD record");
+            Thread.sleep(15*60*1000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        output.collect(key, value);
+      }
+      
+    }
+  }
+  
+
+}
Index: src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
===================================================================
--- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java	(revision 0)
+++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java	(revision 0)
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SkipBadRecords;
+
+public class TestStreamingBadRecords extends ClusterMapReduceTestCase
+{
+
+  private static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.TestStreamingBadRecords");
+  
+  private static final List<String> MAPPER_BAD_RECORDS = 
+    Arrays.asList("hey023","hey055","hey099");
+  
+  private static final List<String> REDUCER_BAD_RECORDS = 
+    Arrays.asList("hey001","hey024");
+  
+  private static final String mapper = StreamUtil.makeJavaCommand(App.class, new String[]{});
+  private static final String badMapper = StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
+  private static final String reducer = StreamUtil.makeJavaCommand(App.class, new String[]{"true"});
+  private static final String badReducer = StreamUtil.makeJavaCommand(BadApp.class, new String[]{"true"});
+  
+  public TestStreamingBadRecords() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+  
+  public void runStreamingJob(String mapperApp, String reducerApp, 
+      List<String> mapperBadRecs, List<String> reducerBadRecs) throws Exception {
+    createInput();
+    StreamJob job = new StreamJob(genArgs(mapperApp,reducerApp,mapperBadRecs, reducerBadRecs), 
+        false);      
+    job.go();
+    validateOutput(job.running_, mapperBadRecs, reducerBadRecs);
+  }
+
+  private void createInput() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    //increasing the record size so that we have stream flushing
+    String prefix = new String(new byte[20*1024]);
+    for(int i=1;i<=100;i++) {
+      String str = ""+i;
+      int zerosToPrepend = 3 - str.length();
+      for(int j=0;j<zerosToPrepend;j++){
+        str = "0"+str;
+      }
+      wr.write(prefix + "hey"+str+"\n");
+    }wr.close();
+  }
+  
+  private void validateOutput(RunningJob runningJob, 
+      List<String> mapperBadRecs, List<String> reducerBadRecs) 
+    throws Exception {
+    LOG.info(runningJob.getCounters().toString());
+    assertTrue(runningJob.isSuccessful());
+    List<String> badRecs = new ArrayList<String>();
+    badRecs.addAll(mapperBadRecs);
+    badRecs.addAll(reducerBadRecs);
+    Path[] outputFiles = FileUtil.stat2Paths(
+        getFileSystem().listStatus(getOutputDir(),
+        new OutputLogFilter()));
+    
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+        String value = tokeniz.nextToken();
+        int index = value.indexOf("hey");
+        assertTrue(index>-1);
+        if(index>-1) {
+          String heyStr = value.substring(index);
+          assertTrue(!badRecs.contains(heyStr));
+        }
+        
+        line = reader.readLine();
+      }
+      reader.close();
+    }
+  }
+
+  private String[] genArgs(String mapperApp, String reducerApp, 
+      List<String> mapperBadRecs, List<String> reducerBadRecs) {
+    JobConf clusterConf = createJobConf();
+    //the no of attempts to successfully complete the task depends 
+    //on the no of bad records.
+    int mapperAttempts = SkipBadRecords.getAttemptsToKickOffSkipMode(clusterConf)
+                          +1+mapperBadRecs.size();
+    int reducerAttempts = SkipBadRecords.getAttemptsToKickOffSkipMode(clusterConf)
+                           +1+reducerBadRecs.size();
+    
+    return new String[] {
+      "-input", (new Path(getInputDir(), "text.txt")).toString(),
+      "-output", getOutputDir().toString(),
+      "-mapper", mapperApp,
+      "-reducer", reducerApp,
+      "-verbose",
+      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
+      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "mapred.reduce.tasks=1",
+      "-jobconf", "mapred.task.timeout=30000",
+      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
+      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
+      "-jobconf", "mapred.job.tracker.http.address="+clusterConf.get("mapred.job.tracker.http.address"),
+      "-jobconf", "stream.debug=set",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+  
+  public void testBadMapper() throws Exception {
+    runStreamingJob(badMapper, reducer, MAPPER_BAD_RECORDS, new ArrayList<String>());
+  }
+  
+  public void testBadReducer() throws Exception {
+    runStreamingJob(mapper, badReducer, new ArrayList<String>(), REDUCER_BAD_RECORDS);
+  }
+  
+  public void testBadMapperReducer() throws Exception {
+    runStreamingJob(badMapper, badReducer, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
+  }
+  
+  static class App{
+    boolean isReducer;
+    
+    public App(String[] args) throws Exception{
+      if(args.length>0) {
+        isReducer = Boolean.parseBoolean(args[0]);
+      }
+      String counter = Counters.Application.MAP_PROCESSED_RECORDS;
+      if(isReducer) {
+        counter = Counters.Application.REDUCE_PROCESSED_RECORDS;
+      }
+      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+      String line;
+
+      while ((line = in.readLine()) != null) {
+        processLine(line);
+        System.err.println("reporter:counter:"+Counters.Application.GROUP+","+
+            counter+",1");
+      }
+    }
+    
+    protected void processLine(String line) throws Exception{
+      System.out.println(line);
+    }
+    
+    
+    public static void main(String[] args) throws Exception{
+      new App(args);
+    }
+  }
+  
+  static class BadApp extends App{
+    
+    public BadApp(String[] args) throws Exception {
+      super(args);
+    }
+
+    protected void processLine(String line) throws Exception {
+      List<String> badRecords = MAPPER_BAD_RECORDS;
+      if(isReducer) {
+        badRecords = REDUCER_BAD_RECORDS;
+      }
+      if(badRecords.size()>0 && line.contains(badRecords.get(0))) {
+        LOG.warn("Encountered BAD record");
+        System.exit(-1);
+      }
+      else if(badRecords.size()>1 && line.contains(badRecords.get(1))) {
+        LOG.warn("Encountered BAD record");
+        throw new Exception("Got bad record..crashing");
+      }
+      else if(badRecords.size()>2 && line.contains(badRecords.get(2))) {
+        LOG.warn("Encountered BAD record");
+        Thread.sleep(15*60*1000);
+      }
+      super.processLine(line);
+    }
+    
+    public static void main(String[] args) throws Exception{
+      new BadApp(args);
+    }
+  }
+  
+  
+
+}
Index: src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
===================================================================
--- src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java	(revision 679748)
+++ src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
@@ -34,6 +35,7 @@
 public class PipeMapper extends PipeMapRed implements Mapper {
 
   private boolean ignoreKey = false;
+  private boolean isSkipModeKickedOff = false;
 
   private byte[] mapOutputFieldSeparator;
   private byte[] mapInputFieldSeparator;
@@ -59,6 +61,10 @@
   
   public void configure(JobConf job) {
     super.configure(job);
+    //disable the auto increment of the counter. For streaming, no of processed records could be 
+    //different(equal or less) than the no of records input.
+    SkipBadRecords.setAutoIncrMapperProcessedRecords(job, false);
+    isSkipModeKickedOff = job.getBoolean("skipmode.kicked.off", false);
     String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
     ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
 
@@ -101,6 +107,11 @@
         }
         write(value);
         clientOut_.write('\n');
+        if(isSkipModeKickedOff) {
+          //flush the streams on every record input if running in skip mode
+          //so that we don't buffer other records surrounding a bad record. 
+          clientOut_.flush();
+        }
       } else {
         numRecSkipped_++;
       }
Index: src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
===================================================================
--- src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java	(revision 679748)
+++ src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Writable;
@@ -39,6 +40,7 @@
   private byte[] reduceOutFieldSeparator;
   private byte[] reduceInputFieldSeparator;
   private int numOfReduceOutputKeyFields = 1;
+  private boolean isSkipModeKickedOff = false;
   
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.reduce.streamprocessor");
@@ -61,6 +63,10 @@
 
   public void configure(JobConf job) {
     super.configure(job);
+    //disable the auto increment of the counter. For streaming, no of processed records could be 
+    //different(equal or less) than the no of records input.
+    SkipBadRecords.setAutoIncrReducerProcessedRecords(job, false);
+    isSkipModeKickedOff = job.getBoolean("skipmode.kicked.off", false);
 
     try {
       reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
@@ -99,6 +105,11 @@
           output.collect(key, val);
         }
       }
+      if(doPipe_ && isSkipModeKickedOff) {
+        //flush the streams on every record input if running in skip mode
+        //so that we don't buffer other records surrounding a bad record. 
+        clientOut_.flush();
+      }
     } catch (IOException io) {
       // a common reason to get here is failure of the subprocess.
       // Document that fact, if possible.
Index: src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/pipes/PipesMapRunner.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 
 /**
  * An adaptor to run a C++ mapper.
@@ -42,6 +43,9 @@
    */
   public void configure(JobConf job) {
     this.job = job;
+    //disable the auto increment of the counter. For pipes, no of processed records could be 
+    //different(equal or less) than the no of records input.
+    SkipBadRecords.setAutoIncrMapperProcessedRecords(job, false);
   }
 
   /**
@@ -64,6 +68,7 @@
     boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
     downlink.runMap(reporter.getInputSplit(), 
                     job.getNumReduceTasks(), isJavaInput);
+    boolean isSkipModeKickedOff = job.getBoolean("skipmode.kicked.off", false);
     try {
       if (isJavaInput) {
         // allocate key & value instances that are re-used for all entries
@@ -75,6 +80,11 @@
         while (input.next(key, value)) {
           // map pair to output
           downlink.mapItem(key, value);
+          if(isSkipModeKickedOff) {
+            //flush the streams on every record input if running in skip mode
+            //so that we don't buffer other records surrounding a bad record.
+            downlink.flush();
+          }
         }
         downlink.endOfInput();
       }
Index: src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/pipes/PipesReducer.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -41,9 +42,14 @@
   private Application<K2, V2, K3, V3> application = null;
   private DownwardProtocol<K2, V2> downlink = null;
   private boolean isOk = true;
+  private boolean isSkipModeKickedOff = false;
 
   public void configure(JobConf job) {
     this.job = job;
+    //disable the auto increment of the counter. For pipes, no of processed records could be 
+    //different(equal or less) than the no of records input.
+    SkipBadRecords.setAutoIncrReducerProcessedRecords(job, false);
+    isSkipModeKickedOff = job.getBoolean("skipmode.kicked.off", false);
   }
 
   /**
@@ -59,6 +65,11 @@
     while (values.hasNext()) {
       downlink.reduceValue(values.next());
     }
+    if(isSkipModeKickedOff) {
+      //flush the streams on every record input if running in skip mode
+      //so that we don't buffer other records surrounding a bad record.
+      downlink.flush();
+    }
     isOk = true;
   }
 
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -64,13 +64,15 @@
   protected static enum Counter { 
     MAP_INPUT_RECORDS, 
     MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_RECORDS
   }
   
   /**
@@ -108,6 +110,10 @@
   private int partition;                          // id within job
   TaskStatus taskStatus; 										      // current status of the task
   private Path taskOutputPath;                    // task-specific output dir
+  private SortedRanges recordsToSkip = new SortedRanges(); //failed ranges from previous attempts
+  private boolean skipModeKickedOff = false;
+  private volatile long currentRecStartIndex; //currently processing record start index
+  private Iterator<Long> currentRecIndexIterator = recordsToSkip.skipRangeIterator();
   
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
@@ -175,7 +181,23 @@
   protected synchronized void setPhase(TaskStatus.Phase phase){
     this.taskStatus.setPhase(phase); 
   }
+  
+  public SortedRanges getRecordsToSkip() {
+    return recordsToSkip;
+  }
 
+  public void setRecordsToSkip(SortedRanges recordsToSkip) {
+    this.recordsToSkip = recordsToSkip;
+  }
+
+  public boolean isSkipModeKickedOff() {
+    return skipModeKickedOff;
+  }
+
+  public void setSkipModeKickedOff(boolean skipModeKickedOff) {
+    this.skipModeKickedOff = skipModeKickedOff;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -190,6 +212,8 @@
       Text.writeString(out, "");
     }
     taskStatus.write(out);
+    recordsToSkip.write(out);
+    out.writeBoolean(skipModeKickedOff);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
@@ -203,6 +227,10 @@
     }
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
+    recordsToSkip.readFields(in);
+    currentRecIndexIterator = recordsToSkip.skipRangeIterator();
+    currentRecStartIndex = currentRecIndexIterator.next();
+    skipModeKickedOff = in.readBoolean();
   }
 
   @Override
@@ -370,6 +398,17 @@
           if (counters != null) {
             counters.incrCounter(group, counter, amount);
           }
+          if(Counters.Application.GROUP.equals(group) && (
+              Counters.Application.MAP_PROCESSED_RECORDS.equals(counter) ||
+              Counters.Application.REDUCE_PROCESSED_RECORDS.equals(counter))) {
+            //if application reports the processed records, move the currentRecStartIndex
+            //to the next.
+            //currentRecStartIndex is the start index which has not yet been finished and is 
+            //still in task's stomach.
+            for(int i=0;i<amount;i++) {
+              currentRecStartIndex = currentRecIndexIterator.next();
+            }
+          }
           setProgressFlag();
         }
         public InputSplit getInputSplit() throws UnsupportedOperationException {
@@ -377,6 +416,23 @@
         }
       };
   }
+  
+  /**
+   *  Reports the next executing record range to TaskTracker.
+   *  
+   * @param umbilical
+   * @param nextRecIndex the record index which would be fed next.
+   * @throws IOException
+   */
+  protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical, 
+      long nextRecIndex) throws IOException{
+    //currentRecStartIndex is the start index which has not yet been finished and is 
+    //still in task's stomach.
+    SortedRanges.Range range = new SortedRanges.Range(currentRecStartIndex, nextRecIndex);
+    taskStatus.setNextRecordRange(range);
+    LOG.debug("sending reportNextRecordRange " + range);
+    umbilical.reportNextRecordRange(taskId, range);
+  }
 
   public void setProgress(float progress) {
     taskProgress.set(progress);
Index: src/mapred/org/apache/hadoop/mapred/MapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapRunner.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/MapRunner.java	(working copy)
@@ -27,11 +27,13 @@
     implements MapRunnable<K1, V1, K2, V2> {
   
   private Mapper<K1, V1, K2, V2> mapper;
+  private boolean autoIncrProcessedRecord;
 
   @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
                                                       job);
+    this.autoIncrProcessedRecord = SkipBadRecords.getAutoIncrMapperProcessedRecords(job);
   }
 
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -45,6 +47,10 @@
       while (input.next(key, value)) {
         // map pair to output
         mapper.map(key, value, output, reporter);
+        if(autoIncrProcessedRecord) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+        }
       }
     } finally {
       mapper.close();
Index: src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(working copy)
@@ -277,6 +277,11 @@
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
+    
+    public void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
+    throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
 
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
Index: src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java	(working copy)
@@ -44,6 +44,7 @@
    * version 12 changes the counters representation for HADOOP-1915
    * version 13 added call getBuildVersion() for HADOOP-236
    * Version 14: replaced getFilesystemName with getSystemDir for HADOOP-3135
+   * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    */
   public static final long versionID = 14L;
   
Index: src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java	(revision 0)
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utility class for skip bad records functionality. It contains various 
+ * settings related to skipping of bad records.
+ */
+public class SkipBadRecords {
+  
+  /**
+   * Is skipping of bad records enabled. If it is enabled 
+   * the framework will try to find bad records and skip  
+   * them on further attempts.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if skipping is enabled
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getSkipModeEnabled(Configuration conf) {
+    return conf.getBoolean("skip.mode.enabled", true);
+  }
+  
+  /**
+   * Set whether to enable skipping of bad records. If it is enabled 
+   * the framework will try to find bad records and will 
+   * try to skip them on further attempts.
+   * 
+   * @param conf the configuration
+   * @param skipModeEnabled boolean to enable/disable skipping 
+   */
+  public static void setSkipModeEnabled(Configuration conf, boolean skipModeEnabled) {
+    conf.setBoolean("skip.mode.enabled", skipModeEnabled);
+  }
+
+  /**
+   * Get the number of Task attempts AFTER which skip mode 
+   * will be kicked off. When skip mode is kicked off, the 
+   * tasks reports the range of records which it will process 
+   * next to the TaskTracker. So that on failures, TT knows which 
+   * ones are possibly the bad records. On further executions, 
+   * those are skipped.
+   * 
+   * @param conf the configuration
+   * @return attemptsToKickOffSkipMode no of task attempts
+   */
+  public static int getAttemptsToKickOffSkipMode(Configuration conf) {
+    return conf.getInt("attempts.to.kick.Off.skip.mode", 2);
+  }
+
+  /**
+   * Set the number of Task attempts AFTER which skip mode 
+   * will be kicked off. When skip mode is kicked off, the 
+   * tasks reports the range of records which it will process 
+   * next to the TaskTracker. So that on failures, TT knows which 
+   * ones are possibly the bad records. On further executions, 
+   * those are skipped.
+   * 
+   * @param conf the configuration
+   * @param attemptsToKickOffSkipMode no of task attempts
+   */
+  public static void setAttemptsToKickOffSkipMode(Configuration conf, 
+      int attemptsToKickOffSkipMode) {
+    conf.setInt("attempts.to.kick.Off.skip.mode", attemptsToKickOffSkipMode);
+  }
+
+  /**
+   * Get the flag which if set to true, 
+   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * by MapRunner after invoking the map function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records. For example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if auto increment 
+   *                           Counters.Application.MAP_PROCESSED_RECORDS.
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getAutoIncrMapperProcessedRecords(Configuration conf) {
+    return conf.getBoolean("mapred.map.auto.incr.mapper.processed.records", true);
+  }
+  
+  /**
+   * Set the flag which if set to true, 
+   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * by MapRunner after invoking the map function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records. For example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @param autoIncr whether to auto increment 
+   *        Counters.Application.MAP_PROCESSED_RECORDS.
+   */
+  public static void setAutoIncrMapperProcessedRecords(Configuration conf, 
+      boolean autoIncr) {
+    conf.setBoolean("mapred.map.auto.incr.mapper.processed.records", autoIncr);
+  }
+  
+  /**
+   * Get the flag which if set to true, 
+   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * by framework after invoking the reduce function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records. For example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @return <code>true</code> if auto increment 
+   *                           Counters.Application.REDUCE_PROCESSED_RECORDS.
+   *         <code>false</code> otherwise.
+   */
+  public static boolean getAutoIncrReducerProcessedRecords(Configuration conf) {
+    return conf.getBoolean("mapred.map.auto.incr.reducer.processed.records", true);
+  }
+  
+  /**
+   * Set the flag which if set to true, 
+   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * by framework after invoking the reduce function. This value must be set to 
+   * false for applications which process the records asynchronously 
+   * or buffer the input records. For example streaming. 
+   * In such cases applications should increment this counter on their own.
+   * 
+   * @param conf the configuration
+   * @param autoIncr whether to auto increment 
+   *        Counters.Application.REDUCE_PROCESSED_RECORDS.
+   */
+  public static void setAutoIncrReducerProcessedRecords(Configuration conf, 
+      boolean autoIncr) {
+    conf.setBoolean("mapred.map.auto.incr.reducer.processed.records", autoIncr);
+  }
+  
+}
Index: src/mapred/org/apache/hadoop/mapred/Counters.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Counters.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/Counters.java	(working copy)
@@ -493,4 +493,13 @@
     }
     return buffer.toString();
   }
+  
+  public static class Application {
+    //special counters which are written by the application and are 
+    //used by the framework.
+    public static final String GROUP = "ApplicationCounters";
+    public static final String MAP_PROCESSED_RECORDS = "MapProcessedRecords";
+    public static final String REDUCE_PROCESSED_RECORDS = "ReduceProcessedRecords";
+    
+  }
 }
Index: src/mapred/org/apache/hadoop/mapred/TaskStatus.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(working copy)
@@ -56,6 +56,7 @@
   private Phase phase = Phase.STARTING; 
   private Counters counters;
   private boolean includeCounters;
+  private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
 
   public TaskStatus() {}
 
@@ -89,6 +90,15 @@
   }
   public String getStateString() { return stateString; }
   public void setStateString(String stateString) { this.stateString = stateString; }
+  
+  public SortedRanges.Range getNextRecordRange() {
+    return nextRecordRange;
+  }
+
+  public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
+    this.nextRecordRange = nextRecordRange;
+  }
+  
   /**
    * Get task finish time. if shuffleFinishTime and sortFinishTime 
    * are not set before, these are set to finishTime. It takes care of 
@@ -247,6 +257,7 @@
     this.progress = status.getProgress();
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
+    this.nextRecordRange = status.getNextRecordRange();
 
     setDiagnosticInfo(status.getDiagnosticInfo());
     
@@ -297,6 +308,7 @@
     if (includeCounters) {
       counters.write(out);
     }
+    nextRecordRange.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -313,6 +325,7 @@
     if (includeCounters) {
       counters.readFields(in);
     }
+    nextRecordRange.readFields(in);
   }
   
   //////////////////////////////////////////////////////////////////////////////
Index: src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(working copy)
@@ -43,9 +43,11 @@
    * Version 8 changes {job|tip|task}id's to use their corresponding 
    * objects rather than strings.
    * Version 9 changes the counter representation for HADOOP-1915
+   * Version 10 changed the TaskStatus format and added reportNextRecordRange
+   *            for HADOOP-153
    * */
 
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -68,6 +70,15 @@
    *  @param trace the text to report
    */
   void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+  
+  /**
+   * Report the record range which is going to process next by the Task.
+   * @param taskid the id of the task involved
+   * @param range the range of record sequence nos
+   * @throws IOException
+   */
+  void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
+    throws IOException;
 
   /** Periodically called by child to check if parent is still alive. 
    * @return True if the task is known
Index: src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(working copy)
@@ -77,6 +77,8 @@
   private int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
+  private volatile SortedRanges failedRecords = new SortedRanges();
+  private volatile boolean skipModeKickedOff = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -485,6 +487,12 @@
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
       machinesWhereFailed.add(trackerHostName);
+      LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
+      failedRecords.add(status.getNextRecordRange());
+      if(SkipBadRecords.getSkipModeEnabled(conf) && 
+          numTaskFailures>=SkipBadRecords.getAttemptsToKickOffSkipMode(conf)) {
+        skipModeKickedOff = true;
+      }
     } else {
       numKilledTasks++;
     }
@@ -677,7 +685,7 @@
     // in more depth eventually...
     //
       
-    if (activeTasks.size() <= MAX_TASK_EXECS &&
+    if (!skipModeKickedOff && activeTasks.size() <= MAX_TASK_EXECS &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
         (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
@@ -709,12 +717,15 @@
     }
 
     if (isMapTask()) {
+      LOG.debug("attemdpt "+  numTaskFailures   +" sending skippedRecords "+failedRecords.getIndicesCount());
       t = new MapTask(jobFile, taskid, partition, 
           rawSplit.getClassName(), rawSplit.getBytes());
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     t.setConf(conf);
+    t.setRecordsToSkip(failedRecords);
+    t.setSkipModeKickedOff(skipModeKickedOff);
     tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -1585,6 +1585,10 @@
     public synchronized void reportDiagnosticInfo(String info) {
       this.diagnosticInfo.append(info);
     }
+    
+    public synchronized void reportNextRecordRange(SortedRanges.Range range) {
+      this.taskStatus.setNextRecordRange(range);
+    }
 
     /**
      * The task is reporting that it's done running
@@ -1988,6 +1992,16 @@
       LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
     }
   }
+  
+  public synchronized void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
+    throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      tip.reportNextRecordRange(range);
+    } else {
+      LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". Ignored.");
+    }
+  }
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
Index: src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java	(working copy)
@@ -19,12 +19,14 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,6 +60,7 @@
   private ExecutorService executorService;
   private volatile IOException ioException;
   private volatile RuntimeException runtimeException;
+  private boolean autoIncrProcessedRecord;
 
   @SuppressWarnings("unchecked")
   public void configure(JobConf jobConf) {
@@ -69,6 +72,8 @@
     }
 
     this.job = jobConf;
+    this.autoIncrProcessedRecord = 
+      SkipBadRecords.getAutoIncrMapperProcessedRecords(job);
     this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(),
         jobConf);
 
@@ -222,6 +227,10 @@
       try {
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
+        if(autoIncrProcessedRecord) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+        }
       } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance
         // variable of the MultithreadedMapRunner from where it will be
Index: src/mapred/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -232,11 +232,46 @@
       reporter.progress();
     }
   }
+  
+  private class SkippingReduceValuesIterator<KEY,VALUE> 
+    extends ReduceValuesIterator<KEY,VALUE> {
+    private Iterator<Long> skipFailedRecIndexIterator;
+    private TaskUmbilicalProtocol umbilical;
+    private long recIndex = -1;
+    
+    public SkippingReduceValuesIterator(RawKeyValueIterator in,
+        RawComparator<KEY> comparator, Class<KEY> keyClass,
+        Class<VALUE> valClass, Configuration conf, Progressable reporter,
+        TaskUmbilicalProtocol umbilical) throws IOException {
+      super(in, comparator, keyClass, valClass, conf, reporter);
+      this.umbilical = umbilical;
+      skipFailedRecIndexIterator = getRecordsToSkip().skipRangeIterator();
+      mayBeSkip();
+    }
+    
+    void nextKey() throws IOException {
+      super.nextKey();
+      mayBeSkip();
+    }
+    
+    private void mayBeSkip() throws IOException {
+      recIndex++;
+      long nextRecIndex = skipFailedRecIndexIterator.next();
+      long skip = nextRecIndex - recIndex;
+      for(int i=0;i<skip && super.more();i++) {
+        super.nextKey();
+        recIndex++;
+      }
+      getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+      reportNextRecordRange(umbilical, nextRecIndex);
+    }
+  }
 
   @Override
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    job.setBoolean("skipmode.kicked.off", isSkipModeKickedOff());
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
 
@@ -308,14 +343,24 @@
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
+      boolean autoIncrRecordsProcessed = 
+        SkipBadRecords.getAutoIncrReducerProcessedRecords(job);
       
-      ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
+      ReduceValuesIterator values = isSkipModeKickedOff() ? 
+          new SkippingReduceValuesIterator(rIter, 
+              job.getOutputValueGroupingComparator(), keyClass, valClass, 
+              job, reporter, umbilical) :
+          new ReduceValuesIterator(rIter, 
           job.getOutputValueGroupingComparator(), keyClass, valClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
+        if(autoIncrRecordsProcessed) {
+          reporter.incrCounter(Counters.Application.GROUP, 
+              Counters.Application.REDUCE_PROCESSED_RECORDS, 1);
+        }
         values.nextKey();
         values.informReduceProgress();
       }
Index: src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(working copy)
@@ -89,6 +89,11 @@
                                                         int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
+
+    public void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
+        throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
   }
   
   private static ClassLoader makeClassLoader(JobConf conf, 
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -31,6 +31,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -142,11 +143,19 @@
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private Iterator<Long> skipFailedRecIndexIterator;
+    private TaskUmbilicalProtocol umbilical;
+    private long recIndex = -1;
+    private long beforePos = -1;
+    private long afterPos = -1;
     
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) {
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters, 
+        TaskUmbilicalProtocol umbilical) {
       rawIn = raw;
+      this.umbilical = umbilical;
       inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
       inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+      skipFailedRecIndexIterator = getRecordsToSkip().skipRangeIterator();
     }
 
     public K createKey() {
@@ -158,17 +167,39 @@
     }
      
     public synchronized boolean next(K key, V value)
-      throws IOException {
-
-      setProgress(getProgress());
-      long beforePos = getPos();
-      boolean ret = rawIn.next(key, value);
+    throws IOException {
+      boolean ret = moveToNext(key, value);
+      if(isSkipModeKickedOff() && ret) {
+        long nextRecIndex = skipFailedRecIndexIterator.next();
+        long skip = nextRecIndex - recIndex;
+        for(int i=0;i<skip && ret;i++) {
+          ret = moveToNext(key, value);
+        }
+        getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
+        reportNextRecordRange(umbilical, nextRecIndex);
+      }
       if (ret) {
         inputRecordCounter.increment(1);
-        inputByteCounter.increment(getPos() - beforePos);
+        inputByteCounter.increment(afterPos - beforePos);
       }
       return ret;
     }
+     
+    protected boolean moveToNext(K key, V value)
+      throws IOException {
+      setProgress(getProgress());
+      beforePos = getPos();
+      boolean ret = rawIn.next(key, value);
+      recIndex++;
+      afterPos = getPos();
+      return ret;
+    }
+    
+    protected void incrementCounters() {
+      inputRecordCounter.increment(1);
+      inputByteCounter.increment(afterPos - beforePos);
+    }
+    
     public long getPos() throws IOException { return rawIn.getPos(); }
     public void close() throws IOException { rawIn.close(); }
     public float getProgress() throws IOException {
@@ -218,7 +249,8 @@
       
     RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = new TrackedRecordReader(rawIn, getCounters());
+    RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+    job.setBoolean("skipmode.kicked.off", isSkipModeKickedOff());
 
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
Index: src/mapred/org/apache/hadoop/mapred/SortedRanges.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/SortedRanges.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/SortedRanges.java	(revision 0)
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Keeps the Ranges sorted by startIndex.
+ * The added ranges are always ensured to be non-overlapping.
+ * Provides the SkipRangeIterator, which skips the Ranges 
+ * stored in this object.
+ */
+public class SortedRanges implements Writable{
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.mapred.SortedRanges");
+  
+  private SortedSet<Range> ranges = new TreeSet<Range>();
+  private int indicesCount;
+  
+  public Iterator<Long> skipRangeIterator(){
+    return new SkipRangeIterator();
+  }
+  
+  public int getIndicesCount() {
+    return indicesCount;
+  }
+  
+  public void add(Range range){
+    if(range.isEmpty()) {
+      return;
+    }
+    synchronized(ranges) {
+      long startIndex = range.getStartRecIndex();
+      long endIndex = range.getEndRecIndex();
+      //make sure that there are no overlapping ranges
+      SortedSet<Range> headSet = ranges.headSet(range);
+      if(headSet.size()>0) {
+        Range previousRange = headSet.last();
+        LOG.debug("previousRange "+previousRange);
+        if(startIndex<=previousRange.getEndRecIndex()) {
+          //previousRange overlaps this range
+          //remove the previousRange
+          if(ranges.remove(previousRange)) {
+            indicesCount-=previousRange.getIndicesCount();
+          }
+          //expand this range
+          startIndex = previousRange.getStartRecIndex();
+        }
+      }
+      
+      Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
+      while(tailSetIt.hasNext()) {
+        Range nextRange = tailSetIt.next();
+        LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+"  endIndex:"+endIndex);
+        if(endIndex>=nextRange.getStartRecIndex()) {
+          //nextRange overlaps this range
+          //remove the nextRange
+          tailSetIt.remove();
+          indicesCount-=nextRange.getIndicesCount();
+          if(endIndex<nextRange.getEndRecIndex()) {
+            //expand this range
+            endIndex = nextRange.getEndRecIndex();
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+      if(endIndex>=startIndex) {
+        Range recRange = new Range(startIndex,endIndex);
+        ranges.add(recRange);
+        indicesCount+=recRange.getIndicesCount();
+        LOG.debug("added "+recRange +"  count "+indicesCount);
+      }
+    }
+  }
+  
+  
+  public void readFields(DataInput in) throws IOException {
+    indicesCount = in.readInt();
+    ranges = new TreeSet<Range>();
+    int size = in.readInt();
+    for(int i=0;i<size;i++) {
+      Range range = new Range();
+      range.readFields(in);
+      ranges.add(range);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(indicesCount);
+    out.writeInt(ranges.size());
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      range.write(out);
+    }
+  }
+  
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    Iterator<Range> it = ranges.iterator();
+    while(it.hasNext()) {
+      Range range = it.next();
+      sb.append(range.toString()+"\n");
+    }
+    return sb.toString();
+  }
+  
+  static class Range implements Comparable<Range>, Writable{
+    public static final int NULL_INDEX = -1;
+    
+    private long startRecIndex;//inclusive
+    private long endRecIndex;//inclusive
+        
+    public Range(long startRecIndex, long endRecIndex) {
+      if(endRecIndex<startRecIndex) {
+        throw new RuntimeException("Invalid range. EndRecIndex can't be " +
+            "less than startRecIndex");
+      }
+      this.startRecIndex = startRecIndex;
+      this.endRecIndex = endRecIndex;
+    }
+    
+    public Range() {
+      this(NULL_INDEX,NULL_INDEX);
+    }
+    
+    public long getStartRecIndex() {
+      return startRecIndex;
+    }
+    
+    public long getEndRecIndex() {
+      return endRecIndex;
+    }
+    
+    public long getIndicesCount() {
+      //start and end index are inclusive
+      return endRecIndex-startRecIndex+1;
+    }
+    
+    public boolean isEmpty() {
+      return startRecIndex==NULL_INDEX &&
+      endRecIndex==NULL_INDEX;
+    }
+    
+    public boolean equals(Range o) {
+      return startRecIndex==o.startRecIndex &&
+        endRecIndex==o.endRecIndex;
+    }
+    
+    public int compareTo(Range o) {
+      if(this.startRecIndex == o.startRecIndex) {
+        return 0;
+      }
+      return (this.startRecIndex > o.startRecIndex) ? 1:-1;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      startRecIndex = in.readLong();
+      endRecIndex = in.readLong();
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(startRecIndex);
+      out.writeLong(endRecIndex);
+    }
+    
+    public String toString() {
+      return startRecIndex +":" +
+        endRecIndex;
+    }    
+  }
+  
+  /**
+   * Index Iterator which skips the ranges stored 
+   *
+   */
+  private class SkipRangeIterator implements Iterator<Long> {
+    Iterator<Range> rangeIterator = ranges.iterator();
+    Range range = new Range();
+    long currentIndex = -1;
+    
+    public boolean hasNext() {
+      return true;
+    }
+    
+    
+    public synchronized Long next() {
+      currentIndex++;
+      LOG.debug("currentIndex "+currentIndex +"   "+range);
+      skipIfInRange();
+      while(currentIndex>range.getEndRecIndex() && rangeIterator.hasNext()) {
+        range = rangeIterator.next();
+        skipIfInRange();
+      }
+      return currentIndex;
+    }
+    
+    private void skipIfInRange() {
+      if(currentIndex>=range.getStartRecIndex() && currentIndex<=range.getEndRecIndex()) {
+        //need to skip the range
+        LOG.warn("Skipping record index " + currentIndex +"-" + range.getEndRecIndex());
+        currentIndex = range.getEndRecIndex() + 1;
+        
+      }
+    }
+    
+    public void remove() {
+      throw new UnsupportedOperationException("remove not supported.");
+    }
+    
+  }
+
+}
Index: src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(revision 679748)
+++ src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(working copy)
@@ -6,10 +6,12 @@
 MAP_INPUT_BYTES.name=          Map input bytes
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
+MAP_SKIPPED_RECORDS.name=      Map skipped records
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records
 REDUCE_INPUT_GROUPS.name=      Reduce input groups
 REDUCE_INPUT_RECORDS.name=     Reduce input records
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
+REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 
 
