Index: src/test/org/apache/hadoop/io/RandomDatum.java
===================================================================
--- src/test/org/apache/hadoop/io/RandomDatum.java	(revision 656306)
+++ src/test/org/apache/hadoop/io/RandomDatum.java	(working copy)
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.io;
 
-import java.util.*;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
 
-public class RandomDatum implements WritableComparable {
+public class RandomDatum implements WritableComparable<RandomDatum> {
   private int length;
   private byte[] data;
 
@@ -49,20 +51,24 @@
     in.readFully(data, 0, length);
   }
 
-  public int compareTo(Object o) {
-    RandomDatum that = (RandomDatum)o;
+  public int compareTo(RandomDatum that) {
     return WritableComparator.compareBytes(this.data, 0, this.length,
                                            that.data, 0, that.length);
   }
 
+  @Override
   public boolean equals(Object o) {
-    return compareTo(o) == 0;
+    if (o instanceof RandomDatum) {
+      return compareTo((RandomDatum)o) == 0;
+    }
+    return false;
   }
 
   private static final char[] HEX_DIGITS =
   {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
 
   /** Returns a string representation of this object. */
+  @Override
   public String toString() {
     StringBuffer buf = new StringBuffer(length*2);
     for (int i = 0; i < length; i++) {
@@ -92,11 +98,12 @@
   }
 
   /** A WritableComparator optimized for RandomDatum. */
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<RandomDatum> {
     public Comparator() {
       super(RandomDatum.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       int n1 = readInt(b1, s1);
Index: src/test/org/apache/hadoop/mapred/TestComparators.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestComparators.java	(revision 656306)
+++ src/test/org/apache/hadoop/mapred/TestComparators.java	(working copy)
@@ -17,13 +17,22 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.BooleanWritable.Comparator;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.BooleanWritable.Comparator;
+
 /**
  * Two different types of comparators can be used in MapReduce. One is used
  * during the Map and Reduce phases, to sort/merge key-value pairs. Another
@@ -128,7 +137,7 @@
     public void reduce(IntWritable key, Iterator<Writable> values, 
                        OutputCollector<IntWritable, Text> out,
                        Reporter reporter) throws IOException {
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = ((key)).get();
       // keys should be in descending order
       if (currentKey > lastKey) {
         fail("Keys not in sorted descending order");
@@ -233,12 +242,10 @@
    * A decreasing Comparator for IntWritable 
    */ 
   public static class DecreasingIntComparator extends IntWritable.Comparator {
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       return -super.compare(b1, s1, l1, b2, s2, l2);
     }
-    static {                    // register this comparator
-      WritableComparator.define(DecreasingIntComparator.class, new Comparator());
-    }
   }
 
   /** Grouping function for values based on the composite key. This
@@ -249,6 +256,7 @@
     public CompositeIntGroupFn() {
       super(IntWritable.class);
     }
+    @Override
     public int compare (WritableComparable v1, WritableComparable v2) {
       int val1 = ((IntWritable)(v1)).get() / 100;
       int val2 = ((IntWritable)(v2)).get() / 100;
@@ -275,10 +283,12 @@
   /** Reverse grouping function for values based on the composite key. 
    */
   public static class CompositeIntReverseGroupFn extends CompositeIntGroupFn {
+    @Override
     public int compare (WritableComparable v1, WritableComparable v2) {
       return -super.compare(v1, v2);
     }
     
+    @Override
     public boolean equals (IntWritable v1, IntWritable v2) {
       return !(super.equals(v1, v2));
     }
Index: src/java/org/apache/hadoop/io/FloatWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/FloatWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/FloatWritable.java	(working copy)
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /** A WritableComparable for floats. */
-public class FloatWritable implements WritableComparable {
+public class FloatWritable implements WritableComparable<FloatWritable> {
   private float value;
 
   public FloatWritable() {}
@@ -43,6 +45,7 @@
   }
 
   /** Returns true iff <code>o</code> is a FloatWritable with the same value. */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof FloatWritable))
       return false;
@@ -50,27 +53,30 @@
     return this.value == other.value;
   }
 
+  @Override
   public int hashCode() {
     return Float.floatToIntBits(value);
   }
 
   /** Compares two FloatWritables. */
-  public int compareTo(Object o) {
+  public int compareTo(FloatWritable that) {
     float thisValue = this.value;
-    float thatValue = ((FloatWritable)o).value;
+    float thatValue = that.value;
     return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
   }
 
+  @Override
   public String toString() {
     return Float.toString(value);
   }
 
   /** A Comparator optimized for FloatWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<FloatWritable> {
     public Comparator() {
       super(FloatWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       float thisValue = readFloat(b1, s1);
Index: src/java/org/apache/hadoop/io/DoubleWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/DoubleWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/DoubleWritable.java	(working copy)
@@ -25,7 +25,7 @@
 /**
  * Writable for Double values.
  */
-public class DoubleWritable implements WritableComparable {
+public class DoubleWritable implements WritableComparable<DoubleWritable> {
 
   private double value = 0.0;
   
@@ -52,6 +52,7 @@
   /**
    * Returns true iff <code>o</code> is a DoubleWritable with the same value.
    */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof DoubleWritable)) {
       return false;
@@ -60,25 +61,27 @@
     return this.value == other.value;
   }
   
+  @Override
   public int hashCode() {
     return (int)Double.doubleToLongBits(value);
   }
   
-  public int compareTo(Object o) {
-    DoubleWritable other = (DoubleWritable)o;
+  public int compareTo(DoubleWritable other) {
     return (value < other.value ? -1 : (value == other.value ? 0 : 1));
   }
   
+  @Override
   public String toString() {
     return Double.toString(value);
   }
 
   /** A Comparator optimized for DoubleWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<DoubleWritable> {
     public Comparator() {
       super(DoubleWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       double thisValue = readDouble(b1, s1);
Index: src/java/org/apache/hadoop/io/BytesWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/BytesWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/BytesWritable.java	(working copy)
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.io;
 
-import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
 
 /** 
  * A byte sequence that is usable as a key or value.
@@ -28,7 +28,7 @@
  * the current capacity. The hash function is the front of the md5 of the 
  * buffer. The sort order is the same as memcmp.
  */
-public class BytesWritable implements WritableComparable {
+public class BytesWritable implements WritableComparable<BytesWritable> {
   private static final int LENGTH_BYTES = 4;
   private static final byte[] EMPTY_BYTES = {};
 
@@ -137,28 +137,29 @@
     out.write(bytes, 0, size);
   }
   
+  @Override
   public int hashCode() {
     return WritableComparator.hashBytes(bytes, size);
   }
   
   /**
    * Define the sort order of the BytesWritable.
-   * @param right_obj The other bytes writable
+   * @param that The other bytes writable
    * @return Positive if left is bigger than right, 0 if they are equal, and
    *         negative if left is smaller than right.
    */
-  public int compareTo(Object right_obj) {
-    BytesWritable right = ((BytesWritable) right_obj);
+  public int compareTo(BytesWritable that) {
     return WritableComparator.compareBytes(bytes, 0, size, 
-                                           right.bytes, 0, right.size);
+                                           that.bytes, 0, that.size);
   }
   
   /**
    * Are the two byte sequences equal?
    */
-  public boolean equals(Object right_obj) {
-    if (right_obj instanceof BytesWritable) {
-      return compareTo(right_obj) == 0;
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof BytesWritable) {
+      return compareTo((BytesWritable)that) == 0;
     }
     return false;
   }
@@ -166,6 +167,7 @@
   /**
    * Generate the stream of bytes as hex pairs separated by ' '.
    */
+  @Override
   public String toString() { 
     StringBuffer sb = new StringBuffer(3*size);
     for (int idx = 0; idx < size; idx++) {
@@ -184,7 +186,7 @@
   }
 
   /** A Comparator optimized for BytesWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<BytesWritable> {
     public Comparator() {
       super(BytesWritable.class);
     }
@@ -192,6 +194,7 @@
     /**
      * Compare the buffers in serialized form.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       return compareBytes(b1, s1+LENGTH_BYTES, l1-LENGTH_BYTES, 
Index: src/java/org/apache/hadoop/io/Text.java
===================================================================
--- src/java/org/apache/hadoop/io/Text.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/Text.java	(working copy)
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.io;
 
-import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -44,7 +44,7 @@
  * byte array contains valid UTF8 code, calculating the length of an encoded
  * string.
  */
-public class Text implements WritableComparable {
+public class Text implements WritableComparable<Text> {
   private static final Log LOG= LogFactory.getLog("org.apache.hadoop.io.Text");
   
   private static final CharsetDecoder DECODER = 
@@ -239,6 +239,7 @@
    * Convert text back to string
    * @see java.lang.Object#toString()
    */
+  @Override
   public String toString() {
     try {
       return decode(bytes, 0, length);
@@ -273,8 +274,7 @@
   }
 
   /** Compare two Texts bytewise using standard UTF8 ordering. */
-  public int compareTo(Object o) {
-    Text that = (Text)o;
+  public int compareTo(Text that) {
     if (this == that)
       return 0;
     else
@@ -284,6 +284,7 @@
   }
 
   /** Returns true iff <code>o</code> is a Text with the same contents.  */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof Text))
       return false;
@@ -293,20 +294,22 @@
     else if (this.length != that.length)
       return false;
     else
-      return compareTo(o) == 0;
+      return compareTo((Text)o) == 0;
   }
 
   /** hash function */
+  @Override
   public int hashCode() {
     return WritableComparator.hashBytes(bytes, length);
   }
 
   /** A WritableComparator optimized for Text keys. */
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<Text> {
     public Comparator() {
       super(Text.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       int n1 = WritableUtils.decodeVIntSize(b1[s1]);
@@ -454,7 +457,7 @@
     int length = 0;
     int state = LEAD_BYTE;
     while (count < start+len) {
-      int aByte = ((int) utf8[count] & 0xFF);
+      int aByte = (utf8[count] & 0xFF);
 
       switch (state) {
       case LEAD_BYTE:
Index: src/java/org/apache/hadoop/io/WritableComparator.java
===================================================================
--- src/java/org/apache/hadoop/io/WritableComparator.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/WritableComparator.java	(working copy)
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+
 /** A Comparator for {@link WritableComparable}s.
  *
  * <p>This base implemenation uses the natural ordering.  To define alternate
@@ -30,47 +32,58 @@
  * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
  * provided to assist in optimized implementations of this method.
  */
-public class WritableComparator implements RawComparator {
+public class WritableComparator<K extends WritableComparable<K>> 
+  extends DefaultComparator<K> {
 
-  private static HashMap<Class, WritableComparator> comparators =
-    new HashMap<Class, WritableComparator>(); // registry
-
-  /** Get a comparator for a {@link WritableComparable} implementation. */
-  public static synchronized WritableComparator get(Class c) {
-    WritableComparator comparator = comparators.get(c);
+  /** Get a comparator for a {@link WritableComparable} implementation. 
+   * @deprecated please use {@link 
+   * DefaultComparator#get(Class, org.apache.hadoop.conf.Configuration)} instead
+   */
+  @Deprecated
+  public static synchronized <K extends WritableComparable<K>> 
+    WritableComparator<K> get(Class<K> c) {
+    @SuppressWarnings("unchecked")
+    WritableComparator<K> comparator 
+      = (WritableComparator<K>)DefaultComparator.getRegistered(c);
     if (comparator == null)
-      comparator = new WritableComparator(c);
+      comparator = new WritableComparator<K>(c);
     return comparator;
   }
 
   /** Register an optimized comparator for a {@link WritableComparable}
-   * implementation. */
-  public static synchronized void define(Class c,
-                                         WritableComparator comparator) {
-    comparators.put(c, comparator);
+   * implementation. 
+   * @deprecated please use {@link DefaultComparator#register(Class, RawComparator)}
+   * instead*/
+  @Deprecated
+  public static synchronized <K extends WritableComparable<K>> 
+    void define(Class<K> c, WritableComparator<K> comparator) {
+    
+    DefaultComparator.register(c, comparator);
   }
 
+  private Class<K> keyClass;
 
-  private DataInputBuffer buffer = new DataInputBuffer();
-
-  private Class keyClass;
-  private WritableComparable key1;
-  private WritableComparable key2;
-
   /** Construct for a {@link WritableComparable} implementation. */
-  protected WritableComparator(Class keyClass) {
-    this.keyClass = keyClass;
-    this.key1 = newKey();
-    this.key2 = newKey();
+  protected WritableComparator(Class<K> keyClass) {
+    this(keyClass, null);
   }
 
+  /** Construct for a {@link WritableComparable} implementation. passed
+   * configuration can be used to initialize objects 
+   */
+  protected WritableComparator(Class<K> keyClass, Configuration conf) {
+    super(new WritableSerialization().getDeserializer(keyClass));
+    setConf(conf);
+    this.keyClass = keyClass;
+  }
+  
   /** Returns the WritableComparable implementation class. */
-  public Class getKeyClass() { return keyClass; }
+  public Class<K> getKeyClass() { return keyClass; }
 
   /** Construct a new {@link WritableComparable} instance. */
-  public WritableComparable newKey() {
+  public K newKey() {
     try {
-      return (WritableComparable)keyClass.newInstance();
+      return keyClass.newInstance();
     } catch (InstantiationException e) {
       throw new RuntimeException(e);
     } catch (IllegalAccessException e) {
@@ -78,41 +91,6 @@
     }
   }
 
-  /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
-   *
-   * <p>The default implementation reads the data into two {@link
-   * WritableComparable}s (using {@link
-   * Writable#readFields(DataInput)}, then calls {@link
-   * #compare(WritableComparable,WritableComparable)}.
-   */
-  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-    try {
-      buffer.reset(b1, s1, l1);                   // parse key1
-      key1.readFields(buffer);
-      
-      buffer.reset(b2, s2, l2);                   // parse key2
-      key2.readFields(buffer);
-      
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    
-    return compare(key1, key2);                   // compare them
-  }
-
-  /** Compare two WritableComparables.
-   *
-   * <p> The default implementation uses the natural ordering, calling {@link
-   * Comparable#compareTo(Object)}. */
-  @SuppressWarnings("unchecked")
-  public int compare(WritableComparable a, WritableComparable b) {
-    return a.compareTo(b);
-  }
-
-  public int compare(Object a, Object b) {
-    return compare((WritableComparable)a, (WritableComparable)b);
-  }
-
   /** Lexicographic order of binary data. */
   public static int compareBytes(byte[] b1, int s1, int l1,
                                  byte[] b2, int s2, int l2) {
@@ -132,7 +110,7 @@
   public static int hashBytes(byte[] bytes, int length) {
     int hash = 1;
     for (int i = 0; i < length; i++)
-      hash = (31 * hash) + (int)bytes[i];
+      hash = (31 * hash) + bytes[i];
     return hash;
   }
 
Index: src/java/org/apache/hadoop/io/BooleanWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/BooleanWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/BooleanWritable.java	(working copy)
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /** 
  * A WritableComparable for booleans. 
  */
-public class BooleanWritable implements WritableComparable {
+public class BooleanWritable implements WritableComparable<BooleanWritable> {
   private boolean value;
 
   /** 
@@ -64,6 +66,7 @@
 
   /**
    */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof BooleanWritable)) {
       return false;
@@ -72,6 +75,7 @@
     return this.value == other.value;
   }
 
+  @Override
   public int hashCode() {
     return value ? 0 : 1;
   }
@@ -80,12 +84,13 @@
 
   /**
    */
-  public int compareTo(Object o) {
+  public int compareTo(BooleanWritable that) {
     boolean a = this.value;
-    boolean b = ((BooleanWritable) o).value;
+    boolean b = (that).value;
     return ((a == b) ? 0 : (a == false) ? -1 : 1);
   }
   
+  @Override
   public String toString() {
     return Boolean.toString(get());
   }
@@ -93,11 +98,12 @@
   /** 
    * A Comparator optimized for BooleanWritable. 
    */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<BooleanWritable> {
     public Comparator() {
       super(BooleanWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       boolean a = (readInt(b1, s1) == 1) ? true : false;
Index: src/java/org/apache/hadoop/io/DefaultComparator.java
===================================================================
--- src/java/org/apache/hadoop/io/DefaultComparator.java	(revision 0)
+++ src/java/org/apache/hadoop/io/DefaultComparator.java	(revision 0)
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerComparator;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+
+public class DefaultComparator<T extends Comparable<T>> 
+  extends DeserializerComparator<T> implements Configurable {
+  
+  private Configuration conf;
+  
+  private static HashMap<Class<?>, RawComparator<?>> comparators =
+    new HashMap<Class<?>, RawComparator<?>>(); // registry
+  
+  public DefaultComparator(Class<T> keyClass, Configuration conf) {
+    super(new SerializationFactory(conf).getDeserializer(keyClass));
+    setConf(conf);
+  }
+  
+  public DefaultComparator(Deserializer<T> deserializer) {
+    super(deserializer);
+    setConf(null);
+  }
+  
+  /** Compare two Comparables.
+  *
+  * <p> The default implementation uses the natural ordering, calling {@link
+  * Comparable#compareTo(Object)}. */
+  public int compare(T o1, T o2) {
+    return o1.compareTo(o2);
+  }
+  
+  /** Get a comparator for a {@link Comparable} implementation. If a 
+   * comparator is not registered, returns a DefaultComparator instance.*/
+  @SuppressWarnings("unchecked")
+  public static synchronized  <K>
+    RawComparator<K> get(Class<K> keyClass, Configuration conf) {
+    
+    RawComparator<K> comparator = getRegistered(keyClass);
+    if (comparator == null)
+      comparator = new DefaultComparator(keyClass, conf);
+    return comparator;
+  }
+
+  /** Get a comparator for a {@link Comparable} implementation. If a 
+   * comparator is not registered, returns null. */
+  @SuppressWarnings("unchecked")
+  public static synchronized <K> 
+    RawComparator<K> getRegistered(Class<K> keyClass) {
+      
+    return (RawComparator<K>) comparators.get(keyClass);
+  }
+  
+  
+  /** Register an optimized comparator for a {@link Comparable}
+   * implementation. */
+  public static synchronized <K> 
+    void register(Class<K> c, RawComparator<K> comparator) {
+    comparators.put(c, comparator);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
Index: src/java/org/apache/hadoop/io/IntWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/IntWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/IntWritable.java	(working copy)
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /** A WritableComparable for ints. */
-public class IntWritable implements WritableComparable {
+public class IntWritable implements WritableComparable<IntWritable> {
   private int value;
 
   public IntWritable() {}
@@ -43,6 +45,7 @@
   }
 
   /** Returns true iff <code>o</code> is a IntWritable with the same value. */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof IntWritable))
       return false;
@@ -50,27 +53,30 @@
     return this.value == other.value;
   }
 
+  @Override
   public int hashCode() {
     return value;
   }
 
   /** Compares two IntWritables. */
-  public int compareTo(Object o) {
+  public int compareTo(IntWritable that) {
     int thisValue = this.value;
-    int thatValue = ((IntWritable)o).value;
+    int thatValue = that.value;
     return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
   }
 
+  @Override
   public String toString() {
     return Integer.toString(value);
   }
 
   /** A Comparator optimized for IntWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<IntWritable> {
     public Comparator() {
       super(IntWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       int thisValue = readInt(b1, s1);
Index: src/java/org/apache/hadoop/io/UTF8.java
===================================================================
--- src/java/org/apache/hadoop/io/UTF8.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/UTF8.java	(working copy)
@@ -18,20 +18,21 @@
 
 package org.apache.hadoop.io;
 
-import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-import org.apache.commons.logging.*;
-
 /** A WritableComparable for strings that uses the UTF8 encoding.
  * 
  * <p>Also includes utilities for efficiently reading and writing UTF-8.
  *
  * @deprecated replaced by Text
  */
-public class UTF8 implements WritableComparable {
+@Deprecated
+public class UTF8 implements WritableComparable<UTF8> {
   private static final Log LOG= LogFactory.getLog("org.apache.hadoop.io.UTF8");
   private static final DataOutputBuffer OBUF = new DataOutputBuffer();
   private static final DataInputBuffer IBUF = new DataInputBuffer();
@@ -118,13 +119,13 @@
   }
 
   /** Compare two UTF8s. */
-  public int compareTo(Object o) {
-    UTF8 that = (UTF8)o;
+  public int compareTo(UTF8 that) {
     return WritableComparator.compareBytes(bytes, 0, length,
                                            that.bytes, 0, that.length);
   }
 
   /** Convert to a String. */
+  @Override
   public String toString() {
     StringBuffer buffer = new StringBuffer(length);
     try {
@@ -139,6 +140,7 @@
   }
 
   /** Returns true iff <code>o</code> is a UTF8 with the same contents.  */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof UTF8))
       return false;
@@ -150,16 +152,18 @@
                                              that.bytes, 0, that.length) == 0;
   }
 
+  @Override
   public int hashCode() {
     return WritableComparator.hashBytes(bytes, length);
   }
 
   /** A WritableComparator optimized for UTF8 keys. */
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<UTF8> {
     public Comparator() {
       super(UTF8.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       int n1 = readUnsignedShort(b1, s1);
Index: src/java/org/apache/hadoop/io/LongWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/LongWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/LongWritable.java	(working copy)
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /** A WritableComparable for longs. */
-public class LongWritable implements WritableComparable {
+public class LongWritable implements WritableComparable<LongWritable> {
   private long value;
 
   public LongWritable() {}
@@ -43,6 +45,7 @@
   }
 
   /** Returns true iff <code>o</code> is a LongWritable with the same value. */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof LongWritable))
       return false;
@@ -50,27 +53,30 @@
     return this.value == other.value;
   }
 
+  @Override
   public int hashCode() {
     return (int)value;
   }
 
   /** Compares two LongWritables. */
-  public int compareTo(Object o) {
+  public int compareTo(LongWritable that) {
     long thisValue = this.value;
-    long thatValue = ((LongWritable)o).value;
+    long thatValue = that.value;
     return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
   }
 
+  @Override
   public String toString() {
     return Long.toString(value);
   }
 
   /** A Comparator optimized for LongWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<LongWritable> {
     public Comparator() {
       super(LongWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       long thisValue = readLong(b1, s1);
@@ -81,9 +87,11 @@
 
   /** A decreasing Comparator optimized for LongWritable. */ 
   public static class DecreasingComparator extends Comparator {
-    public int compare(WritableComparable a, WritableComparable b) {
+    @Override
+    public int compare(LongWritable a, LongWritable b) {
       return -super.compare(a, b);
     }
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       return -super.compare(b1, s1, l1, b2, s2, l2);
     }
Index: src/java/org/apache/hadoop/io/MD5Hash.java
===================================================================
--- src/java/org/apache/hadoop/io/MD5Hash.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/MD5Hash.java	(working copy)
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.io;
 
-import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
-import java.security.*;
 
 /** A Writable for MD5 hash values.
  */
-public class MD5Hash implements WritableComparable {
+public class MD5Hash implements WritableComparable<MD5Hash> {
   public static final int MD5_LEN = 16;
   private static final MessageDigest DIGESTER;
   static {
@@ -127,6 +128,7 @@
 
   /** Returns true iff <code>o</code> is an MD5Hash whose digest contains the
    * same values.  */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof MD5Hash))
       return false;
@@ -137,24 +139,25 @@
   /** Returns a hash code value for this object.
    * Only uses the first 4 bytes, since md5s are evenly distributed.
    */
+  @Override
   public int hashCode() {
     return quarterDigest();
   }
 
 
   /** Compares this object with the specified object for order.*/
-  public int compareTo(Object o) {
-    MD5Hash that = (MD5Hash)o;
+  public int compareTo(MD5Hash that) {
     return WritableComparator.compareBytes(this.digest, 0, MD5_LEN,
                                            that.digest, 0, MD5_LEN);
   }
 
   /** A WritableComparator optimized for MD5Hash keys. */
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<MD5Hash> {
     public Comparator() {
       super(MD5Hash.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       return compareBytes(b1, s1, MD5_LEN, b2, s2, MD5_LEN);
@@ -169,6 +172,7 @@
   {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
 
   /** Returns a string representation of this object. */
+  @Override
   public String toString() {
     StringBuffer buf = new StringBuffer(MD5_LEN*2);
     for (int i = 0; i < MD5_LEN; i++) {
Index: src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
===================================================================
--- src/java/org/apache/hadoop/io/serializer/JavaSerialization.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/serializer/JavaSerialization.java	(working copy)
@@ -62,8 +62,8 @@
 
   }
   
-  static class JavaSerializationSerializer
-    implements Serializer<Serializable> {
+  static class JavaSerializationSerializer<T extends Serializable>
+    implements Serializer<T> {
 
     private ObjectOutputStream oos;
 
@@ -75,7 +75,7 @@
       };
     }
 
-    public void serialize(Serializable object) throws IOException {
+    public void serialize(T object) throws IOException {
       oos.writeObject(object);
     }
 
@@ -89,12 +89,11 @@
     return Serializable.class.isAssignableFrom(c);
   }
 
-  public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
-    return new JavaSerializationDeserializer<Serializable>();
+  public <K extends Serializable> Deserializer<K> getDeserializer(Class<K> c) {
+    return new JavaSerializationDeserializer<K>();
   }
-
-  public Serializer<Serializable> getSerializer(Class<Serializable> c) {
-    return new JavaSerializationSerializer();
+ 
+  public <K extends Serializable> Serializer<K> getSerializer(Class<K> c) {
+    return new JavaSerializationSerializer<K>();
   }
-
 }
Index: src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
===================================================================
--- src/java/org/apache/hadoop/io/serializer/WritableSerialization.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/serializer/WritableSerialization.java	(working copy)
@@ -37,13 +37,13 @@
 public class WritableSerialization extends Configured 
   implements Serialization<Writable> {
   
-  static class WritableDeserializer extends Configured 
-    implements Deserializer<Writable> {
+  static class WritableDeserializer<K extends Writable> extends Configured 
+    implements Deserializer<K> {
 
-    private Class<?> writableClass;
+    private Class<K> writableClass;
     private DataInputStream dataIn;
     
-    public WritableDeserializer(Configuration conf, Class<?> c) {
+    public WritableDeserializer(Configuration conf, Class<K> c) {
       setConf(conf);
       this.writableClass = c;
     }
@@ -56,11 +56,11 @@
       }
     }
     
-    public Writable deserialize(Writable w) throws IOException {
-      Writable writable;
+    @SuppressWarnings("unchecked")
+    public K deserialize(K w) throws IOException {
+      K writable;
       if (w == null) {
-        writable 
-          = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
+        writable = (K)ReflectionUtils.newInstance(writableClass, getConf());
       } else {
         writable = w;
       }
@@ -74,7 +74,8 @@
     
   }
   
-  static class WritableSerializer implements Serializer<Writable> {
+  static class WritableSerializer<K extends Writable> 
+    implements Serializer<K> {
 
     private DataOutputStream dataOut;
     
@@ -86,7 +87,7 @@
       }
     }
 
-    public void serialize(Writable w) throws IOException {
+    public void serialize(K w) throws IOException {
       w.write(dataOut);
     }
 
@@ -100,12 +101,12 @@
     return Writable.class.isAssignableFrom(c);
   }
 
-  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
-    return new WritableDeserializer(getConf(), c);
+  public <K extends Writable> Deserializer<K> getDeserializer(Class<K> c) {
+    return new WritableDeserializer<K>(getConf(), c);
   }
 
-  public Serializer<Writable> getSerializer(Class<Writable> c) {
-    return new WritableSerializer();
+  public <K extends Writable> Serializer<K> getSerializer(Class<K> c) {
+    return new WritableSerializer<K>();
   }
 
 }
Index: src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
===================================================================
--- src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java	(working copy)
@@ -45,11 +45,13 @@
   private T key1;
   private T key2;
 
-  protected DeserializerComparator(Deserializer<T> deserializer)
-    throws IOException {
-    
+  protected DeserializerComparator(Deserializer<T> deserializer) {
     this.deserializer = deserializer;
-    this.deserializer.open(buffer);
+    try {
+      this.deserializer.open(buffer);
+    }catch (IOException ex) {
+      throw new RuntimeException(ex); //should not occur
+    }
   }
 
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Index: src/java/org/apache/hadoop/io/serializer/Serialization.java
===================================================================
--- src/java/org/apache/hadoop/io/serializer/Serialization.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/serializer/Serialization.java	(working copy)
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.io.serializer;
 
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
 /**
  * <p>
  * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
  * </p>
- * @param <T>
+ * @param <T> The super type of the Serialization, such as {@link Writable}, 
+ * {@link Serializable}, etc.
  */
 public interface Serialization<T> {
   
@@ -33,12 +38,16 @@
   boolean accept(Class<?> c);
   
   /**
+   * @param c the class of the objects to be serialized
+   * @param <K> the actual class type of the objects
    * @return a {@link Serializer} for the given class.
    */
-  Serializer<T> getSerializer(Class<T> c);
+  <K extends T> Serializer<K> getSerializer(Class<K> c);
 
   /**
+   * @param c the class of the objects to be serialized
+   * @param <K> the actual class type of the objects
    * @return a {@link Deserializer} for the given class.
    */
-  Deserializer<T> getDeserializer(Class<T> c);
+  <K extends T> Deserializer<K> getDeserializer(Class<K> c);
 }
Index: src/java/org/apache/hadoop/io/ByteWritable.java
===================================================================
--- src/java/org/apache/hadoop/io/ByteWritable.java	(revision 656306)
+++ src/java/org/apache/hadoop/io/ByteWritable.java	(working copy)
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.io;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /** A WritableComparable for a single byte. */
-public class ByteWritable implements WritableComparable {
+public class ByteWritable implements WritableComparable<ByteWritable> {
   private byte value;
 
   public ByteWritable() {}
@@ -43,6 +45,7 @@
   }
 
   /** Returns true iff <code>o</code> is a ByteWritable with the same value. */
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof ByteWritable)) {
       return false;
@@ -51,27 +54,30 @@
     return this.value == other.value;
   }
 
+  @Override
   public int hashCode() {
-    return (int)value;
+    return value;
   }
 
   /** Compares two ByteWritables. */
-  public int compareTo(Object o) {
+  public int compareTo(ByteWritable that) {
     int thisValue = this.value;
-    int thatValue = ((ByteWritable)o).value;
+    int thatValue = that.value;
     return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
   }
 
+  @Override
   public String toString() {
     return Byte.toString(value);
   }
 
   /** A Comparator optimized for ByteWritable. */ 
-  public static class Comparator extends WritableComparator {
+  public static class Comparator extends WritableComparator<ByteWritable> {
     public Comparator() {
       super(ByteWritable.class);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
       byte thisValue = b1[s1];
Index: src/java/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobConf.java	(revision 656306)
+++ src/java/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -20,30 +20,30 @@
 
 
 import java.io.IOException;
-
+import java.net.URL;
+import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.StringTokenizer;
 
-import java.net.URL;
-import java.net.URLDecoder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.DefaultComparator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
-
+import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -593,11 +593,11 @@
    * @return the {@link RawComparator} comparator used to compare keys.
    */
   public RawComparator getOutputKeyComparator() {
-    Class theClass = getClass("mapred.output.key.comparator.class", null,
+    Class<?> theClass = getClass("mapred.output.key.comparator.class", null,
     		RawComparator.class);
     if (theClass != null)
       return (RawComparator)ReflectionUtils.newInstance(theClass, this);
-    return WritableComparator.get(getMapOutputKeyClass());
+    return DefaultComparator.get(getMapOutputKeyClass(), this);
   }
 
   /**
Index: src/java/org/apache/hadoop/record/RecordComparator.java
===================================================================
--- src/java/org/apache/hadoop/record/RecordComparator.java	(revision 656306)
+++ src/java/org/apache/hadoop/record/RecordComparator.java	(working copy)
@@ -23,15 +23,16 @@
 /**
  * A raw record comparator base class
  */
-public abstract class RecordComparator extends WritableComparator {
+public abstract class RecordComparator<K extends Record<K>> extends WritableComparator<K> {
   
   /**
    * Construct a raw {@link Record} comparison implementation. */
-  protected RecordComparator(Class recordClass) {
+  protected RecordComparator(Class<K> recordClass) {
     super(recordClass);
   }
   
   // inheric JavaDoc
+  @Override
   public abstract int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
   
   /**
@@ -40,7 +41,8 @@
    * @param c record classs for which a raw comparator is provided
    * @param comparator Raw comparator instance for class c 
    */
-  public static synchronized void define(Class c, RecordComparator comparator) {
+  public static synchronized <K extends Record<K>> 
+    void define(Class<K> c, RecordComparator<K> comparator) {
     WritableComparator.define(c, comparator);
   }
 }
Index: src/java/org/apache/hadoop/record/Record.java
===================================================================
--- src/java/org/apache/hadoop/record/Record.java	(revision 656306)
+++ src/java/org/apache/hadoop/record/Record.java	(working copy)
@@ -18,17 +18,19 @@
 
 package org.apache.hadoop.record;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+
 import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Abstract class that is extended by generated classes.
- * 
+ * @param K the actual Record class, so that compareTo(K peer) can be defined
  */
-public abstract class Record implements WritableComparable, Cloneable {
+public abstract class Record<K> implements 
+  WritableComparable<K>, Cloneable {
   
   /**
    * Serialize a record with tag (ususally field name)
@@ -78,6 +80,7 @@
   }
 
   // inherit javadoc
+  @Override
   public String toString() {
     try {
       ByteArrayOutputStream s = new ByteArrayOutputStream();
