Index: src/core/org/apache/hadoop/fs/FSDataOutputStream.java
===================================================================
--- src/core/org/apache/hadoop/fs/FSDataOutputStream.java	(revision 745757)
+++ src/core/org/apache/hadoop/fs/FSDataOutputStream.java	(working copy)
@@ -25,41 +25,51 @@
 public class FSDataOutputStream extends DataOutputStream implements Syncable {
   private OutputStream wrappedStream;
 
-  private static class PositionCache extends FilterOutputStream {
+  private static final class StatsOutputStream extends FilterOutputStream {
     private FileSystem.Statistics statistics;
+
+    public StatsOutputStream(OutputStream out, FileSystem.Statistics stats) {
+       super(out);
+    }
+
+    public void write(int b) throws IOException {
+       out.write(b);
+       if (statistics != null) {
+        statistics.incrementBytesWritten(1);
+      }
+    }
+
+
+   public void write(byte b[], int off, int len) throws IOException {
+      out.write(b, off, len);
+      if (statistics != null) {
+        statistics.incrementBytesWritten(len);
+      }
+    }
+  }
+
+  private static final class PositionCache extends BufferedOutputStream {
     long position;
 
     public PositionCache(OutputStream out, 
-                         FileSystem.Statistics stats,
                          long pos) throws IOException {
-      super(out);
-      statistics = stats;
+      super(out, 256*1024);
       position = pos;
     }
 
     public void write(int b) throws IOException {
-      out.write(b);
+      super.write(b);
       position++;
-      if (statistics != null) {
-        statistics.incrementBytesWritten(1);
-      }
     }
     
     public void write(byte b[], int off, int len) throws IOException {
-      out.write(b, off, len);
+      super.write(b, off, len);
       position += len;                            // update position
-      if (statistics != null) {
-        statistics.incrementBytesWritten(len);
-      }
     }
       
     public long getPos() throws IOException {
       return position;                            // return cached position
     }
-    
-    public void close() throws IOException {
-      out.close();
-    }
   }
 
   @Deprecated
@@ -74,7 +84,7 @@
 
   public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
                             long startPosition) throws IOException {
-    super(new PositionCache(out, stats, startPosition));
+    super(new PositionCache(new StatsOutputStream(out, stats), startPosition));
     wrappedStream = out;
   }
   
@@ -93,6 +103,7 @@
 
   /** {@inheritDoc} */
   public void sync() throws IOException {
+    flush(); // make sure the buffered output stream is flushed
     if (wrappedStream instanceof Syncable) {
       ((Syncable)wrappedStream).sync();
     }
Index: src/core/org/apache/hadoop/fs/FSDataInputStream.java
===================================================================
--- src/core/org/apache/hadoop/fs/FSDataInputStream.java	(revision 745757)
+++ src/core/org/apache/hadoop/fs/FSDataInputStream.java	(working copy)
@@ -23,10 +23,23 @@
  * and buffers input through a {@link BufferedInputStream}. */
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable {
+  private static final class ViewableBufferedInputStream extends BufferedInputStream {
+    public ViewableBufferedInputStream(InputStream is) { super(is); }
+    /* how many bytes have been read since we read a buffer */
+    private int getBufferReadBytes() { return pos; } 
+    private void resetBuffer() {
+      pos = count = 0;
+    }
+  }
+  private ViewableBufferedInputStream buffered;
+  private InputStream wrapped;
 
   public FSDataInputStream(InputStream in)
     throws IOException {
-    super(in);
+    super(new ViewableBufferedInputStream(in));
+    buffered = (ViewableBufferedInputStream) this.in;
+    wrapped = in;
+
     if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
       throw new IllegalArgumentException(
           "In is not an instance of Seekable or PositionedReadable");
@@ -34,29 +47,32 @@
   }
   
   public synchronized void seek(long desired) throws IOException {
-    ((Seekable)in).seek(desired);
+    ((Seekable)wrapped).seek(desired);
+    buffered.resetBuffer();
   }
 
   public long getPos() throws IOException {
-    return ((Seekable)in).getPos();
+    return ((Seekable)wrapped).getPos() + buffered.getBufferReadBytes();
   }
   
   public int read(long position, byte[] buffer, int offset, int length)
     throws IOException {
-    return ((PositionedReadable)in).read(position, buffer, offset, length);
+    return ((PositionedReadable)wrapped).read(position, buffer, offset, length);
   }
   
   public void readFully(long position, byte[] buffer, int offset, int length)
     throws IOException {
-    ((PositionedReadable)in).readFully(position, buffer, offset, length);
+    ((PositionedReadable)wrapped).readFully(position, buffer, offset, length);
   }
   
   public void readFully(long position, byte[] buffer)
     throws IOException {
-    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
+    ((PositionedReadable)wrapped).readFully(position, buffer, 0, buffer.length);
   }
   
   public boolean seekToNewSource(long targetPos) throws IOException {
-    return ((Seekable)in).seekToNewSource(targetPos); 
+    boolean ret = ((Seekable)wrapped).seekToNewSource(targetPos); 
+    buffered.resetBuffer();
+    return ret;
   }
 }
