Index: src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketIOWithTimeout.java	(revision 650233)
+++ src/java/org/apache/hadoop/net/SocketIOWithTimeout.java	(working copy)
@@ -159,17 +159,7 @@
       } 
 
       if (count == 0) {
-        
-        String waitingFor = ""+ops;
-        if (ops == SelectionKey.OP_READ) {
-          waitingFor = "read";
-        } else if (ops == SelectionKey.OP_WRITE) {
-          waitingFor = "write";
-        }
-        
-        throw new SocketTimeoutException(timeout + " millis timeout while " +
-                                         "waiting for channel to be ready for "
-                                         + waitingFor + ". ch : " + channel);
+        throw new SocketTimeoutException(timeoutExceptionString(ops));
       }
       // otherwise the socket should be ready for io.
     }
@@ -178,6 +168,39 @@
   }
   
   /**
+   * This is similar to {@link #doIO(ByteBuffer, int)} except that it
+   * does not perform any I/O. It just waits for the channel to be ready
+   * for I/O as specified in ops.
+   * 
+   * @param ops Selection Ops used for waiting
+   * 
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  void waitForIO(int ops) throws IOException {
+    
+    if (selector.select(channel, ops, timeout) == 0) {
+      throw new SocketTimeoutException(timeoutExceptionString(ops)); 
+    }
+  }
+  
+  private String timeoutExceptionString(int ops) {
+    
+    String waitingFor = "" + ops;
+    if (ops == SelectionKey.OP_READ) {
+      waitingFor = "read";
+    } else if (ops == SelectionKey.OP_WRITE) {
+      waitingFor = "write";
+    }
+    
+    return timeout + " millis timeout while " +
+           "waiting for channel to be ready for " + 
+           waitingFor + ". ch : " + channel;    
+  }
+  
+  /**
    * This maintains a pool of selectors. These selectors are closed
    * once they are idle (unused) for a few seconds.
    */
Index: src/java/org/apache/hadoop/net/SocketInputStream.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketInputStream.java	(revision 650233)
+++ src/java/org/apache/hadoop/net/SocketInputStream.java	(working copy)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
@@ -148,4 +149,17 @@
   public int read(ByteBuffer dst) throws IOException {
     return reader.doIO(dst, SelectionKey.OP_READ);
   }
+  
+  /**
+   * waits for the underlying channel to be ready for reading.
+   * The timeout specified for this stream applies to this wait.
+   * 
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  public void waitForReadable() throws IOException {
+    reader.waitForIO(SelectionKey.OP_READ);
+  }
 }
Index: src/java/org/apache/hadoop/net/SocketOutputStream.java
===================================================================
--- src/java/org/apache/hadoop/net/SocketOutputStream.java	(revision 650233)
+++ src/java/org/apache/hadoop/net/SocketOutputStream.java	(working copy)
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.net;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SelectableChannel;
@@ -143,4 +145,75 @@
   public int write(ByteBuffer src) throws IOException {
     return writer.doIO(src, SelectionKey.OP_WRITE);
   }
+  
+  /**
+   * waits for the underlying channel to be ready for writing.
+   * The timeout specified for this stream applies to this wait.
+   *
+   * @throws SocketTimeoutException 
+   *         if select on the channel times out.
+   * @throws IOException
+   *         if any other I/O error occurs. 
+   */
+  public void waitForWritable() throws IOException {
+    writer.waitForIO(SelectionKey.OP_WRITE);
+  }
+  
+  /**
+   * Transfers data from FileChannel using 
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
+   * 
+   * Similar to readFully(), this waits till requested amount of 
+   * data is transfered.
+   * 
+   * @param fileCh FileChannel to transfer data from.
+   * @param position position within the channel where the transfer begins
+   * @param count number of bytes to transfer.
+   * 
+   * @throws EOFException 
+   *         If end of input file is reached before requested number of 
+   *         bytes are transfered.
+   *
+   * @throws SocketTimeoutException 
+   *         If this channel blocks transfer longer than timeout for 
+   *         this stream.
+   *          
+   * @throws IOException Includes any exception thrown by 
+   *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
+   */
+  public void transferToFully(FileChannel fileCh, long position, int count) 
+                              throws IOException {
+    
+    while (count > 0) {
+      /* 
+       * Ideally we should wait after transferTo returns 0. But because of
+       * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
+       * which throws an exception instead of returning 0, we wait for the
+       * channel to be writable before writing to it. If you ever see 
+       * IOException with message "Resource temporarily unavailable" 
+       * thrown here, please let us know.
+       * 
+       * Once we move to JAVA SE 7, wait should be moved to correct place.
+       */
+      waitForWritable();
+      int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
+      
+      if (nTransfered == 0) {
+        //check if end of file is reached.
+        if (position >= fileCh.size()) {
+          throw new EOFException("EOF Reached. file size is " + fileCh.size() + 
+                                 " and " + count + " more bytes left to be " +
+                                 "transfered.");
+        }
+        //otherwise assume the socket is full.
+        //waitForWritable(); // see comment above.
+      } else if (nTransfered < 0) {
+        throw new IOException("Unexpected return of " + nTransfered + 
+                              " from transferTo()");
+      } else {
+        position += nTransfered;
+        count -= nTransfered;
+      }
+    }
+  }  
 }
Index: src/java/org/apache/hadoop/dfs/DataBlockScanner.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(revision 650233)
+++ src/java/org/apache/hadoop/dfs/DataBlockScanner.java	(working copy)
@@ -382,7 +382,7 @@
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());
         
-        blockSender.sendBlock(out, throttler);
+        blockSender.sendBlock(out, null, throttler);
 
         LOG.info((second ? "Second " : "") +
                  "Verification succeeded for " + block);
Index: src/java/org/apache/hadoop/dfs/DataNode.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DataNode.java	(revision 650233)
+++ src/java/org/apache/hadoop/dfs/DataNode.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -39,6 +40,7 @@
 import java.io.*;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.*;
@@ -1022,8 +1024,9 @@
       long length = in.readLong();
 
       // send the block
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        NetUtils.getOutputStream(s, socketWriteTimeout), SMALL_BUFFER_SIZE));
+      OutputStream baseStream = NetUtils.getOutputStream(s,socketWriteTimeout);
+      DataOutputStream out = new DataOutputStream(
+                   new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
       
       BlockSender blockSender = null;
       try {
@@ -1036,7 +1039,7 @@
         }
 
         out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
-        long read = blockSender.sendBlock(out, null); // send data
+        long read = blockSender.sendBlock(out, baseStream, null); // send data
 
         if (blockSender.isBlockReadFully()) {
           // See if client verification succeeded. 
@@ -1306,9 +1309,10 @@
         targetSock.connect(targetAddr, socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
-        targetOut = new DataOutputStream(new BufferedOutputStream(
-                      NetUtils.getOutputStream(targetSock, socketWriteTimeout),
-                      SMALL_BUFFER_SIZE));
+        OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
+                                                            socketWriteTimeout);
+        targetOut = new DataOutputStream(
+                       new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // fist write header info
@@ -1318,7 +1322,8 @@
         Text.writeString( targetOut, source); // del hint
 
         // then send data
-        long read = blockSender.sendBlock(targetOut, balancingThrottler);
+        long read = blockSender.sendBlock(targetOut, baseStream, 
+                                          balancingThrottler);
 
         myMetrics.bytesRead.inc((int) read);
         myMetrics.blocksRead.inc();
@@ -1567,6 +1572,7 @@
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
     private InputStream blockIn; // data stream
+    private long blockInPosition = -1; // updated while using transferTo().
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
@@ -1581,7 +1587,6 @@
     private boolean blockReadFully; //set when the whole block is read
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
-    private OutputStream out;
     
     static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
                                         8 + /* offset in block */
@@ -1705,8 +1710,14 @@
 
     /**
      * Sends upto maxChunks chunks of data.
+     * 
+     * When blockInPosition is >= 0, assumes 'out' is a 
+     * {@link SocketOutputStream} and tries 
+     * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+     * send data (and updates blockInPosition).
      */
-    private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+    private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
+                           throws IOException {
       // Sends multiple chunks in one packet with a single write().
 
       int len = Math.min((int) (endOffset - offset),
@@ -1750,29 +1761,45 @@
       }
       
       int dataOff = checksumOff + checksumLen;
-      IOUtils.readFully(blockIn, buf, dataOff, len);
       
-      if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
+      if (blockInPosition >= 0) {
+        //use transferTo(). Checks on out and blockIn are already done. 
         
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            throw new ChecksumException("Checksum failed at " + 
-                                        (offset + len - dLeft), len);
+        SocketOutputStream sockOut = (SocketOutputStream)out;
+        //first write the packet
+        sockOut.write(buf, 0, dataOff);
+        // no need to flush. since we know out is not a buffered stream. 
+
+        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
+                                blockInPosition, len);
+        
+        blockInPosition += len;
+      } else {
+        //normal transfer
+        IOUtils.readFully(blockIn, buf, dataOff, len);
+
+        if (verifyChecksum) {
+          int dOff = dataOff;
+          int cOff = checksumOff;
+          int dLeft = len;
+
+          for (int i=0; i<numChunks; i++) {
+            checksum.reset();
+            int dLen = Math.min(dLeft, bytesPerChecksum);
+            checksum.update(buf, dOff, dLen);
+            if (!checksum.compare(buf, cOff)) {
+              throw new ChecksumException("Checksum failed at " + 
+                                          (offset + len - dLeft), len);
+            }
+            dLeft -= dLen;
+            dOff += dLen;
+            cOff += checksumSize;
           }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
         }
+
+        out.write(buf, 0, dataOff + len);
       }
 
-      out.write(buf, 0, dataOff + len);
-
       if (throttler != null) { // rebalancing so throttle
         throttler.throttle(packetLen);
       }
@@ -1785,32 +1812,62 @@
      * either a client or to another datanode. 
      * 
      * @param out  stream to which the block is written to
-     * returns total bytes reads, including crc.
+     * @param baseStream optional. if non-null, <code>out</code> is assumed to 
+     *        be a wrapper over this stream. This enables optimizations for
+     *        sending the data, e.g. 
+     *        {@link SocketOutputStream#transferToFully(FileChannel, 
+     *        long, int)}.
+     * @param throttler for sending data.
+     * @return total bytes reads, including crc.
      */
-    long sendBlock(DataOutputStream out, Throttler throttler)
-        throws IOException {
+    long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                   Throttler throttler) throws IOException {
       if( out == null ) {
         throw new IOException( "out stream is null" );
       }
-      this.out = out;
       this.throttler = throttler;
 
       long initialOffset = offset;
       long totalRead = 0;
+      OutputStream streamForSendChunks = out;
+      
       try {
         checksum.writeHeader(out);
         if ( chunkOffsetOK ) {
           out.writeLong( offset );
         }
-        //set up sendBuf:
+        out.flush();
+        
+        //set up packet buffer :
         int maxChunksPerPacket = Math.max(1,
                       (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-        ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN + 
-                      (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+        
+        int pktSize = PKT_HEADER_LEN;
+        
+        if (!verifyChecksum && 
+            baseStream instanceof SocketOutputStream && 
+            blockIn instanceof FileInputStream) {
+          
+          FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+          
+          // this also indicates sendChunks() uses transferTo.
+          blockInPosition = fileChannel.position();
+          streamForSendChunks = baseStream;
+          
+          /* allocate smaller buffer while using transferTo().
+           * Since we don't actually allocate buffer for data, we could 
+           * increase maxChunksPerPacket. 
+           */
+          pktSize += checksumSize * maxChunksPerPacket;
+        } else {
+          pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+        }
 
+        ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
         while (endOffset > offset) {
-          long len = sendChunks(pktBuf, maxChunksPerPacket);
+          long len = sendChunks(pktBuf, maxChunksPerPacket, 
+                                streamForSendChunks);
           offset += len;
           totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
                               checksumSize);
@@ -2606,8 +2663,9 @@
 
         long writeTimeout = socketWriteTimeout + 
                             WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
+        OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+        out = new DataOutputStream(new BufferedOutputStream(baseStream, 
+                                                            SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
@@ -2626,7 +2684,7 @@
           targets[i].write(out);
         }
         // send data & checksum
-        blockSender.sendBlock(out, null);
+        blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
         LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
