Index: src/core/org/apache/hadoop/io/WrapperIOException.java
===================================================================
--- src/core/org/apache/hadoop/io/WrapperIOException.java	(revision 0)
+++ src/core/org/apache/hadoop/io/WrapperIOException.java	(revision 0)
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+/** This Exception wraps throwables into an IOException */
+public class WrapperIOException extends IOException {
+  
+  public WrapperIOException(Throwable cause) {
+    initCause(cause);
+  }
+
+}
Index: src/core/org/apache/hadoop/io/WritableUtils.java
===================================================================
--- src/core/org/apache/hadoop/io/WritableUtils.java	(revision 667496)
+++ src/core/org/apache/hadoop/io/WritableUtils.java	(working copy)
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
 public final class WritableUtils  {
 
   public static byte[] readCompressedByteArray(DataInput in) throws IOException {
@@ -214,6 +219,7 @@
    * Allocate a buffer for each thread that tries to clone objects.
    */
   private static ThreadLocal cloneBuffers = new ThreadLocal() {
+      @Override
       protected synchronized Object initialValue() {
         return new CopyInCopyOutBuffer();
       }
@@ -427,4 +433,33 @@
                             "due to end of input.");
     }
   }
+ 
+  /**
+   * Write a list of integers to DataOutput
+   */
+  public static void writeIntegerList(DataOutput out, List<Integer> list) 
+    throws IOException {
+    out.writeInt(list.size());
+    for(Integer item : list) {
+      out.writeInt(item);
+    }
+  }
+  
+  /**
+   * Read a list of integers from DataInput
+   * @param list the list to read into. A new ArrayList will be created if 
+   * it is null.
+   * @return the list of integers
+   */
+  public static List<Integer> readIntegerList(DataInput in, List<Integer> list)
+    throws IOException {
+    if(list == null)
+      list = new ArrayList<Integer>();
+    int size = in.readInt();
+    for(int i=0; i<size; i++ ) {
+      list.add(in.readInt());
+    }
+    return list;
+  }
+  
 }
Index: src/mapred/org/apache/hadoop/mapred/Reporter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Reporter.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/Reporter.java	(working copy)
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.util.Progressable;
 
 /** 
@@ -46,13 +49,18 @@
       }
       public void progress() {
       }
-      public void incrCounter(Enum key, long amount) {
+      public void incrCounter(Enum<?> key, long amount) {
       }
       public void incrCounter(String group, String counter, long amount) {
       }
       public InputSplit getInputSplit() throws UnsupportedOperationException {
         throw new UnsupportedOperationException("NULL reporter has no input");
       }
+      public void reportBadRecord(int recordID, Throwable th) {
+      }
+      public List<Integer> getBadRecords() {
+        return new ArrayList<Integer>(0);
+      }
     };
 
   /**
@@ -71,7 +79,7 @@
    * @param amount A non-negative amount by which the counter is to 
    *               be incremented.
    */
-  public abstract void incrCounter(Enum key, long amount);
+  public abstract void incrCounter(Enum<?> key, long amount);
   
   /**
    * Increments the counter identified by the group and counter name
@@ -92,4 +100,13 @@
    */
   public abstract InputSplit getInputSplit() 
     throws UnsupportedOperationException;
+  
+  /** Report that the recordID is a bad record, which failed.
+   * @param recordID the ID of the record that failed 
+   * @param th the throwable that the task execution has thrown*/ 
+  void reportBadRecord(int recordID, Throwable th);
+  
+  /** Get the list of bad record IDs which failed, in ascending order. */
+  List<Integer> getBadRecords();
+  
 }
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
@@ -46,6 +48,7 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
@@ -112,7 +115,9 @@
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
-
+  
+  private List<Integer> badRecords; //known bad records, in ascending order 
+  
   ////////////////////////////////////////////
   // Constructors
   ////////////////////////////////////////////
@@ -121,7 +126,7 @@
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
   }
 
-  public Task(String jobFile, TaskAttemptID taskId, int partition) {
+  public Task(String jobFile, TaskAttemptID taskId, int partition, List<Integer> badRecords) {
     this.jobFile = jobFile;
     this.taskId = taskId;
      
@@ -135,6 +140,7 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
+    this.badRecords = badRecords == null ? new ArrayList<Integer>() : badRecords;
   }
 
   ////////////////////////////////////////////
@@ -190,6 +196,7 @@
       Text.writeString(out, "");
     }
     taskStatus.write(out);
+    WritableUtils.writeIntegerList(out, badRecords);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
@@ -202,7 +209,8 @@
       taskOutputPath = null;
     }
     taskStatus.readFields(in);
-    this.mapOutputFile.setJobId(taskId.getJobID()); 
+    this.mapOutputFile.setJobId(taskId.getJobID());
+    badRecords = WritableUtils.readIntegerList(in, null);
   }
 
   @Override
@@ -360,7 +368,7 @@
           // indicate that progress update needs to be sent
           setProgressFlag();
         }
-        public void incrCounter(Enum key, long amount) {
+        public void incrCounter(Enum<?> key, long amount) {
           if (counters != null) {
             counters.incrCounter(key, amount);
           }
@@ -375,6 +383,20 @@
         public InputSplit getInputSplit() throws UnsupportedOperationException {
           return Task.this.getInputSplit();
         }
+        
+        public void reportBadRecord(int recordID, Throwable th) {
+          Task.this.taskStatus.addBadRecord(recordID);
+          
+          //The throwable will be reported for diagnostic
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          th.printStackTrace(new PrintStream(baos));
+          Task.this.taskStatus.setDiagnosticInfo("Skipped error in record " + recordID +
+              ": " + baos.toString());
+        }
+        
+        public List<Integer> getBadRecords() {
+          return Task.this.badRecords;
+        }
       };
   }
 
@@ -749,6 +771,7 @@
       this.combineInputCounter = combineInputCounter;
     }
 
+    @Override
     public VALUE next() {
       combineInputCounter.increment(1);
       return super.next();
Index: src/mapred/org/apache/hadoop/mapred/MapRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapRunner.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/MapRunner.java	(working copy)
@@ -19,7 +19,10 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
+import org.apache.hadoop.io.WrapperIOException;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Default {@link MapRunnable} implementation.*/
@@ -27,11 +30,14 @@
     implements MapRunnable<K1, V1, K2, V2> {
   
   private Mapper<K1, V1, K2, V2> mapper;
-
+  private int maxSkipRecords;
+  
+  
   @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
                                                       job);
+    maxSkipRecords = job.getMapMaxSkipRecord();
   }
 
   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
@@ -42,10 +48,43 @@
       K1 key = input.createKey();
       V1 value = input.createValue();
       
-      while (input.next(key, value)) {
-        // map pair to output
-        mapper.map(key, value, output, reporter);
+      int recordID = 0;
+      if(maxSkipRecords == 0) { //two different execution paths for performance 
+        while (input.next(key, value)) {
+          // map pair to output
+          mapper.map(key, value, output, reporter);
+        }
       }
+      else {
+        List<Integer> badRecords = reporter.getBadRecords();
+        int skippedRecordCount = badRecords.size(); 
+        Iterator<Integer> badRecordIt = badRecords.iterator();
+        int nextBadRecord =  badRecordIt.hasNext() ? badRecordIt.next() : Integer.MAX_VALUE;
+        while (input.next(key, value)) {
+          try {
+            if(recordID == nextBadRecord) {
+              //skip this record
+              nextBadRecord =  badRecordIt.hasNext() ? badRecordIt.next() : Integer.MAX_VALUE;
+            } else {
+              //map pair to output
+              mapper.map(key, value, output, reporter);
+            }
+          } catch (OutOfMemoryError ex) {
+            reporter.reportBadRecord(recordID, ex);
+            skippedRecordCount++;
+            throw ex; //cannot recover
+          } catch(Throwable th) {  
+            reporter.reportBadRecord(recordID, th); //try to continue executing
+            if(skippedRecordCount++ > maxSkipRecords) {
+              if(th instanceof RuntimeException) throw (RuntimeException)th;
+              else if (th instanceof IOException) throw (IOException)th;
+              else throw new WrapperIOException(th);
+            }
+          } finally {
+            recordID ++;
+          }
+        }
+      }
     } finally {
       mapper.close();
     }
Index: src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java	(working copy)
@@ -135,7 +135,7 @@
           MapTask map = new MapTask(file.toString(),  
                                     mapId, i,
                                     splits[i].getClass().getName(),
-                                    split);
+                                    split, null);
           JobConf localConf = new JobConf(job);
           if (outputFs != null) {
             if (outputFs.exists(tmpDir)) {
@@ -179,7 +179,7 @@
 
             {
               ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size());
+                                                 reduceId, 0, mapIds.size(), null);
               JobConf localConf = new JobConf(job);
               if (outputFs != null) {
                 if (outputFs.exists(tmpDir)) {
Index: src/mapred/org/apache/hadoop/mapred/TaskStatus.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/TaskStatus.java	(working copy)
@@ -20,6 +20,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -57,6 +58,8 @@
   private Counters counters;
   private boolean includeCounters;
 
+  private ArrayList<Integer> badRecords = new ArrayList<Integer>(3);
+  
   public TaskStatus() {}
 
   public TaskStatus(TaskAttemptID taskid, float progress,
@@ -259,8 +262,17 @@
     
     this.phase = status.getPhase();
     this.counters = status.getCounters();
+    badRecords.addAll(status.badRecords);
   }
   
+  synchronized void addBadRecord(int recordID) {
+    badRecords.add(recordID);
+  }
+  
+  public ArrayList<Integer> getBadRecords() {
+    return badRecords;
+  }
+  
   /**
    * Clear out transient information after sending out a status-update
    * from either the {@link Task} to the {@link TaskTracker} or from the
@@ -269,6 +281,7 @@
   synchronized void clearStatus() {
     // Clear diagnosticInfo
     diagnosticInfo = "";
+    badRecords.clear();
   }
 
   @Override
@@ -297,6 +310,7 @@
     if (includeCounters) {
       counters.write(out);
     }
+    WritableUtils.writeIntegerList(out, badRecords);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -313,6 +327,7 @@
     if (includeCounters) {
       counters.readFields(in);
     }
+    WritableUtils.readIntegerList(in, badRecords);
   }
   
   //////////////////////////////////////////////////////////////////////////////
Index: src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java	(working copy)
@@ -23,29 +23,28 @@
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** Protocol that task child process uses to contact its parent process.  The
- * parent is a daemon which which polls the central master for a new map or
+ * parent is a daemon which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
 interface TaskUmbilicalProtocol extends VersionedProtocol {
 
   /** 
-   * Changed the version to 2, since we have a new method getMapOutputs 
-   * Changed version to 3 to have progress() return a boolean
-   * Changed the version to 4, since we have replaced 
-   *         TaskUmbilicalProtocol.progress(String, float, String, 
-   *         org.apache.hadoop.mapred.TaskStatus.Phase, Counters) 
-   *         with statusUpdate(String, TaskStatus)
-   * 
-   * Version 5 changed counters representation for HADOOP-2248
+   * Version 2 adds a new method getMapOutputs 
+   * Version 3 changes progress() to return a boolean
+   * Version 4 replaces TaskUmbilicalProtocol.progress(String, float, String, 
+   *           org.apache.hadoop.mapred.TaskStatus.Phase, Counters) 
+   *           with statusUpdate(String, TaskStatus)
+   * Version 5 changes counters representation for HADOOP-2248
    * Version 6 changes the TaskStatus representation for HADOOP-2208
    * Version 7 changes the done api (via HADOOP-3140). It now expects whether
    *           or not the task's output needs to be promoted.
    * Version 8 changes {job|tip|task}id's to use their corresponding 
-   * objects rather than strings.
+   *           objects rather than strings.
    * Version 9 changes the counter representation for HADOOP-1915
+   * Version 10 adds a new method reportBadRecord()
    * */
 
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
Index: src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/TaskInProgress.java	(working copy)
@@ -107,7 +107,9 @@
   
   private Counters counters = new Counters();
   
-
+  //the list of records that caused exception during execution
+  private ArrayList<Integer> badRecords = new ArrayList<Integer>(3);
+  
   /**
    * Constructor for MapTask
    */
@@ -421,7 +423,8 @@
     }
         
     taskStatuses.put(taskid, status);
-
+    badRecords.addAll(status.getBadRecords());
+    
     // Recompute progress
     recomputeProgress();
     return changed;
@@ -706,9 +709,9 @@
 
     if (isMapTask()) {
       t = new MapTask(jobFile, taskid, partition, 
-          rawSplit.getClassName(), rawSplit.getBytes());
+          rawSplit.getClassName(), rawSplit.getBytes(), badRecords);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, badRecords);
     }
     t.setConf(conf);
     tasks.put(taskid, t);
Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -1416,7 +1416,7 @@
     private long taskTimeout;
     private String debugCommand;
     private boolean shouldPromoteOutput = false;
-        
+    
     /**
      */
     public TaskInProgress(Task task, JobConf conf) {
@@ -2026,6 +2026,12 @@
       LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
     }
   }
+  
+  public synchronized void reportBadRecord(TaskAttemptID taskID, int recordID) throws IOException {
+    TaskInProgress tip = tasks.get(taskID);
+    TaskStatus status = tip.getStatus();
+    status.addBadRecord(recordID);
+  }
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
@@ -2529,6 +2535,7 @@
       return;
     }
 
+    @Override
     public void run() {
       LOG.debug("cleanup thread started");
       Path path = null;
Index: src/mapred/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobConf.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -20,30 +20,30 @@
 
 
 import java.io.IOException;
-
+import java.net.URL;
+import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.StringTokenizer;
 
-import java.net.URL;
-import java.net.URLDecoder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-
+import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -1295,6 +1295,22 @@
     return get("mapred.reduce.task.debug.script");
   }
 
+  public int getMapMaxSkipRecord() {
+    return getInt("mapred.map.max.skip.record", 0);
+  }
+  
+  public void setMapMaxSkipRecord(int numRecord) {
+    setInt("mapred.map.max.skip.record", numRecord);
+  }
+  
+  public int getReduceMaxSkipRecord() {
+    return getInt("mapred.reduce.max.skip.record", 0);
+  }
+  
+  public void setReduceMaxSkipRecord(int numRecord) {
+    setInt("mapred.reduce.max.skip.record", numRecord);
+  }
+  
   /**
    * Get the uri to be invoked in-order to send a notification after the job 
    * has completed (success/failure). 
Index: src/mapred/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -34,8 +34,8 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -62,7 +62,9 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.IFile.*;
+import org.apache.hadoop.mapred.IFile.InMemoryReader;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -138,8 +140,8 @@
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
-                    int partition, int numMaps) {
-    super(jobFile, taskId, partition);
+                    int partition, int numMaps, List<Integer> badRecords) {
+    super(jobFile, taskId, partition, badRecords);
     this.numMaps = numMaps;
   }
   
@@ -301,6 +303,9 @@
         }
       };
     
+    int maxSkipRecords = job.getReduceMaxSkipRecord();
+    int recordID = 0;
+    
     // apply reduce function
     try {
       Class keyClass = job.getMapOutputKeyClass();
@@ -310,27 +315,26 @@
           job.getOutputValueGroupingComparator(), keyClass, valClass, 
           job, reporter);
       values.informReduceProgress();
-      while (values.more()) {
-        reduceInputKeyCounter.increment(1);
-        reducer.reduce(values.getKey(), values, collector, reporter);
-        values.nextKey();
-        values.informReduceProgress();
+      
+      if(maxSkipRecords == 0) { //two different execution paths for performance
+        while (values.more()) {
+          reduceInputKeyCounter.increment(1);
+          reducer.reduce(values.getKey(), values, collector, reporter);
+          values.nextKey();
+          values.informReduceProgress();
+        }
+      } else {
+        while (values.more()) {
+          
+          reduceInputKeyCounter.increment(1);
+          reducer.reduce(values.getKey(), values, collector, reporter);
+          values.nextKey();
+          values.informReduceProgress();
+        }
       }
-
-      //Clean up: repeated in catch block below
+    }finally {
       reducer.close();
       out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
-      
-      throw ioe;
     }
     done(umbilical);
   }
@@ -1838,6 +1842,7 @@
         setDaemon(true);
       }
 
+      @Override
       @SuppressWarnings("unchecked")
       public void run() {
         try {
@@ -1939,6 +1944,7 @@
         setDaemon(true);
       }
       
+      @Override
       @SuppressWarnings("unchecked")
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
@@ -1992,8 +1998,8 @@
                    " segments...");
           
           rIter = Merger.merge(conf, localFileSys,
-                               (Class<K>)conf.getMapOutputKeyClass(),
-                               (Class<V>)conf.getMapOutputValueClass(),
+                               conf.getMapOutputKeyClass(),
+                               conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter);
Index: src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/IsolationRunner.java	(working copy)
@@ -180,11 +180,11 @@
       BytesWritable split = new BytesWritable();
       split.readFields(splitFile);
       splitFile.close();
-      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
+      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, null);
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);
-      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
+      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, null);
     }
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -47,8 +47,8 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
@@ -80,9 +80,10 @@
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split
+                 int partition, String splitClass, BytesWritable split,
+                 List<Integer> badRecords
                  ) throws IOException {
-    super(jobFile, taskId, partition);
+    super(jobFile, taskId, partition, badRecords);
     this.splitClass = splitClass;
     this.split.set(split);
   }
@@ -890,6 +891,7 @@
       private int start;
       private int length;
             
+      @Override
       public void reset(byte[] buffer, int start, int length) {
         this.buffer = buffer;
         this.start = start;
Index: src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java	(revision 667496)
+++ src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java	(working copy)
@@ -17,10 +17,17 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 
 /**************************************************
  * A TaskTrackerStatus is a MapReduce primitive.  Keeps
@@ -114,8 +121,8 @@
    */
   public int countMapTasks() {
     int mapCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
-      TaskStatus ts = (TaskStatus) it.next();
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = it.next();
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
@@ -131,8 +138,8 @@
    */
   public int countReduceTasks() {
     int reduceCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
-      TaskStatus ts = (TaskStatus) it.next();
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = it.next();
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
@@ -170,8 +177,8 @@
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, trackerName);
-    UTF8.writeString(out, host);
+    Text.writeString(out, trackerName);
+    Text.writeString(out, host);
     out.writeInt(httpPort);
     out.writeInt(failures);
     out.writeInt(maxMapTasks);
@@ -183,8 +190,8 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.trackerName = UTF8.readString(in);
-    this.host = UTF8.readString(in);
+    this.trackerName = Text.readString(in);
+    this.host = Text.readString(in);
     this.httpPort = in.readInt();
     this.failures = in.readInt();
     this.maxMapTasks = in.readInt();
