Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 639496)
+++ conf/hadoop-default.xml	(working copy)
@@ -1010,12 +1010,6 @@
 <!-- ipc properties -->
 
 <property>
-  <name>ipc.client.timeout</name>
-  <value>60000</value>
-  <description>Defines the timeout for IPC calls in milliseconds.</description>
-</property>
-
-<property>
   <name>ipc.client.idlethreshold</name>
   <value>4000</value>
   <description>Defines the threshold number of connections after which
@@ -1026,7 +1020,7 @@
 <property>
   <name>ipc.client.maxidletime</name>
   <value>120000</value>
-  <description>Defines the maximum idle time for a connected client after 
+  <description>Defines the maximum idle time in msec for a connected client after 
                which it may be disconnected.
   </description>
 </property>
@@ -1040,8 +1034,8 @@
 
 <property>
   <name>ipc.client.connection.maxidletime</name>
-  <value>1000</value>
-  <description>The maximum time after which a client will bring down the
+  <value>10000</value>
+  <description>The maximum time in msec after which a client will bring down the
                connection to the server.
   </description>
 </property>
Index: src/test/org/apache/hadoop/ipc/TestSocketFactory.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestSocketFactory.java	(revision 639496)
+++ src/test/org/apache/hadoop/ipc/TestSocketFactory.java	(working copy)
@@ -85,7 +85,7 @@
       assertTrue(dfs.exists(filePath));
 
       // This will test TPC to a JobTracker
-      MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
+      MiniMRCluster mr = new MiniMRCluster(1, directDfs.getUri().toString(), 1);
       final int jobTrackerPort = mr.getJobTrackerPort();
 
       JobConf jconf = new JobConf(cconf);
Index: src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java	(revision 639496)
+++ src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java	(working copy)
@@ -64,7 +64,6 @@
     public TestServer(final int handlerCount, final boolean sleep) 
                                               throws IOException {
       super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       // Set the buffer size to half of the maximum parameter/result size 
       // to force the socket to block
       this.setSocketSendBufSize(BYTE_COUNT / 2);
@@ -95,7 +94,6 @@
       this.client = client;
       this.address = address;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     @Override
Index: src/test/org/apache/hadoop/ipc/TestIPC.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestIPC.java	(revision 639496)
+++ src/test/org/apache/hadoop/ipc/TestIPC.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
 import java.util.Random;
@@ -36,9 +37,13 @@
 public class TestIPC extends TestCase {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.TestIPC");
-
-  private static Configuration conf = new Configuration();
   
+  final private static Configuration conf = new Configuration();
+  final static private int PING_INTERVAL = 1000;
+  
+  static {
+    Client.setPingInterval(conf, PING_INTERVAL);
+  }
   public TestIPC(String name) { super(name); }
 
   private static final Random RANDOM = new Random();
@@ -51,14 +56,13 @@
     public TestServer(int handlerCount, boolean sleep) 
       throws IOException {
       super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       this.sleep = sleep;
     }
 
     public Writable call(Writable param, long receivedTime) throws IOException {
       if (sleep) {
         try {
-          Thread.sleep(RANDOM.nextInt(200));      // sleep a bit
+          Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit
         } catch (InterruptedException e) {}
       }
       return param;                               // echo param as result
@@ -75,7 +79,6 @@
       this.client = client;
       this.server = server;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -90,7 +93,7 @@
             break;
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
@@ -108,7 +111,6 @@
       this.client = client;
       this.addresses = addresses;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -126,7 +128,7 @@
             }
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
Index: src/java/org/apache/hadoop/mapred/JobClient.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JobClient.java	(revision 639496)
+++ src/java/org/apache/hadoop/mapred/JobClient.java	(working copy)
@@ -318,7 +318,6 @@
   }
 
   JobSubmissionProtocol jobSubmitClient;
-  private JobSubmissionProtocol rpcProxy;
   
   FileSystem fs = null;
 
@@ -352,8 +351,7 @@
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
-      this.jobSubmitClient = createRetryProxy(this.rpcProxy);
+      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
     }        
   }
 
@@ -363,27 +361,6 @@
         JobSubmissionProtocol.versionID, addr, conf,
         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
   }
-  /**
-   * Create a proxy JobSubmissionProtocol that retries timeouts.
-   * 
-   * @param addr the address to connect to.
-   * @param conf the server's configuration.
-   * @return a proxy object that will retry timeouts.
-   * @throws IOException
-   */
-  private JobSubmissionProtocol createRetryProxy(JobSubmissionProtocol raw
-                                            ) throws IOException {
-    RetryPolicy backoffPolicy =
-      RetryPolicies.retryUpToMaximumCountWithProportionalSleep
-      (5, 10, java.util.concurrent.TimeUnit.SECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> handlers = 
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    handlers.put(SocketTimeoutException.class, backoffPolicy);
-    RetryPolicy backoffTimeOuts = 
-      RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
-    return (JobSubmissionProtocol)
-      RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
-  }
 
   /**
    * Build a job client, connect to the indicated job tracker.
@@ -393,15 +370,16 @@
    */
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
-    rpcProxy =  createRPCProxy(jobTrackAddr, conf);
-    jobSubmitClient = createRetryProxy(rpcProxy);
+    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    RPC.stopProxy(rpcProxy);
+    if (!(jobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(jobSubmitClient);
+    }
   }
 
   /**
Index: src/java/org/apache/hadoop/ipc/Server.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Server.java	(revision 639496)
+++ src/java/org/apache/hadoop/ipc/Server.java	(working copy)
@@ -76,17 +76,10 @@
   public static final byte CURRENT_VERSION = 1;
   
   /**
-   * How much time should be allocated for actually running the handler?
-   * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
-   * are ignored when the handler takes them off the queue.
-   */
-  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
-  
-  /**
    * How many calls/handler are allowed in the queue.
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
-  
+
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Server");
 
@@ -135,7 +128,7 @@
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private Class paramClass;                       // class of call parameters
+  private Class<?> paramClass;                    // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -150,8 +143,6 @@
   
   private Configuration conf;
 
-  private int timeout;
-  private long maxCallStartAge;
   private int maxQueueSize;
   private int socketSendBufferSize;
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@@ -169,7 +160,7 @@
   private Handler[] handlers = null;
 
   /**
-   * A convience method to bind to a given address and report 
+   * A convenience method to bind to a given address and report 
    * better exceptions if the address is not a valid host.
    * @param socket the socket to bind
    * @param address the address to bind to
@@ -308,10 +299,9 @@
         SelectionKey key = null;
         try {
           selector.select();
-          Iterator iter = selector.selectedKeys().iterator();
-          
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
           while (iter.hasNext()) {
-            key = (SelectionKey)iter.next();
+            key = iter.next();
             iter.remove();
             try {
               if (key.isValid()) {
@@ -455,12 +445,11 @@
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
-      long lastPurgeTime = 0;   // last check for old calls.
 
       while (running) {
         try {
           waitPending();     // If a channel is being registered, wait.
-          writeSelector.select(maxCallStartAge);
+          writeSelector.select();
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           while (iter.hasNext()) {
             SelectionKey key = iter.next();
@@ -473,25 +462,6 @@
               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
             }
           }
-          long now = System.currentTimeMillis();
-          if (now < lastPurgeTime + maxCallStartAge) {
-            continue;
-          }
-          lastPurgeTime = now;
-          //
-          // If there were some calls that have not been sent out for a
-          // long time, discard them.
-          //
-          LOG.debug("Checking for old call responses.");
-          iter = writeSelector.keys().iterator();
-          while (iter.hasNext()) {
-            SelectionKey key = iter.next();
-            try {
-              doPurge(key, now);
-            } catch (IOException e) {
-              LOG.warn("Error in purging old calls " + e);
-            }
-          }
         } catch (OutOfMemoryError e) {
           //
           // we can run out of memory if we have too many threads
@@ -533,49 +503,6 @@
       }
     }
 
-    //
-    // Remove calls that have been pending in the responseQueue 
-    // for a long time.
-    //
-    private void doPurge(SelectionKey key, long now) throws IOException {
-      Call call = (Call)key.attachment();
-      if (call == null) {
-        return;
-      }
-      if (key.channel() != call.connection.channel) {
-        LOG.info("doPurge: bad channel");
-        return;
-      }
-      boolean close = false;
-      LinkedList<Call> responseQueue = call.connection.responseQueue;
-      synchronized (responseQueue) {
-        Iterator<Call> iter = responseQueue.listIterator(0);
-        while (iter.hasNext()) {
-          call = iter.next();
-          if (now > call.receivedTime + maxCallStartAge) {
-            LOG.info(getName() + ", call " + call +
-                     ": response discarded for being too old (" +
-                     (now - call.receivedTime) + ")");
-            iter.remove();
-            if (call.response.position() > 0) {
-              /* We should probably use a different start time 
-               * than receivedTime. receivedTime starts when the RPC
-               * was first read.
-               * We have written a partial response. will close the
-               * connection for now.
-               */
-              close = true;
-              break;
-            }            
-          }
-        }
-      }
-      
-      if (close) {
-        closeConnection(call.connection);
-      }
-    }
-
     // Processes one response. Returns true if there are no more pending
     // data for this channel.
     //
@@ -793,6 +720,10 @@
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
+       
+          if (dataLength == Client.PING_CALL_ID) {
+            return 0;  //ping message
+          }
           data = ByteBuffer.allocate(dataLength);
         }
         
@@ -870,18 +801,6 @@
         try {
           Call call = callQueue.take(); // pop the queue; maybe blocked here
 
-          // throw the message away if it is too old
-          if (System.currentTimeMillis() - call.receivedTime > 
-              maxCallStartAge) {
-            ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
-            int timeInQ = (int) (System.currentTimeMillis() - call.receivedTime);
-            LOG.warn(getName()+", call "+call
-                     +": discarded for being too old (" +
-                     timeInQ + ")");
-            rpcMetrics.rpcDiscardedOps.inc(timeInQ);
-            continue;
-          }
-          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                       call.connection);
@@ -941,7 +860,7 @@
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf,
+  protected Server(String bindAddress, int port, Class<?> paramClass, int handlerCount, Configuration conf,
                   String serverName) 
     throws IOException {
     this.bindAddress = bindAddress;
@@ -949,10 +868,8 @@
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
     this.socketSendBufferSize = 0;
-    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
-    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -980,9 +897,6 @@
     }
   }
   
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Sets the socket buffer size used for responding to RPCs */
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
Index: src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java
===================================================================
--- src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java	(revision 639496)
+++ src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java	(working copy)
@@ -86,20 +86,7 @@
    */
   long getRpcOpsAvgQueueTimeMax();
   
-  
   /**
-   * Number of Discarded RPC operations due to timeout in the last interval
-   * @return number of operations
-   */
-  int getRpcOpsDiscardedOpsNum();
-  
-  /**
-   * Average Queued time for Discarded RPC Operations in last interval
-   * @return time in msec
-   */
-  long getRpcOpsDiscardedOpsQtime();
-  
-  /**
    * Reset all min max times
    */
   void resetAllMinMax();
Index: src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
===================================================================
--- src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java	(revision 639496)
+++ src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java	(working copy)
@@ -71,7 +71,6 @@
   
   public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
   public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
-  public MetricsTimeVaryingRate rpcDiscardedOps = new MetricsTimeVaryingRate("RpcDiscardedOps");
 
   public Map <String, MetricsTimeVaryingRate> metricsList = Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
 
@@ -83,7 +82,6 @@
   public void doUpdates(MetricsContext context) {
     rpcQueueTime.pushMetric(metricsRecord);
     rpcProcessingTime.pushMetric(metricsRecord);
-    rpcDiscardedOps.pushMetric(metricsRecord);
 
     synchronized (metricsList) {
 	// Iterate through the rpcMetrics hashmap to propogate the different rpc metrics.
Index: src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java
===================================================================
--- src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java	(revision 639496)
+++ src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java	(working copy)
@@ -98,20 +98,6 @@
   /**
    * @inheritDoc
    */
-  public int getRpcOpsDiscardedOpsNum() {
-    return myMetrics.rpcDiscardedOps.getPreviousIntervalNumOps();
-  }
-
-  /**
-   * @inheritDoc
-   */
-  public long getRpcOpsDiscardedOpsQtime() {
-    return myMetrics.rpcDiscardedOps.getPreviousIntervalAverageTime();
-  }
-  
-  /**
-   * @inheritDoc
-   */
   public int getNumOpenConnections() {
     return myServer.getNumOpenConnections();
   }
@@ -129,6 +115,5 @@
   public void resetAllMinMax() {
     myMetrics.rpcProcessingTime.resetMinMax();
     myMetrics.rpcQueueTime.resetMinMax();
-    myMetrics.rpcDiscardedOps.resetMinMax();
   }
 }
Index: src/java/org/apache/hadoop/ipc/Client.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Client.java	(revision 639496)
+++ src/java/org/apache/hadoop/ipc/Client.java	(working copy)
@@ -24,16 +24,14 @@
 import java.net.UnknownHostException;
 
 import java.io.IOException;
-import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
+import java.io.InputStream;
 
 import java.util.Hashtable;
-import java.util.Iterator;
 
 import javax.net.SocketFactory;
 
@@ -41,6 +39,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.FSConstants;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -63,28 +62,65 @@
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
-  private Class valueClass;                       // class of call values
-  private int timeout;// timeout for calls
+  private Class<?> valueClass;                       // class of call values
   private int counter;                            // counter for call ids
   private boolean running = true;                 // true while client runs
-  private Configuration conf;
-  private int maxIdleTime; //connections will be culled if it was idle for 
+  final private Configuration conf;
+  final private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
-  private int maxRetries; //the max. no. of retries for socket connections
+  final private int maxRetries; //the max. no. of retries for socket connections
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
-  private Thread connectionCullerThread;
+  private int pingInterval; // how often sends ping to the server in msecs
+
   private SocketFactory socketFactory;           // how to create sockets
-  
   private int refCount = 1;
   
+  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+  final public static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+  final static int PING_CALL_ID = -1;
+  
+  /**
+   * set the ping interval value in configuration
+   * 
+   * @param conf Configuration
+   * @param pingInterval the ping interval
+   */
+  final public static void setPingInterval(Configuration conf, int pingInterval) {
+    conf.setInt(PING_INTERVAL_NAME, pingInterval);
+  }
+
+  /**
+   * Get the ping interval from configuration;
+   * If not set in the configuration, return the default value.
+   * 
+   * @param conf Configuration
+   * @return the ping interval
+   */
+  final static int getPingInterval(Configuration conf) {
+    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+  }
+  
+  /**
+   * Increment this client's reference count
+   *
+   */
   synchronized void incCount() {
-	  refCount++;
+    refCount++;
   }
   
+  /**
+   * Decrement this client's reference count
+   *
+   */
   synchronized void decCount() {
     refCount--;
   }
   
+  /**
+   * Return if this client has no reference
+   * 
+   * @return true if this client has no reference; false otherwise
+   */
   synchronized boolean isZeroReference() {
     return refCount==0;
   }
@@ -94,9 +130,7 @@
     int id;                                       // call id
     Writable param;                               // parameter
     Writable value;                               // value, null if error
-    String error;                                 // exception, null if value
-    String errorClass;                            // class of exception
-    long lastActivity;                            // time of last i/o
+    IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
 
     protected Call(Writable param) {
@@ -104,30 +138,34 @@
       synchronized (Client.this) {
         this.id = counter++;
       }
-      touch();
     }
 
-    /** Called by the connection thread when the call is complete and the
-     * value or error string are available.  Notifies by default.  */
-    public synchronized void callComplete() {
+    /** Indicate when the call is complete and the
+     * value or error are available.  Notifies by default.  */
+    protected synchronized void callComplete() {
+      this.done = true;
       notify();                                 // notify caller
     }
 
-    /** Update lastActivity with the current time. */
-    public synchronized void touch() {
-      lastActivity = System.currentTimeMillis();
+    /** Set the exception when there is an error.
+     * Notify the caller the call is done.
+     * 
+     * @param error exception thrown by the call; either local or remote
+     */
+    public synchronized void setException(IOException error) {
+      this.error = error;
+      callComplete();
     }
-
-    /** Update lastActivity with the current time. */
-    public synchronized void setResult(Writable value, 
-                                       String errorClass,
-                                       String error) {
+    
+    /** Set the return value when there is no error. 
+     * Notify the caller the call is done.
+     * 
+     * @param value return value of the call.
+     */
+    public synchronized void setValue(Writable value) {
       this.value = value;
-      this.error = error;
-      this.errorClass =errorClass;
-      this.done = true;
+      callComplete();
     }
-    
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -140,11 +178,8 @@
     private DataOutputStream out;
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
-    private Call readingCall;
-    private Call writingCall;
-    private int inUse = 0;
-    private long lastActivity = 0;
-    private boolean shouldCloseConnection = false;
+    private long lastActivity = 0;     // last I/O activity time
+    private boolean isClosed = false;  // indicate if the connection is closed
 
     public Connection(InetSocketAddress address) throws IOException {
       this(new ConnectionId(address, null));
@@ -156,34 +191,89 @@
                                        remoteId.getAddress().getHostName());
       }
       this.remoteId = remoteId;
-      this.setName("IPC Client connection to " + 
-                   remoteId.getAddress().toString());
+      UserGroupInformation ticket = remoteId.getTicket();
+      this.setName("IPC Client connection to " +
+          remoteId.getAddress().toString() +
+          " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
       this.setDaemon(true);
     }
 
-    public synchronized void setupIOstreams() throws IOException {
-      if (socket != null) {
-        notify();
+    /** Update lastActivity with the current time. */
+    private synchronized void touch() {
+      lastActivity = System.currentTimeMillis();
+    }
+
+    /** Add a call to this connection's call queue */
+    private synchronized boolean addCall(Call call) {
+      if (isClosed)
+        return false;
+      calls.put(call.id, call);
+      notify();
+      return true;
+    }
+    
+    /** This class sends a ping to the remote side when timeout on
+     * reading. If no failure is detected, it retries until at least
+     * a byte is read.
+     */
+    private class PingInputStream extends FilterInputStream {
+      /* constructor */
+      protected PingInputStream(InputStream in) {
+        super(in);
+      }
+
+      /** Read a byte from the stream.
+       * Send a ping if timeout on read. Retries if no failure is detected
+       * until a byte is read.
+       */
+      public int read() throws IOException {
+        do {
+          try {
+            return super.read();
+          } catch (SocketTimeoutException e) {
+            sendPing();
+          }
+        } while (true);
+      }
+
+      /** Read bytes into a buffer starting from offset <code>off</code>
+       * Send a ping if timeout on read. Retries if no failure is detected
+       * until a byte is read.
+       * 
+       * @Return the total number of bytes read; -1 if the connection is closed.
+       */
+      public int read(byte[] buf, int off, int len) throws IOException {
+        do {
+          try {
+            return super.read(buf, off, len);
+          } catch (SocketTimeoutException e) {
+            sendPing();
+          }
+        } while (true);
+      }
+    }
+    
+    /** Connect to the server and set up the I/O streams. It then sends
+     * a header to the server and starts
+     * the connection thread that waits for responses.
+     * 
+     * @throws IOException when any IOException during streams setup.
+     */
+    private synchronized void setupIOstreams() throws IOException {
+      if (socket != null || isClosed) {
         return;
       }
       short failures = 0;
+      try {
       while (true) {
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
           this.socket.connect(remoteId.getAddress());
+          this.socket.setSoTimeout(pingInterval);
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
           if (failures == maxRetries) {
-            //reset inUse so that the culler gets a chance to throw this
-            //connection object out of the table. We don't want to increment
-            //inUse to infinity (everytime getConnection is called inUse is
-            //incremented)!
-            inUse = 0;
-            // set socket to null so that the next call to setupIOstreams
-            // can start the process of connect all over again.
-            socket.close();
-            socket = null;
             throw ie;
           }
           failures++;
@@ -195,30 +285,22 @@
           }
         }
       }
-      socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
-         (new FilterInputStream(NetUtils.getInputStream(socket)) {
-             public int read(byte[] buf, int off, int len) throws IOException {
-               int value = super.read(buf, off, len);
-               if (readingCall != null) {
-                 readingCall.touch();
-               }
-               return value;
-             }
-           }));
+         (new PingInputStream(NetUtils.getInputStream(socket))));
       this.out = new DataOutputStream
-        (new BufferedOutputStream
-         (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
-             public void write(byte[] buf, int o, int len) throws IOException {
-               out.write(buf, o, len);
-               if (writingCall != null) {
-                 writingCall.touch();
-               }
-             }
-           }));
+        (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
       writeHeader();
-      notify();
+      
+      // update last activity time
+      touch();
+      
+      // start the receiver thread after the socket connection has been set up
+      start();
+      } catch (IOException e) {
+        close(e);
+        throw e;
+      }
     }
 
     private synchronized void writeHeader() throws IOException {
@@ -233,156 +315,191 @@
       out.write(buf.getData(), 0, bufLen);
     }
     
+    /* wait till someone signals us to start reading RPC response or
+     * it is idle long enough so this connection is to be closed, 
+     * or the client is marked as not running.
+     * 
+     * Return true if it is time to read a response; false otherwise.
+     */
     private synchronized boolean waitForWork() {
-      //wait till someone signals us to start reading RPC response or
-      //close the connection. If we are idle long enough (blocked in wait),
-      //the ConnectionCuller thread will wake us up and ask us to close the
-      //connection. 
-      //We need to wait when inUse is 0 or socket is null (it may be null if
-      //the Connection object has been created but the socket connection
-      //has not been setup yet). We stop waiting if we have been asked to close
-      //connection
-      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
+      while (calls.size() == 0 && !isClosed  && running)  {
+        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+        if (timeout<=0) {
+          break;
+        }
         try {
-          wait();
+          wait(timeout);
         } catch (InterruptedException e) {}
       }
-      return !shouldCloseConnection;
-    }
-
-    private synchronized void incrementRef() {
-      inUse++;
-    }
-
-    private synchronized void decrementRef() {
-      lastActivity = System.currentTimeMillis();
-      inUse--;
-    }
-
-    public synchronized boolean isIdle() {
-      //check whether the connection is in use or just created
-      if (inUse != 0) return false;
-      long currTime = System.currentTimeMillis();
-      if (currTime - lastActivity > maxIdleTime)
+      if (!calls.isEmpty() && !isClosed && running) {
         return true;
-      return false;
+      } else if (!running && !calls.isEmpty() ) {
+        close((IOException)new IOException().initCause(
+            new InterruptedException()));
+        return false;
+      } else {
+        close (null);
+        return false;
+      }
     }
 
     public InetSocketAddress getRemoteAddress() {
       return remoteId.getAddress();
     }
 
-    public void setCloseConnection() {
-      shouldCloseConnection = true;
+    /* Send a ping to the server if the time elapsed 
+     * since last I/O activity is equal to or greater than the ping interval
+     */
+    private synchronized void sendPing() throws IOException {
+      if (System.currentTimeMillis() - lastActivity >= pingInterval) {
+        touch();
+        out.writeInt(PING_CALL_ID);
+        out.flush();
+      }
     }
 
     public void run() {
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": starting");
+        LOG.debug(getName() + ": starting, having connections " 
+            + connections.size());
+      
+      while (waitForWork()) {//wait here for work - read or close connection
+        receiveResponse();
+      }
+      
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": stopped, remaining connections "
+            + connections.size());
+    }
+
+    /** Initiates a call by sending the parameter to the remote server.
+     * Note: this is not called from the Connection thread, but by other
+     * threads.
+     */
+    public void sendParam(Call call) {
+      synchronized (this) {
+        if (isClosed) {
+          return;
+        }
+      }
+
       try {
-        while (running) {
-          int id;
-          //wait here for work - read connection or close connection
-          if (waitForWork() == false)
-            break;
-          try {
-            id = in.readInt();                    // try to read an id
-          } catch (SocketTimeoutException e) {
-            continue;
-          }
+        synchronized (out) {
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + " sending #" + call.id);
 
+          DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+          //data to be written
+          d.writeInt(call.id);
+          call.param.write(d);
+          byte[] data = d.getData();
+          int dataLength = d.getLength();
+
+          out.writeInt(dataLength);      //first put the data length
+          out.write(data, 0, dataLength);//write the data
+          out.flush();
+        }
+      } catch(IOException e) {
+        close(e);
+      }
+    }  
+
+    private void receiveResponse() {
+      synchronized (this) {
+        if (isClosed) {
+          return;
+        }
+      }
+      touch();
+      
+      try {
+        synchronized (in) {
+          int id = in.readInt();                    // try to read an id
+
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " got value #" + id);
 
           Call call = calls.remove(id);
+
           boolean isError = in.readBoolean();     // read if error
           if (isError) {
-            call.setResult(null, WritableUtils.readString(in),
-                           WritableUtils.readString(in));
+            call.setException(new RemoteException( WritableUtils.readString(in),
+                WritableUtils.readString(in)));
           } else {
             Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
-            try {
-              readingCall = call;
-              value.readFields(in);                 // read value
-            } finally {
-              readingCall = null;
-            }
-            call.setResult(value, null, null);
+            value.readFields(in);                 // read value
+            call.setValue(value);
           }
-          call.callComplete();                   // deliver result to caller
-          //received the response. So decrement the ref count
-          decrementRef();
         }
-      } catch (EOFException eof) {
-        // This is what happens when the remote side goes down
-      } catch (Exception e) {
-        LOG.info(StringUtils.stringifyException(e));
-      } finally {
-        //If there was no exception thrown in this method, then the only
-        //way we reached here is by breaking out of the while loop (after
-        //waitForWork). And if we took that route to reach here, we have 
-        //already removed the connection object in the ConnectionCuller thread.
-        //We don't want to remove this again as some other thread might have
-        //actually put a new Connection object in the table in the meantime.
-        synchronized (connections) {
-          if (connections.get(remoteId) == this) {
-            connections.remove(remoteId);
-          }
-        }
-        close();
+      } catch (IOException e) {
+        close(e);
       }
     }
+    
+    /** Close the connection. */
+    private synchronized void close(IOException cause) {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true; // mark that this connection is closed
+      notifyAll(); // wake up the connection receiver thread
 
-    /** Initiates a call by sending the parameter to the remote server.
-     * Note: this is not called from the Connection thread, but by other
-     * threads.
-     */
-    public void sendParam(Call call) throws IOException {
-      boolean error = true;
-      try {
-        calls.put(call.id, call);
+      synchronized (in) {
         synchronized (out) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " sending #" + call.id);
-          try {
-            writingCall = call;
-            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
-                                                         //data to be written
-            d.writeInt(call.id);
-            call.param.write(d);
-            byte[] data = d.getData();
-            int dataLength = d.getLength();
+          // release the resources
+          close();
 
-            out.writeInt(dataLength);      //first put the data length
-            out.write(data, 0, dataLength);//write the data
-            out.flush();
-          } finally {
-            writingCall = null;
+          // clean up all calls
+          if (cause == null) {
+            if (!calls.isEmpty()) {
+              LOG.warn(
+              "A connection is closed for no cause and calls are not empty");
+              // clean up calls anyway
+              cause = new IOException("Unexpected closed connection");
+              for (Integer key : calls.keySet()) {
+                Call c = calls.remove(key);         
+                c.setException(cause);
+              }
+            }
+          } else {
+            // log the info
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+                  StringUtils.stringifyException(cause));
+            }
+
+            // remove calls
+            for (Integer key : calls.keySet()) {
+              Call c = calls.remove(key);         
+              c.setException(cause); // local exception
+            }
           }
+
         }
-        error = false;
-      } finally {
-        if (error) {
-          synchronized (connections) {
-            if (connections.get(remoteId) == this)
-              connections.remove(remoteId);
-          }
-          close();                                // close on error
+      }
+    }
+    
+    /* close a connection no matter due to exception or not */
+    private void close() {
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": closing");
+      
+      // first thing to do;take the connection out of the connection list
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
         }
       }
-    }  
 
-    /** Close the connection. */
-    public void close() {
-      //socket may be null if the connection could not be established to the
-      //server in question, and the culler asked us to close the connection
-      if (socket == null) return;
-      try {
-        socket.close();                           // close socket
-      } catch (IOException e) {}
-      if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closing");
+      // close the socket and streams
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      IOUtils.closeSocket(socket);
     }
+    
+    private boolean isClosed() {
+      return isClosed;
+    }
   }
 
   /** Call implementation used for parallel calls. */
@@ -397,7 +514,7 @@
     }
 
     /** Deliver result to result collector. */
-    public void callComplete() {
+    protected void callComplete() {
       results.callComplete(this);
     }
   }
@@ -422,58 +539,21 @@
     }
   }
 
-  private class ConnectionCuller extends Thread {
-
-    public static final int MIN_SLEEP_TIME = 1000;
-
-    public void run() {
-
-      LOG.debug(getName() + ": starting");
-
-      while (running) {
-        try {
-          Thread.sleep(MIN_SLEEP_TIME);
-        } catch (InterruptedException ie) {}
-
-        synchronized (connections) {
-          Iterator i = connections.values().iterator();
-          while (i.hasNext()) {
-            Connection c = (Connection)i.next();
-            if (c.isIdle()) { 
-              //We don't actually close the socket here (i.e., don't invoke
-              //the close() method). We leave that work to the response receiver
-              //thread. The reason for that is since we have taken a lock on the
-              //connections table object, we don't want to slow down the entire
-              //system if we happen to talk to a slow server.
-              i.remove();
-              synchronized (c) {
-                c.setCloseConnection();
-                c.notify();
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
   public Client(Class valueClass, Configuration conf, 
       SocketFactory factory) {
     this.valueClass = valueClass;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
-    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
+    this.maxIdleTime = 
+      conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
     this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
     this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+    this.pingInterval = getPingInterval(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+    }
     this.conf = conf;
     this.socketFactory = factory;
-    this.connectionCullerThread = new ConnectionCuller();
-    connectionCullerThread.setDaemon(true);
-    connectionCullerThread.setName(valueClass.getName() + " Connection Culler");
-    LOG.debug(valueClass.getName() + 
-              "Connection culler maxidletime= " + maxIdleTime + "ms");
-    connectionCullerThread.start();
   }
 
   /**
@@ -504,34 +584,25 @@
       return;
     }
     running = false;
-
-    connectionCullerThread.interrupt();
-    try {
-      connectionCullerThread.join();
-    } catch(InterruptedException e) {}
-
-    // close and wake up all connections
+    
+    // wake up all connections
     synchronized (connections) {
       for (Connection conn : connections.values()) {
         synchronized (conn) {
-          conn.setCloseConnection();
           conn.notifyAll();
         }
       }
     }
-    
+
     // wait until all connections are closed
     while (!connections.isEmpty()) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException e) {
+      } catch( InterruptedException e) {
       }
     }
   }
 
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception. */
@@ -543,20 +614,24 @@
   public Writable call(Writable param, InetSocketAddress addr, 
                        UserGroupInformation ticket)  
                        throws InterruptedException, IOException {
-    Connection connection = getConnection(addr, ticket);
     Call call = new Call(param);
+    Connection connection = getConnection(addr, ticket, call);
+    connection.sendParam(call);                 // send the parameter
     synchronized (call) {
-      connection.sendParam(call);                 // send the parameter
-      long wait = timeout;
-      do {
-        call.wait(wait);                       // wait for the result
-        wait = timeout - (System.currentTimeMillis() - call.lastActivity);
-      } while (!call.done && wait > 0);
+      while (!call.done) {
+        try {
+          call.wait();                           // wait for the result
+        } catch (InterruptedException ignored) {}
+      }
 
       if (call.error != null) {
-        throw new RemoteException(call.errorClass, call.error);
-      } else if (!call.done) {
-        throw new SocketTimeoutException("timed out waiting for rpc response");
+        if (call.error instanceof RemoteException) {
+          call.error.fillInStackTrace();
+          throw call.error;
+        } else { // local exception
+          throw (IOException)new IOException(
+              "Call failed on local exception").initCause(call.error);
+        }
       } else {
         return call.value;
       }
@@ -576,7 +651,7 @@
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null);
+          Connection connection = getConnection(addresses[i], null, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           LOG.info("Calling "+addresses[i]+" caught: " + 
@@ -584,22 +659,21 @@
           results.size--;                         //  wait for one fewer result
         }
       }
-      try {
-        results.wait(timeout);                    // wait for all results
-      } catch (InterruptedException e) {}
+      while (results.count != results.size) {
+        try {
+          results.wait();                    // wait for all results
+        } catch (InterruptedException e) {}
+      }
 
-      if (results.count == 0) {
-        throw new IOException("no responses");
-      } else {
-        return results.values;
-      }
+      return results.values;
     }
   }
 
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress addr, 
-                                   UserGroupInformation ticket)
+                                   UserGroupInformation ticket,
+                                   Call call)
                                    throws IOException {
     Connection connection;
     /* we could avoid this allocation for each RPC by having a  
@@ -607,15 +681,16 @@
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, ticket);
-    synchronized (connections) {
-      connection = connections.get(remoteId);
-      if (connection == null) {
-        connection = new Connection(remoteId);
-        connections.put(remoteId, connection);
-        connection.start();
+    do {
+      synchronized (connections) {
+        connection = connections.get(remoteId);
+        if (connection == null) {
+          connection = new Connection(remoteId);
+          connections.put(remoteId, connection);
+        }
       }
-      connection.incrementRef();
-    }
+    } while (!connection.addCall(call));
+    
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
Index: src/java/org/apache/hadoop/dfs/DFSClient.java
===================================================================
--- src/java/org/apache/hadoop/dfs/DFSClient.java	(revision 639496)
+++ src/java/org/apache/hadoop/dfs/DFSClient.java	(working copy)
@@ -98,8 +98,6 @@
 
   private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
     throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
     RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
     
@@ -112,28 +110,10 @@
     exceptionToPolicyMap.put(RemoteException.class, 
         RetryPolicies.retryByRemoteException(
             RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
-    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
     RetryPolicy methodPolicy = RetryPolicies.retryByException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
     Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
     
-    methodNameToPolicyMap.put("open", methodPolicy);
-    methodNameToPolicyMap.put("setReplication", methodPolicy);
-    methodNameToPolicyMap.put("abandonBlock", methodPolicy);
-    methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
-    methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
-    methodNameToPolicyMap.put("exists", methodPolicy);
-    methodNameToPolicyMap.put("isDir", methodPolicy);
-    methodNameToPolicyMap.put("getListing", methodPolicy);
-    methodNameToPolicyMap.put("getHints", methodPolicy);
-    methodNameToPolicyMap.put("getBlockLocations", methodPolicy);
-    methodNameToPolicyMap.put("renewLease", methodPolicy);
-    methodNameToPolicyMap.put("getStats", methodPolicy);
-    methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
-    methodNameToPolicyMap.put("getPreferredBlockSize", methodPolicy);
-    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
-    methodNameToPolicyMap.put("complete", methodPolicy);
-    methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
     methodNameToPolicyMap.put("create", methodPolicy);
 
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
