Index: src/java/org/apache/hadoop/mapred/IFile.java
===================================================================
--- src/java/org/apache/hadoop/mapred/IFile.java	(revision 0)
+++ src/java/org/apache/hadoop/mapred/IFile.java	(revision 0)
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+class IFile {
+  public static class Writer<K extends Object, V extends Object> {
+    DataOutputStream out;
+    boolean ownOutputStream = false;
+    Compressor compressor;
+    
+    Class<K> keyClass;
+    Class<V> valueClass;
+    Serializer<K> keySerializer;
+    Serializer<V> valueSerializer;
+    
+    DataOutputBuffer buffer = new DataOutputBuffer();
+
+    public Writer(Configuration conf, FileSystem fs, Path file, 
+                  Class<K> keyClass, Class<V> valueClass,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.create(file), keyClass, valueClass, codec);
+      ownOutputStream = true;
+    }
+    
+    public Writer(Configuration conf, OutputStream out, 
+                  Class<K> keyClass, Class<V> valueClass,
+                  CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.out = 
+          new DataOutputStream(codec.createOutputStream(out, compressor));
+      } else {
+        this.out = new DataOutputStream(out);
+      }
+      
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(buffer);
+      this.valueSerializer = serializationFactory.getSerializer(valueClass);
+      this.valueSerializer.open(buffer);
+    }
+    
+    public void close() throws IOException {
+      CodecPool.returnCompressor(compressor);
+      
+      keySerializer.close();
+      valueSerializer.close();
+
+      if (out != null) {
+        
+        // Close the underlying stream iff we own it...
+        if (ownOutputStream) {
+          out.close();
+        } else {
+          out.flush();
+        }
+        out = null;
+      }
+
+    }
+    
+    public void append(K key, V value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
+
+      // Append the 'key'
+      keySerializer.serialize(key);
+      int keyLength = buffer.getLength();
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + key);
+
+      // Append the 'value'
+      valueSerializer.serialize(value);
+
+      // Write the record out
+      out.writeInt(keyLength);                                  // key length
+      out.writeInt(buffer.getLength() - keyLength);             // value length
+      out.write(buffer.getData(), 0, buffer.getLength());       // data
+
+      // Reset
+      buffer.reset();
+    }
+  }
+
+  public static class Reader<K extends Object, V extends Object> {
+    private static final int DEFAULT_BUFFER_SIZE = 128*1024*1024;
+    private static final int INTEGER_SIZE = 4;
+    
+    DataInputStream in;
+    Decompressor decompressor;
+    long bytesRead = 0;
+    long fileLength;
+    
+    byte[] buffer;
+    int bufferSize = 0;
+    DataInputBuffer dataIn = new DataInputBuffer();
+
+    public Reader(Configuration conf, FileSystem fs, Path file,
+                  CompressionCodec codec) throws IOException {
+      this(conf, fs.open(file), fs.getLength(file), codec);
+    }
+    
+    public Reader(Configuration conf, InputStream in, long length, 
+                  CompressionCodec codec) throws IOException {
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        this.in = new DataInputStream(codec.createInputStream(in, decompressor));
+      } else {
+        this.in = new DataInputStream(in);
+      }
+      this.fileLength = length;
+      
+      buffer = new byte[DEFAULT_BUFFER_SIZE];
+      dataIn.reset(buffer, bufferSize);
+    }
+    
+    public Reader(byte[] data, int start, int length) {
+      buffer = data;
+      fileLength = bufferSize = (length - start);
+      dataIn.reset(data, start, length);
+    }
+    
+    private void readNextBlock(int minSize) throws IOException {
+      buffer = 
+        rejigData(buffer, 
+                  (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
+      bufferSize = buffer.length;
+      dataIn.reset(buffer, bufferSize);
+    }
+    
+    private byte[] rejigData(byte[] source, byte[] destination) 
+    throws IOException{
+      // Copy remaining data into the destination array
+      int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
+      System.arraycopy(source, dataIn.getPosition(), 
+                       destination, 0, bytesRemaining);
+      
+      // Read as much data as will fit from the underlying stream 
+      int spaceAvailable = destination.length - bytesRemaining;
+      long dataAvailable = fileLength - bytesRead;
+      if (dataAvailable < spaceAvailable) {
+        spaceAvailable = (int)dataAvailable;
+      }
+      in.readFully(destination, bytesRemaining, spaceAvailable);
+      
+      return destination;
+    }
+    
+    public boolean next(DataInputBuffer key, DataInputBuffer value) 
+    throws IOException {
+      // Sanity check
+      if (bytesRead > fileLength) {
+        return false;
+      }
+      
+      // Check if we have enough data to read lengths
+      if ((dataIn.getLength() - dataIn.getPosition()) < 2*INTEGER_SIZE) {
+        readNextBlock(2*INTEGER_SIZE);
+      }
+      
+      // Read key and value lengths
+      int oldPos = dataIn.getPosition();
+      int keyLength = dataIn.readInt();
+      int valueLength = dataIn.readInt();
+
+      int pos = dataIn.getPosition();
+      bytesRead += pos - oldPos;
+      
+      if ((dataIn.getLength()-pos) < (keyLength+valueLength)) {
+        readNextBlock(keyLength+valueLength);
+      }
+
+      // Setup the key and value
+      key.reset(dataIn.getData(), pos, keyLength);
+      value.reset(dataIn.getData(), (pos + keyLength), valueLength);
+      
+      // Position for the next record
+      dataIn.skipBytes(keyLength + valueLength);
+
+      bytesRead += keyLength + valueLength;
+      if (bytesRead > fileLength) {
+        throw new EOFException("Read past end of data: " + 
+                               "(" + bytesRead + " v/s " + fileLength + ")");
+      }
+      
+      return true;
+    }
+  }
+}
Index: src/java/org/apache/hadoop/io/DataInputBuffer.java
===================================================================
--- src/java/org/apache/hadoop/io/DataInputBuffer.java	(revision 656433)
+++ src/java/org/apache/hadoop/io/DataInputBuffer.java	(working copy)
@@ -53,6 +53,7 @@
       this.pos = start;
     }
 
+    public byte[] getData() { return buf; }
     public int getPosition() { return pos; }
     public int getLength() { return count; }
   }
@@ -78,6 +79,10 @@
   public void reset(byte[] input, int start, int length) {
     buffer.reset(input, start, length);
   }
+  
+  public byte[] getData() {
+    return buffer.getData();
+  }
 
   /** Returns the current position in the input. */
   public int getPosition() { return buffer.getPosition(); }
