Index: src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketIOWithTimeout.java	(revision 644534)
+++ src/java/org/apache/hadoop/net/SocketIOWithTimeout.java	(working copy)
@@ -158,18 +158,8 @@
         throw e;
       } 
 
-      if (count == 0) {
-        
-        String waitingFor = ""+ops;
-        if (ops == SelectionKey.OP_READ) {
-          waitingFor = "read";
-        } else if (ops == SelectionKey.OP_WRITE) {
-          waitingFor = "write";
-        }
-        
-        throw new SocketTimeoutException(timeout + " millis timeout while " +
-                                         "waiting for channel to be ready for "
-                                         + waitingFor + ". ch : " + channel);
+      if (count == 0) {        
+        throw new SocketTimeoutException(timeoutExceptionString(ops));
       }
       // otherwise the socket should be ready for io.
     }
@@ -178,6 +168,37 @@
   }
   
   /**
+   * This is similar to {@link #doIO(ByteBuffer, int)} except that it
+   * does not perform any I/O, but just wait for the channel to be ready
+   * for I/O as specified in ops.
+   * 
+   * @param ops Selection Ops used for waiting
+   * 
+   * @throws IOException 
+   *         SocketTimeoutException if select on the channel timesout.
+   */
+  void waitForIO(int ops) throws IOException {
+    
+    if (selector.select(channel, ops, timeout) == 0) {
+      throw new SocketTimeoutException(timeoutExceptionString(ops)); 
+    }
+  }
+  
+  private String timeoutExceptionString(int ops) {
+    
+    String waitingFor = "" + ops;
+    if (ops == SelectionKey.OP_READ) {
+      waitingFor = "read";
+    } else if (ops == SelectionKey.OP_WRITE) {
+      waitingFor = "write";
+    }
+    
+    return timeout + " millis timeout while " +
+           "waiting for channel to be ready for " + 
+           waitingFor + ". ch : " + channel;    
+  }
+  
+  /**
    * This maintains a pool of selectors. These selectors are closed
    * once they are idle (unused) for a few seconds.
    */
Index: src/java/org/apache/hadoop/net/SocketInputStream.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketInputStream.java	(revision 644534)
+++ src/java/org/apache/hadoop/net/SocketInputStream.java	(working copy)
@@ -148,4 +148,15 @@
   public int read(ByteBuffer dst) throws IOException {
     return reader.doIO(dst, SelectionKey.OP_READ);
   }
+  
+  /**
+   * waits for the underlying channel to be ready for reading.
+   * The timeout specified for this stream applies to this wait.
+   * 
+   * @throws SocketTimeoutException if the timeout expires while waiting.<br>
+   *         IOException if there is some other error.
+   */
+  public void waitForReadable() throws IOException {
+    reader.waitForIO(SelectionKey.OP_READ);
+  }
 }
Index: src/java/org/apache/hadoop/net/SocketOutputStream.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketOutputStream.java	(revision 644534)
+++ src/java/org/apache/hadoop/net/SocketOutputStream.java	(working copy)
@@ -143,4 +143,15 @@
   public int write(ByteBuffer src) throws IOException {
     return writer.doIO(src, SelectionKey.OP_WRITE);
   }
+  
+  /**
+   * waits for the underlying channel to be ready for writing.
+   * The timeout specified for this stream applies to this wait.
+   * 
+   * @throws SocketTimeoutException if the timeout expires while waiting.<br>
+   *         IOException if there is some other error.
+   */
+  public void waitForWritable() throws IOException {
+    writer.waitForIO(SelectionKey.OP_WRITE);
+  }
 }
Index: src/java/org/apache/hadoop/dfs/DataBlockScanner.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(revision 644534)
+++ src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(working copy)
@@ -382,7 +382,7 @@
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());
         
-        blockSender.sendBlock(out, throttler);
+        blockSender.sendBlock(out, null, throttler);
 
         LOG.info((second ? "Second " : "") +
                  "Verification succeeded for " + block);
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 644534)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -41,8 +41,10 @@
 import java.io.*;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
@@ -1013,9 +1015,9 @@
       long length = in.readLong();
 
       // send the block
+      OutputStream baseStream = new SocketOutputStream(s, WRITE_TIMEOUT);
       DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT),
-                                     SMALL_BUFFER_SIZE));
+            new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
       BlockSender blockSender = null;
       try {
         try {
@@ -1027,7 +1029,7 @@
         }
 
         out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
-        long read = blockSender.sendBlock(out, null); // send data
+        long read = blockSender.sendBlock(out, baseStream, null); // send data
 
         if (blockSender.isBlockReadFully()) {
           // See if client verification succeeded. 
@@ -1296,9 +1298,11 @@
         targetSock.connect(targetAddr, socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
+        OutputStream baseStream = new SocketOutputStream(targetSock, 
+                                                         WRITE_TIMEOUT);
         targetOut = new DataOutputStream(new BufferedOutputStream(
-                        new SocketOutputStream(targetSock, WRITE_TIMEOUT),
-                        SMALL_BUFFER_SIZE));
+                                                       baseStream,
+                                                       SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // fist write header info
@@ -1308,7 +1312,8 @@
         Text.writeString( targetOut, source); // del hint
 
         // then send data
-        long read = blockSender.sendBlock(targetOut, balancingThrottler);
+        long read = blockSender.sendBlock(targetOut, baseStream, 
+                                          balancingThrottler);
 
         myMetrics.bytesRead.inc((int) read);
         myMetrics.blocksRead.inc();
@@ -1474,6 +1479,63 @@
     }
   }
 
+  /**
+   * Transfers data from FileChannel associcated with FileInputStream
+   * to channel associated with SocketOutputStream using 
+   * transferTo() interface.
+   * 
+   * Similar to readFully(), this waits till requested amount of 
+   * data is transfered.
+   * 
+   * The fileSize could be set to -1.
+   */
+  private static void transferToFully(FileChannel inChannel, long startPos,
+                                      SocketOutputStream out,
+                                      int len) throws IOException {
+    
+    WritableByteChannel outChannel = out.getChannel();
+    
+    while (len > 0) {
+      int nTransfered = 0;
+      try {
+        nTransfered = (int) inChannel.transferTo(startPos, len, outChannel);
+      } catch (IOException e) {
+        /* at least jdk1.6.0 on Linux seems to throw IOException
+         * when the socket is full. Hopefully near future verisions will 
+         * handle EAGAIN better. For now look for a specific string in for 
+         * the message for the exception.
+         */
+        if (e.getMessage().startsWith("Resource temporarily unavailable")) {
+          out.waitForWritable();
+          continue;
+        } else {
+          throw e;
+        }
+      }
+      
+      if (nTransfered == 0) {
+        //check if end of file is reached.
+        if (startPos >= inChannel.size()) {
+          throw new EOFException("EOF Reached. file size is " + 
+                                 inChannel.size() + " and " + len + 
+                                 " more bytes left to be transfered.");
+        }
+        //otherwise assume the socket is full.
+        out.waitForWritable();
+      } else if (nTransfered < 0) {
+        throw new IOException("Unexpected return of " + nTransfered + 
+                              " from transferTo()");
+      } else {
+        startPos += nTransfered;
+        len -= nTransfered;
+      }
+      
+      LOG.info("XXX: Transfered " + nTransfered + " and " + len + 
+               " bytes left.");      
+    }
+  }
+  
+  
   /* ********************************************************************
   Protocol when a client reads data from Datanode (Cur Ver: 9):
   
@@ -1572,6 +1634,7 @@
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
     private OutputStream out;
+    long blockInPosition = -1;     // set when transferTo() is used
     
     static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
                                         8 + /* offset in block */
@@ -1740,29 +1803,53 @@
       }
       
       int dataOff = checksumOff + checksumLen;
-      IOUtils.readFully(blockIn, buf, dataOff, len);
       
-      if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
+      if (!verifyChecksum && 
+          out instanceof SocketOutputStream && 
+          blockIn instanceof FileInputStream) {
+        //use transferTo()
+      
+        //first write the packet
+        out.write(buf, 0, dataOff);
+        // no need to flush.        
+
+        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
         
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            throw new ChecksumException("Checksum failed at " + 
-                                        (offset + len - dLeft), len);
+        if (blockInPosition < 0) {
+          blockInPosition = fileChannel.position();
+        }
+        
+        transferToFully(fileChannel, blockInPosition, 
+                        (SocketOutputStream)out, len);
+        
+        blockInPosition += len;
+        
+      } else {
+        //normal transfer
+        IOUtils.readFully(blockIn, buf, dataOff, len);
+
+        if (verifyChecksum) {
+          int dOff = dataOff;
+          int cOff = checksumOff;
+          int dLeft = len;
+
+          for (int i=0; i<numChunks; i++) {
+            checksum.reset();
+            int dLen = Math.min(dLeft, bytesPerChecksum);
+            checksum.update(buf, dOff, dLen);
+            if (!checksum.compare(buf, cOff)) {
+              throw new ChecksumException("Checksum failed at " + 
+                                          (offset + len - dLeft), len);
+            }
+            dLeft -= dLen;
+            dOff += dLen;
+            cOff += checksumSize;
           }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
         }
+
+        out.write(buf, 0, dataOff + len);
       }
-
-      out.write(buf, 0, dataOff + len);
-
+      
       if (throttler != null) { // rebalancing so throttle
         throttler.throttle(packetLen);
       }
@@ -1775,14 +1862,17 @@
      * either a client or to another datanode. 
      * 
      * @param out  stream to which the block is written to
-     * returns total bytes reads, including crc.
+     * @param baseStream if non-null, <code>out</code> is assumed to be a 
+     *        wrapper over this stream.
+     * @param throttler for sending data.
+     * @return total bytes reads, including crc.
      */
-    long sendBlock(DataOutputStream out, Throttler throttler)
-        throws IOException {
+    long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                   Throttler throttler) throws IOException {
       if( out == null ) {
         throw new IOException( "out stream is null" );
       }
-      this.out = out;
+      this.out = (baseStream == null) ? out : baseStream;
       this.throttler = throttler;
 
       long initialOffset = offset;
@@ -1798,7 +1888,7 @@
         ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN + 
                       (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
 
-
+        out.flush();
         while (endOffset > offset) {
           long len = sendChunks(pktBuf, maxChunksPerPacket);
           offset += len;
@@ -2596,8 +2686,9 @@
 
         long writeTimeout = WRITE_TIMEOUT + 
                             WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        out = new DataOutputStream(new BufferedOutputStream(
-               new SocketOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
+        OutputStream baseStream = new SocketOutputStream(sock, writeTimeout);
+        out = new DataOutputStream(new BufferedOutputStream(baseStream, 
+                                                            SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
@@ -2616,7 +2707,7 @@
           targets[i].write(out);
         }
         // send data & checksum
-        blockSender.sendBlock(out, null);
+        blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
         LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
