Index: src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
===================================================================
--- src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java	(revision 8)
+++ src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java	(working copy)
@@ -204,7 +204,7 @@
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(4);           // size of packet
     sendOut.writeLong(0);          // OffsetInBlock
     sendOut.writeLong(100);        // sequencenumber
     sendOut.writeBoolean(false);   // lastPacketInBlock
@@ -229,7 +229,7 @@
     sendOut.writeInt(0);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt((int)512);    // checksum size
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(8);           // size of packet
     sendOut.writeLong(0);          // OffsetInBlock
     sendOut.writeLong(100);        // sequencenumber
     sendOut.writeBoolean(true);    // lastPacketInBlock
Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java	(revision 8)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -39,6 +39,7 @@
 import java.util.zip.CRC32;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
@@ -1523,7 +1524,6 @@
     private DataInputStream blockReplyStream;
     private Block block;
     private long blockSize;
-    private int buffersize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -1536,8 +1536,6 @@
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0;
     private int chunksPerPacket = 0;
-    private int chunksPerBlock = 0;
-    private int chunkSize = 0;
     private DatanodeInfo[] nodes = null; // list of targets for current block
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
@@ -1546,34 +1544,87 @@
 
     private class Packet {
       ByteBuffer buffer;
+      byte[]  buf;                 // packet bytes: header, data length, checksums, data
       long    seqno;               // sequencenumber of buffer in block
       long    offsetInBlock;       // offset in block
       boolean lastPacketInBlock;   // is this the last packet in block?
       int     numChunks;           // number of chunks currently in packet
-  
+      int     dataStart;           // offset in buf where the data region begins
+      int     dataPos;             // offset in buf where the next data byte goes
+      int     checksumStart;       // offset in buf where the checksum region begins
+      int     checksumPos;         // offset in buf where the next checksum goes
+      
       // create a new packet
-      Packet(int size, long offsetInBlock) {
-        buffer = ByteBuffer.allocate(size);
-        buffer.clear();
+      Packet(long offsetInBlock) {
         this.lastPacketInBlock = false;
         this.numChunks = 0;
         this.offsetInBlock = offsetInBlock;
         this.seqno = currentSeqno;
         currentSeqno++;
+        
+        buf = new byte[DataNode.PKT_HEADER_LEN + packetSize];
+        
+        checksumPos = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        checksumStart = checksumPos;
+        dataPos = checksumPos + chunksPerPacket * checksum.getChecksumSize();
+        dataStart = dataPos;
       }
   
-      // writes len bytes from offset off in inarray into
-      // this packet.
-      // 
-      void write(byte[] inarray, int off, int len) {
-        buffer.put(inarray, off, len);
+      void writeData(byte[] inarray, int off, int len) {
+        if ( dataPos + len > buf.length) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, dataPos, len);
+        dataPos += len;
       }
   
-      // writes an integer into this packet. 
-      //
-      void  writeInt(int value) {
-       buffer.putInt(value);
+      void  writeChecksum(byte[] inarray, int off, int len) {
+        if (checksumPos + len > dataStart) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, checksumPos, len);
+        checksumPos += len;
       }
+      
+      /**
+       * Returns ByteBuffer that contains one full packet, including header.
+       * The buffer is built once and cached. Building it sets buf to null,
+       * so this packet should not be written to after the first call.
+       */
+      ByteBuffer getBuffer() {
+        if (buffer != null) {
+          return buffer;
+        }
+        //prepare header and close gap in buffer if necessary.
+        int dataLen = dataPos - dataStart;
+        int checksumLen = checksumPos - checksumStart;
+        int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
+        
+        if (checksumPos != dataStart) {
+          /* move the checksum to cover the gap.
+           * This can happen for the last packet.
+           */
+          System.arraycopy(buf, checksumStart, buf, 
+                           dataStart - checksumLen , checksumLen); 
+        }
+        // the packet begins (dataStart - checksumPos) bytes into buf: the gap size
+        buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
+                                 DataNode.PKT_HEADER_LEN + pktLen);
+        buf = null;
+
+        /* write the header
+         * The format is described in comment before DataNode.BlockSender
+         */
+        buffer.putInt(pktLen);
+        buffer.putLong(offsetInBlock);
+        buffer.putLong(seqno);
+        buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
+        buffer.putInt(dataLen);
+        // the puts advanced position; move it back to the start of the packet
+        buffer.position(dataStart - checksumPos);
+        
+        return buffer;
+      }
     }
   
     //
@@ -1623,7 +1674,6 @@
             try {
               // get packet to be sent.
               one = dataQueue.getFirst();
-              int len = one.buffer.limit();
   
               // get new block from namenode.
               if (blockStream == null) {
@@ -1635,8 +1685,6 @@
                 response.start();
               }
 
-              // user bytes from 'position' to 'limit'.
-              byte[] arr = one.buffer.array();
               if (one.offsetInBlock >= blockSize) {
                 throw new IOException("BlockSize " + blockSize +
                                       " is smaller than data size. " +
@@ -1645,6 +1693,8 @@
                                       " Aborting file " + src);
               }
 
+              ByteBuffer buf = one.getBuffer();
+              
               // move packet from dataQueue to ackQueue
               dataQueue.removeFirst();
               dataQueue.notifyAll();
@@ -1652,22 +1702,20 @@
                 ackQueue.addLast(one);
                 ackQueue.notifyAll();
               } 
-  
-              // write out data to remote datanode
-              blockStream.writeInt(len); // size of this packet
-              blockStream.writeLong(one.offsetInBlock); // data offset in block
-              blockStream.writeLong(one.seqno); // sequence num of packet
-              blockStream.writeBoolean(one.lastPacketInBlock); 
-              blockStream.write(arr, 0, len);
+              
+              blockStream.write(buf.array(), buf.position(), buf.remaining());
+              
               if (one.lastPacketInBlock) {
                 blockStream.writeInt(0); // indicate end-of-block 
               }
               blockStream.flush();
-              LOG.debug("DataStreamer block " + block +
-                        " wrote packet seqno:" + one.seqno +
-                        " size:" + len + 
-                        " offsetInBlock:" + one.offsetInBlock + 
-                        " lastPacketInBlock:" + one.lastPacketInBlock);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("DataStreamer block " + block +
+                          " wrote packet seqno:" + one.seqno +
+                          " size:" + buf.remaining() +
+                          " offsetInBlock:" + one.offsetInBlock + 
+                          " lastPacketInBlock:" + one.lastPacketInBlock);
+              }
             } catch (IOException e) {
               LOG.warn("DataStreamer Exception: " + e);
               hasError = true;
@@ -1940,7 +1988,6 @@
       super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
       this.blockSize = blockSize;
-      this.buffersize = buffersize;
       this.progress = progress;
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -1956,11 +2003,10 @@
       }
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
-      // A maximum of 128 chunks per packet, i.e. 64K packet size.
-      chunkSize = bytesPerChecksum + 2 * SIZE_OF_INTEGER; // user data & checksum
-      chunksPerBlock = (int)(blockSize / bytesPerChecksum);
-      chunksPerPacket = Math.min(chunksPerBlock, 128);
-      packetSize = chunkSize * chunksPerPacket;
+      // each packet holds about buffersize bytes of data, never less than one chunk
+      chunksPerPacket = Math.max(1, (buffersize + bytesPerChecksum - 1)/bytesPerChecksum);
+      packetSize = (bytesPerChecksum + checksum.getChecksumSize()) 
+                   * chunksPerPacket + SIZE_OF_INTEGER /* size of the data */;
 
       namenode.create(
           src, masked, clientName, overwrite, replication, blockSize);
@@ -2043,7 +2089,9 @@
         //
         // Xmit header info to datanode
         //
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
+        DataOutputStream out = new DataOutputStream(
+                       new BufferedOutputStream(s.getOutputStream(), 
+                                                DataNode.SMALL_BUFFER_SIZE));
         blockReplyStream = new DataInputStream(s.getInputStream());
 
         out.writeShort( DATA_TRANSFER_VERSION );
@@ -2140,12 +2188,6 @@
                               this.checksum.getChecksumSize() + 
                               " but found to be " + checksum.length);
       }
-      if (len + cklen + SIZE_OF_INTEGER > chunkSize) {
-        throw new IOException("writeChunk() found data of size " +
-                              (len + cklen + 4) +
-                              " that cannot be larger than chukSize " + 
-                              chunkSize);
-      }
 
       synchronized (dataQueue) {
   
@@ -2159,26 +2201,24 @@
         isClosed();
   
         if (currentPacket == null) {
-          currentPacket = new Packet(packetSize, bytesCurBlock);
+          currentPacket = new Packet(bytesCurBlock);
         }
 
-        currentPacket.writeInt(len);
-        currentPacket.write(checksum, 0, cklen);
-        currentPacket.write(b, offset, len);
+        currentPacket.writeChecksum(checksum, 0, cklen);
+        currentPacket.writeData(b, offset, len);
         currentPacket.numChunks++;
         bytesCurBlock += len;
 
         // If packet is full, enqueue it for transmission
         //
         if (currentPacket.numChunks == chunksPerPacket ||
-            bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+            bytesCurBlock == blockSize) {
           LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno);
-          currentPacket.buffer.flip();
           //
           // if we allocated a new packet because we encountered a block
           // boundary, reset bytesCurBlock.
           //
-          if (bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+          if (bytesCurBlock == blockSize) {
             currentPacket.lastPacketInBlock = true;
             bytesCurBlock = 0;
           }
@@ -2195,11 +2235,12 @@
      * Waits till all existing data is flushed and
      * confirmations received from datanodes.
      */
-    @Override
-    public synchronized void flush() throws IOException {
+    private synchronized void flushInternal() throws IOException {
       checkOpen();
       isClosed();
   
+      flushBuffer();
+      
       while (!closed) {
         synchronized (dataQueue) {
           isClosed();
@@ -2207,7 +2248,6 @@
           // if there is data in the current buffer, send it across
           //
           if (currentPacket != null) {
-            currentPacket.buffer.flip();
             dataQueue.addLast(currentPacket);
             dataQueue.notifyAll();
             currentPacket = null;
@@ -2280,15 +2320,14 @@
         // packet with empty payload.
         synchronized (dataQueue) {
           if (currentPacket == null && bytesCurBlock != 0) {
-            currentPacket = new Packet(packetSize, bytesCurBlock);
-            currentPacket.writeInt(0); // one chunk with empty contents
+            currentPacket = new Packet(bytesCurBlock);
           }
           if (currentPacket != null) { 
             currentPacket.lastPacketInBlock = true;
           }
         }
 
-        flush();             // flush all data to Datanodes
+        flushInternal();  // flush all data to Datanodes
         closed = true;
  
         // wait for threads to finish processing
@@ -2346,7 +2385,9 @@
 
     void setChunksPerPacket(int value) {
       chunksPerPacket = Math.min(chunksPerPacket, value);
-      packetSize = chunkSize * chunksPerPacket;
+      packetSize = (checksum.getBytesPerChecksum() + 
+                    checksum.getChecksumSize()) * chunksPerPacket +
+                   SIZE_OF_INTEGER;
     }
   }
 
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 8)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -908,7 +908,7 @@
       DataInputStream in=null; 
       try {
         in = new DataInputStream(
-            new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+            new BufferedInputStream(s.getInputStream(), SMALL_BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -1085,7 +1085,7 @@
             mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
             mirrorOut = new DataOutputStream(
                           new BufferedOutputStream(mirrorSock.getOutputStream(),
-                                                   BUFFER_SIZE));
+                                                   SMALL_BUFFER_SIZE));
             mirrorIn = new DataInputStream(mirrorSock.getInputStream());
 
             // Write header: Copied from DFSClient.java!
@@ -1497,6 +1497,10 @@
       Not all the fields might be used while reading.
     
    ************************************************************************ */
+  static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
+                                      8 + /* offset in block */
+                                      8 + /* seqno */
+                                      1   /* isLastPacketInBlock */);
   
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
@@ -1518,12 +1522,6 @@
     private OutputStream out;
     private ByteBuffer sendBuf;
     private int maxChunksPerPacket;
-    
-    static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
-                                        8 + /* offset in block */
-                                        8 + /* seqno */
-                                        1 + /* isLastPacketInBlock */
-                                        4   /* data len */ );
        
     BlockSender(Block block, long startOffset, long length,
                 boolean corruptChecksumOk, boolean chunkOffsetOK,
@@ -1561,7 +1559,7 @@
         //set up sendBuf:
         maxChunksPerPacket = Math.max(1,
                  (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-        byte[] buf = new byte[PKT_HEADER_LEN +
+        byte[] buf = new byte[PKT_HEADER_LEN + SIZE_OF_INTEGER +
                               checksumSize * maxChunksPerPacket +
                               bytesPerChecksum * maxChunksPerPacket];
         sendBuf = ByteBuffer.wrap(buf);
@@ -2026,18 +2024,6 @@
     }
   }
 
-  // this class is a bufferoutputstream that exposes the number of
-  // bytes in the buffer.
-  static private class DFSBufferedOutputStream extends BufferedOutputStream {
-    DFSBufferedOutputStream(OutputStream out, int capacity) {
-      super(out, capacity);
-    }
-
-    int count() {
-      return count;
-    }
-  }
-
   /* A class that receives a block and wites to its own disk, meanwhile
    * may copies it to another site. If a throttler is provided,
    * streaming throttling is also supported. 
@@ -2047,13 +2033,13 @@
     private boolean finalized;
     private DataInputStream in = null; // from where data are read
     private DataChecksum checksum; // from where chunks of a block can be read
-    private DataOutputStream out = null; // to block file at local disk
+    private OutputStream out = null; // to block file at local disk
     private DataOutputStream checksumOut = null; // to crc file at local disk
-    private DFSBufferedOutputStream bufStream = null;
     private int bytesPerChecksum;
     private int checksumSize;
-    private byte buf[];
-    private byte checksumBuf[];
+    private ByteBuffer buf; // contains one full packet.
+    private int bufRead; //amount of valid data in the buf
+    private int maxPacketReadLen; // largest packet (header included) read so far
     private long offsetInBlock;
     final private String inAddr;
     private String mirrorAddr;
@@ -2083,17 +2069,13 @@
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
-        this.buf = new byte[bytesPerChecksum + checksumSize];
-        this.checksumBuf = new byte[checksumSize];
         //
         // Open local disk out
         //
         streams = data.writeToBlock(block, isRecovery);
         this.finalized = data.isValidBlock(block);
         if (streams != null) {
-          this.bufStream = new DFSBufferedOutputStream(
-                                          streams.dataOut, BUFFER_SIZE);
-          this.out = new DataOutputStream(bufStream);
+          this.out = streams.dataOut;
           this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                           streams.checksumOut, BUFFER_SIZE));
         }
@@ -2144,15 +2126,11 @@
       }
     }
 
-    /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
-                              throws IOException {
-      if (len <= 0 || len > bytesPerChecksum) {
-        throw new IOException("Got wrong length during writeBlock(" + block
-            + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
-            + " expected <= " + bytesPerChecksum);
-      }
 
+    /* Verify length and checksum of a chunk */
+    private void verifyChunk( byte[] dataBuf, int dataOff, int len, 
+                              byte[] checksumBuf, int checksumOff ) 
+                              throws IOException {      
       if (lastLen > 0 && lastLen != bytesPerChecksum) {
         throw new IOException("Got wrong length during receiveBlock(" + block
           + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
@@ -2162,111 +2140,219 @@
       lastLen = curLen;
       curLen = len;
 
-      in.readFully(buf, 0, len);
+      checksum.update(dataBuf, dataOff, len);
 
-      /*
-       * Verification is not included in the initial design. For now, it at
-       * least catches some bugs. Later, we can include this after showing that
-       * it does not affect performance much.
-       */
-      checksum.update(buf, 0, len);
-
       if (!checksum.compare(checksumBuf, checksumOff)) {
         throw new IOException("Unexpected checksum mismatch "
             + "while writing " + block + " from " + inAddr);
       }
 
       checksum.reset();
+    }
 
-      // record the fact that the current write is still in progress
-      synchronized (currentWriteLock) {
-        currentWrite = true;
+    /// Invokes verifyChunk() on each of the chunks
+    private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
+                               byte[] checksumBuf, int checksumOff ) 
+                               throws IOException {
+      while (len > 0) {
+        int chunkLen = Math.min(len, bytesPerChecksum);
+        
+        verifyChunk(dataBuf, dataOff, chunkLen, checksumBuf, checksumOff);
+        
+        dataOff += chunkLen;
+        checksumOff += checksumSize;
+        len -= chunkLen;
       }
-      offsetInBlock += len;
-
-      // First write to remote node before writing locally.
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(len);
-          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
-          mirrorOut.write(buf, 0, len);
-        } catch (IOException ioe) {
-          LOG.info(dnRegistration + ":Exception writing block " +
-                   block + " to mirror " + mirrorAddr + "\n" +
-                   StringUtils.stringifyException(ioe));
-          mirrorOut = null;
-          //
-          // If stream-copy fails, continue
-          // writing to disk for replication requests. For client
-          // writes, return error so that the client can do error
-          // recovery.
-          //
-          if (clientName.length() > 0) {
-            synchronized (currentWriteLock) {
-              currentWrite = false;
-              currentWriteLock.notifyAll();
-            }
-            throw ioe;
-          }
+    }
+    
+    /**
+     * Makes sure buf.position() is zero without modifying buf.remaining().
+     * It moves the data if position needs to be changed.
+     */
+    private void shiftBufData() {
+      if (bufRead != buf.limit()) {
+        throw new IllegalStateException("bufRead should be same as " +
+                                        "buf.limit()");
+      }
+      
+      //shift the remaining data on buf to the front
+      if (buf.position() > 0) {
+        int dataLeft = buf.remaining();
+        if (dataLeft > 0) {
+          byte[] b = buf.array();
+          System.arraycopy(b, buf.position(), b, 0, dataLeft);
         }
+        buf.position(0);
+        bufRead = dataLeft;
+        buf.limit(bufRead);
       }
-
-      try {
-        if (!finalized) {
-          out.write(buf, 0, len);
-          // Write checksum
-          checksumOut.write(checksumBuf, checksumOff, checksumSize);
-          myMetrics.bytesWritten.inc(len);
+    }
+    
+    /**
+     * Reads up to toRead bytes into buf at buf.limit() and advances the limit.
+     * A negative toRead picks a default size. Throws an IOException if the read fails.
+     */
+    private int readToBuf(int toRead) throws IOException {
+      if (toRead < 0) {
+        toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+                 - buf.limit();
+      }
+      
+      int nRead = in.read(buf.array(), buf.limit(), toRead);
+      
+      if (nRead < 0) {
+        throw new EOFException("while trying to read " + toRead + " bytes");
+      }
+      bufRead = buf.limit() + nRead;
+      buf.limit(bufRead);
+      return nRead;
+    }
+    
+    
+    /**
+     * Reads (at least) one packet and returns the packet length.
+     * buf.position() points to the start of the packet and 
+     * buf.limit() points to the end of the packet. There could 
+     * be more data from the next packet in buf.<br><br>
+     * 
+     * It tries to read a full packet with a single read call.
+     * Consecutive packets are usually of the same length.
+     */
+    private int readNextPacket() throws IOException {
+      /* This dances around buf a little bit, mainly to read a
+       * full packet with a single read and to accept an arbitrary size
+       * for the next packet at the same time.
+       */
+      if (buf == null) {
+        //initialize buffer to the best guess size:
+        buf = ByteBuffer.allocate(PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                                  ((bytesPerChecksum + checksumSize) *
+                                   ((BUFFER_SIZE + bytesPerChecksum - 1)/
+                                     bytesPerChecksum)));  
+        buf.limit(0);
+      }
+      
+      // See if there is data left in the buffer :
+      if (bufRead > buf.limit()) {
+        buf.limit(bufRead);
+      }
+      
+      while (buf.remaining() < SIZE_OF_INTEGER) {
+        if (buf.position() > 0) {
+          shiftBufData();
         }
-      } catch (IOException iex) {
-        checkDiskError(iex);
-        throw iex;
-      } finally {
-        synchronized (currentWriteLock) {
-          currentWrite = false;
-          currentWriteLock.notifyAll();
+        readToBuf(-1);
+      }
+      
+      /* We mostly have the full packet or at least enough for an int
+       */
+      buf.mark();
+      int pktLen = buf.getInt();
+      buf.reset();
+      
+      if (pktLen == 0) {
+        //end of stream!
+        buf.limit(buf.position() + SIZE_OF_INTEGER);
+        return 0;
+      }
+      
+      // reject corrupt pktLen: it always counts the data-length int; 100MB cap
+      if (pktLen < SIZE_OF_INTEGER || pktLen > (100*1024*1024)) {
+        throw new IOException("Incorrect value for packet len : " + pktLen);
+      }
+      
+      int pktSize = pktLen + PKT_HEADER_LEN;
+      
+      if (buf.remaining() < pktSize) {
+        //we need to read more data
+        int toRead = pktSize - buf.remaining();
+        
+        // first make sure buf has enough space.        
+        int spaceLeft = buf.capacity() - buf.limit();
+        if (toRead > spaceLeft && buf.position() > 0) {
+          shiftBufData();
+          spaceLeft = buf.capacity() - buf.limit();
         }
+        
+        if (toRead > spaceLeft) {
+          byte oldBuf[] = buf.array();
+          int toCopy = buf.limit();
+          buf = ByteBuffer.allocate(toCopy + toRead);
+          System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+          buf.limit(toCopy);
+        }
+        
+        //now read:
+        while (toRead > 0) {
+          toRead -= readToBuf(toRead);
+        }
       }
-
-      if (throttler != null) { // throttle I/O
-        throttler.throttle(len + checksumSize + 4);
+      
+      if (buf.remaining() > pktSize) {
+        buf.limit(buf.position() + pktSize);
       }
+      
+      if (pktSize > maxPacketReadLen) {
+        maxPacketReadLen = pktSize;
+      }
+      
+      return pktLen;
     }
-
-    /* 
-     * Receive and process a packet. It contains many chunks.
+    
+    /** 
+     * Receives and processes a packet. It can contain many chunks.
+     * Returns the packet length (header excluded); 0 marks the end of the block.
      */
-    private void receivePacket(int packetSize) throws IOException {
-      /* TEMP: Currently this handles both interleaved 
-       * and non-interleaved DATA_CHUNKs in side the packet.
-       * non-interleaved is required for HADOOP-2758 and in future.
-       * iterleaved will be removed once extra buffer copies are removed
-       * in write path (HADOOP-1702).
-       * 
-       * Format of Non-interleaved data packets is described in the 
-       * comment before BlockSender.
-       */
-      offsetInBlock = in.readLong(); // get offset of packet in block
-      long seqno = in.readLong();    // get seqno
-      boolean lastPacketInBlock = in.readBoolean();
-      int curPacketSize = 0;         
-      LOG.debug("Receiving one packet for block " + block +
-                " of size " + packetSize +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
+    private int receivePacket() throws IOException {
+      
+      int pktLen = readNextPacket();
+      
+      if (pktLen <= 0) {
+        return pktLen;
+      }
+      
+      buf.mark();
+      //read the header
+      buf.getInt(); // packet length
+      offsetInBlock = buf.getLong(); // get offset of packet in block
+      long seqno = buf.getLong();    // get seqno
+      boolean lastPacketInBlock = (buf.get() != 0);
+      
+      int endOfHeader = buf.position();
+      buf.reset();
+      
+      if (LOG.isDebugEnabled()){
+        LOG.debug("Receiving one packet for block " + block +
+                  " of size " + pktLen +
+                  " seqno " + seqno +
+                  " offsetInBlock " + offsetInBlock +
+                  " lastPacketInBlock " + lastPacketInBlock);
+      }
+      
       setBlockPosition(offsetInBlock);
-
-      // send packet header to next datanode in pipeline
+      
+      // record the fact that the current write is still in progress
+      synchronized (currentWriteLock) {
+        currentWrite = true;
+      }
+      
+      //First write the packet to the mirror:
       if (mirrorOut != null) {
+        
+        // first enqueue the ack packet to avoid a race with the response coming
+        // from downstream datanode.
+        if (responder != null) {
+          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
+                                          lastPacketInBlock); 
+        }
+        
         try {
-          mirrorOut.writeInt(packetSize);
-          mirrorOut.writeLong(offsetInBlock);
-          mirrorOut.writeLong(seqno);
-          mirrorOut.writeBoolean(lastPacketInBlock);
+          
+          mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+          
         } catch (IOException e) {
           LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
-              + StringUtils.stringifyException(e));
+                   + StringUtils.stringifyException(e));
           mirrorOut = null;
 
           // If stream-copy fails, continue
@@ -2275,68 +2361,63 @@
           // recovery.
           //
           if (clientName.length() > 0) {
+            synchronized (currentWriteLock) {
+              currentWrite = false;
+              currentWriteLock.notifyAll();
+            }
             throw e;
           }
         }
-        // first enqueue the ack packet to avoid a race with the response coming
-        // from downstream datanode.
-        if (responder != null) {
-          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
-                                          lastPacketInBlock); 
-        }
       }
-
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
-      if (len == 0) {
-        LOG.info("Receiving empty packet for block " + block);
-        if (mirrorOut != null) {
-          mirrorOut.writeInt(len);
-          mirrorOut.flush();
+      
+      try {
+        buf.position(endOfHeader);        
+        int len = buf.getInt();
+   
+        if (len < 0) {
+          throw new IOException("Got wrong length during writeBlock(" + block
+              + ") from " + inAddr + " at offset " + offsetInBlock + ": " 
+              + len);
         }
-      }
-
-      while (len != 0) {
-        int checksumOff = 0;    
-        if (len > 0) {
-          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
+        
+        if (len == 0) {
+          LOG.debug("Receiving empty packet for block " + block);
+        } else {
+          offsetInBlock += len;
+          
+          int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                             checksumSize;
-          if (checksumBuf.length < checksumLen) {
-            checksumBuf = new byte[checksumLen];
+        
+          if ( buf.remaining() != (checksumLen + len)) {
+            throw new IOException("Data remaining in packet does not match " +
+                                   "sum of checksumLen and dataLen");
+          }
-          // read the checksum
-          in.readFully(checksumBuf, 0, checksumLen);
-        }
         
-        while (len != 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Receiving one chunk for block " + block +
-                      " of size " + len);
-          }
+          int checksumOff = buf.position();
+          int dataOff = checksumOff + checksumLen;
+          byte pktBuf[] = buf.array();
+
+          buf.position(buf.limit()); // move to the end of the data.
           
-          int toRecv = Math.min(len, bytesPerChecksum);
-          
-          receiveChunk(toRecv, checksumBuf, checksumOff);
-          
-          len -= toRecv;
-          checksumOff += checksumSize;       
-          curPacketSize += (toRecv + checksumSize);
-          if (curPacketSize > packetSize) {
-            throw new IOException("Packet size for block " + block +
-                                  " too long " + curPacketSize +
-                                  " was expecting " + packetSize);
-          } 
-        }
-        
-        if (curPacketSize == packetSize) {
-          if (mirrorOut != null) {
-            mirrorOut.flush();
+          verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+
+          try {
+            if (!finalized) {
+              //finally write to the disk :
+              out.write(pktBuf, dataOff, len);
+              checksumOut.write(pktBuf, checksumOff, checksumLen);
+              myMetrics.bytesWritten.inc(len);
+            }
+          } catch (IOException iex) {
+            checkDiskError(iex);
+            throw iex;
           }
-          break;
         }
-        len = in.readInt();
-        curPacketSize += 4;
+      } finally {
+        synchronized (currentWriteLock) {
+          currentWrite = false;
+          currentWriteLock.notifyAll();
+        }
       }
 
       // put in queue for pending acks
@@ -2344,6 +2425,12 @@
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                         lastPacketInBlock); 
       }
+      
+      if (throttler != null) { // throttle I/O
+        throttler.throttle(pktLen);
+      }
+      
+      return pktLen;
     }
 
     public void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -2377,13 +2464,9 @@
         }
 
         /* 
-         * Skim packet headers. A response is needed for every packet.
+         * Receive until packet length is zero.
          */
-        int len = in.readInt(); // get packet size
-        while (len != 0) {
-          receivePacket(len);
-          len = in.readInt(); // get packet size
-        }
+        while (receivePacket() > 0) {}
 
         // flush the mirror out
         if (mirrorOut != null) {
@@ -2448,8 +2531,7 @@
         }
         return;
       }
-      if (data.getChannelPosition(block, streams) + bufStream.count() == 
-                                                    offsetInBlock) {
+      if (data.getChannelPosition(block, streams) == offsetInBlock) {
         return;                   // nothing to do 
       }
       if (offsetInBlock % bytesPerChecksum != 0) {
