Index: src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java	(revision 627923)
+++ src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java	(working copy)
@@ -63,7 +63,6 @@
     public TestServer(final int handlerCount, final boolean sleep) 
                                               throws IOException {
       super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       // Set the buffer size to half of the maximum parameter/result size 
       // to force the socket to block
       this.setSocketSendBufSize(BYTE_COUNT / 2);
@@ -94,7 +93,6 @@
       this.client = client;
       this.address = address;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     @Override
Index: src/test/org/apache/hadoop/ipc/TestIPC.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestIPC.java	(revision 627923)
+++ src/test/org/apache/hadoop/ipc/TestIPC.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
 
 import java.util.Random;
 import java.io.IOException;
@@ -50,7 +51,6 @@
     public TestServer(int handlerCount, boolean sleep) 
       throws IOException {
       super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
-      this.setTimeout(1000);
       this.sleep = sleep;
     }
 
@@ -74,7 +74,6 @@
       this.client = client;
       this.server = server;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -89,7 +88,7 @@
             break;
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
@@ -107,7 +106,6 @@
       this.client = client;
       this.addresses = addresses;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -125,7 +123,7 @@
             }
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
Index: src/java/org/apache/hadoop/ipc/Server.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Server.java	(revision 627923)
+++ src/java/org/apache/hadoop/ipc/Server.java	(working copy)
@@ -77,13 +77,6 @@
   public static final byte CURRENT_VERSION = 1;
   
   /**
-   * How much time should be allocated for actually running the handler?
-   * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
-   * are ignored when the handler takes them off the queue.
-   */
-  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
-  
-  /**
    * How many calls/handler are allowed in the queue.
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
@@ -144,7 +137,7 @@
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private Class paramClass;                       // class of call parameters
+  private Class<?> paramClass;                    // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -159,9 +152,7 @@
   
   private Configuration conf;
 
-  private int timeout;
-  private long maxCallStartAge;
-  private int maxQueueSize;
+  private int maxQueueSize;  // we do not need this for now but leave here for later usage
   private int socketSendBufferSize;
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
@@ -178,7 +169,7 @@
   private Handler[] handlers = null;
 
   /**
-   * A convience method to bind to a given address and report 
+   * A convenience method to bind to a given address and report 
    * better exceptions if the address is not a valid host.
    * @param socket the socket to bind
    * @param address the address to bind to
@@ -317,10 +308,10 @@
         SelectionKey key = null;
         try {
           selector.select();
-          Iterator iter = selector.selectedKeys().iterator();
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
           
           while (iter.hasNext()) {
-            key = (SelectionKey)iter.next();
+            key = iter.next();
             iter.remove();
             try {
               if (key.isValid()) {
@@ -453,12 +444,11 @@
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
-      long lastPurgeTime = 0;   // last check for old calls.
 
       while (running) {
         try {
           waitPending();     // If a channel is being registered, wait.
-          writeSelector.select(maxCallStartAge);
+          writeSelector.select();
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           while (iter.hasNext()) {
             SelectionKey key = iter.next();
@@ -471,25 +461,6 @@
               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
             }
           }
-          long now = System.currentTimeMillis();
-          if (now < lastPurgeTime + maxCallStartAge) {
-            continue;
-          }
-          lastPurgeTime = now;
-          //
-          // If there were some calls that have not been sent out for a
-          // long time, discard them.
-          //
-          LOG.debug("Checking for old call responses.");
-          iter = writeSelector.keys().iterator();
-          while (iter.hasNext()) {
-            SelectionKey key = iter.next();
-            try {
-              doPurge(key, now);
-            } catch (IOException e) {
-              LOG.warn("Error in purging old calls " + e);
-            }
-          }
         } catch (OutOfMemoryError e) {
           //
           // we can run out of memory if we have too many threads
@@ -531,49 +502,6 @@
       }
     }
 
-    //
-    // Remove calls that have been pending in the responseQueue 
-    // for a long time.
-    //
-    private void doPurge(SelectionKey key, long now) throws IOException {
-      Call call = (Call)key.attachment();
-      if (call == null) {
-        return;
-      }
-      if (key.channel() != call.connection.channel) {
-        LOG.info("doPurge: bad channel");
-        return;
-      }
-      boolean close = false;
-      LinkedList<Call> responseQueue = call.connection.responseQueue;
-      synchronized (responseQueue) {
-        Iterator<Call> iter = responseQueue.listIterator(0);
-        while (iter.hasNext()) {
-          call = iter.next();
-          if (now > call.receivedTime + maxCallStartAge) {
-            LOG.info(getName() + ", call " + call +
-                     ": response discarded for being too old (" +
-                     (now - call.receivedTime) + ")");
-            iter.remove();
-            if (call.response.position() > 0) {
-              /* We should probably use a different start time 
-               * than receivedTime. receivedTime starts when the RPC
-               * was first read.
-               * We have written a partial response. will close the
-               * connection for now.
-               */
-              close = true;
-              break;
-            }            
-          }
-        }
-      }
-      
-      if (close) {
-        closeConnection(call.connection);
-      }
-    }
-
     // Processes one response. Returns true if there are no more pending
     // data for this channel.
     //
@@ -791,6 +719,10 @@
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
+       
+          if (dataLength == -1) {
+            return 0;  //ping message
+          }
           data = ByteBuffer.allocate(dataLength);
         }
         
@@ -837,10 +769,6 @@
         
       Call call = new Call(id, param, this);
       synchronized (callQueue) {
-        if (callQueue.size() >= maxQueueSize) {
-          Call oldCall = callQueue.removeFirst();
-          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
-        }
         callQueue.addLast(call);              // queue the call
         callQueue.notify();                   // wake up a waiting handler
       }
@@ -877,24 +805,12 @@
           Call call;
           synchronized (callQueue) {
             while (running && callQueue.size()==0) { // wait for a call
-              callQueue.wait(timeout);
+              callQueue.wait();
             }
             if (!running) break;
             call = callQueue.removeFirst(); // pop the queue
           }
 
-          // throw the message away if it is too old
-          if (System.currentTimeMillis() - call.receivedTime > 
-              maxCallStartAge) {
-            ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
-            int timeInQ = (int) (System.currentTimeMillis() - call.receivedTime);
-            LOG.warn(getName()+", call "+call
-                     +": discarded for being too old (" +
-                     timeInQ + ")");
-            rpcMetrics.rpcDiscardedOps.inc(timeInQ);
-            continue;
-          }
-          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                       call.connection);
@@ -951,7 +867,7 @@
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf,
+  protected Server(String bindAddress, int port, Class<?> paramClass, int handlerCount, Configuration conf,
                   String serverName) 
     throws IOException {
     this.bindAddress = bindAddress;
@@ -959,9 +875,7 @@
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
     this.socketSendBufferSize = 0;
-    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -989,9 +903,6 @@
     }
   }
   
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Sets the socket buffer size used for responding to RPCs */
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
Index: src/java/org/apache/hadoop/ipc/Client.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Client.java	(revision 627923)
+++ src/java/org/apache/hadoop/ipc/Client.java	(working copy)
@@ -31,6 +31,7 @@
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
+import java.io.InputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -41,6 +42,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.FSConstants;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -63,16 +65,16 @@
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
-  private Class valueClass;                       // class of call values
-  private int timeout;// timeout for calls
+  private Class<?> valueClass;                       // class of call values
   private int counter;                            // counter for call ids
   private boolean running = true;                 // true while client runs
-  private Configuration conf;
-  private int maxIdleTime; //connections will be culled if it was idle for 
+  final private Configuration conf;
+  final private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
-  private int maxRetries; //the max. no. of retries for socket connections
+  final private int maxRetries; //the max. no. of retries for socket connections
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
-  private Thread connectionCullerThread;
+  private int pingInterval; // how often sends ping to the server in msecs
+
   private SocketFactory socketFactory;           // how to create sockets
   private boolean simulateError = false;         // unit tests
 
@@ -81,9 +83,7 @@
     int id;                                       // call id
     Writable param;                               // parameter
     Writable value;                               // value, null if error
-    String error;                                 // exception, null if value
-    String errorClass;                            // class of exception
-    long lastActivity;                            // time of last i/o
+    IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
 
     protected Call(Writable param) {
@@ -91,30 +91,34 @@
       synchronized (Client.this) {
         this.id = counter++;
       }
-      touch();
     }
 
-    /** Called by the connection thread when the call is complete and the
-     * value or error string are available.  Notifies by default.  */
-    public synchronized void callComplete() {
+    /** Indicate when the call is complete and the
+     * value or error are available.  Notifies by default.  */
+    protected synchronized void callComplete() {
+      this.done = true;
       notify();                                 // notify caller
     }
 
-    /** Update lastActivity with the current time. */
-    public synchronized void touch() {
-      lastActivity = System.currentTimeMillis();
+    /** Set the exception when there is an error.
+     * Notify the caller the call is done.
+     * 
+     * @param error exception thrown by the call; either local or remote
+     */
+    public synchronized void setException(IOException error) {
+      this.error = error;
+      callComplete();
     }
-
-    /** Update lastActivity with the current time. */
-    public synchronized void setResult(Writable value, 
-                                       String errorClass,
-                                       String error) {
+    
+    /** Set the return value when there is no error. 
+     * Notify the caller the call is done.
+     * 
+     * @param value return value of the call.
+     */
+    public synchronized void setValue(Writable value) {
       this.value = value;
-      this.error = error;
-      this.errorClass =errorClass;
-      this.done = true;
+      callComplete();
     }
-    
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -127,11 +131,8 @@
     private DataOutputStream out;
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
-    private Call readingCall;
-    private Call writingCall;
-    private int inUse = 0;
-    private long lastActivity = 0;
-    private boolean shouldCloseConnection = false;
+    private long lastActivity = 0;     // last I/O activity time
+    private boolean isClosed = false;  // indicate if the connection is closed
 
     public Connection(InetSocketAddress address) throws IOException {
       this(new ConnectionId(address, null));
@@ -148,29 +149,73 @@
       this.setDaemon(true);
     }
 
+    /** Stamp lastActivity (time of the last I/O on this connection) with now. */
+    private synchronized void touch() {
+      lastActivity = System.currentTimeMillis();
+    }
+
+    /** Input stream wrapper that answers a read timeout by calling sendPing()
+     * and retrying the read (setupIOstreams() sets the socket read timeout to
+     * pingInterval). A read ends only when it returns or a non-timeout
+     * IOException propagates, from the wrapped read or from sendPing(). */
+    private class PingInputStream extends FilterInputStream {
+      /* @param in the stream to wrap */
+      protected PingInputStream(InputStream in) {
+        super(in);
+      }
+
+      /** Read a single byte from the wrapped stream.
+       * On SocketTimeoutException, call sendPing() and try the read again.
+       * @return whatever the wrapped read() returns.
+       */
+      public int read() throws IOException {
+        while (true) {
+          try {
+            return super.read();
+          } catch (SocketTimeoutException e) {
+            sendPing();
+          }
+        }
+      }
+
+      /** Read up to <code>len</code> bytes into <code>buf</code> starting at
+       * offset <code>off</code>. On SocketTimeoutException, call sendPing()
+       * and try the read again.
+       *
+       * @return the number of bytes read; -1 at end of stream.
+       */
+      public int read(byte[] buf, int off, int len) throws IOException {
+        while (true) {
+          try {
+            return super.read(buf, off, len);
+          } catch (SocketTimeoutException e) {
+            sendPing();
+          }
+        }
+      }
+    }
+    
+    /** Connect to the server and set up the I/O streams. It then sends
+     * a header to the server and starts
+     * the connection thread that waits for responses.
+     * 
+     * @throws IOException when any IOException during streams setup.
+     */
     public synchronized void setupIOstreams() throws IOException {
       if (socket != null) {
-        notify();
         return;
       }
       short failures = 0;
+      try {
       while (true) {
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
           this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
+          this.socket.setSoTimeout(pingInterval);
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
           if (failures == maxRetries) {
-            //reset inUse so that the culler gets a chance to throw this
-            //connection object out of the table. We don't want to increment
-            //inUse to infinity (everytime getConnection is called inUse is
-            //incremented)!
-            inUse = 0;
-            // set socket to null so that the next call to setupIOstreams
-            // can start the process of connect all over again.
-            socket.close();
-            socket = null;
             throw ie;
           }
           failures++;
@@ -182,30 +227,22 @@
           }
         }
       }
-      socket.setSoTimeout(timeout);
       this.in = new DataInputStream
         (new BufferedInputStream
-         (new FilterInputStream(socket.getInputStream()) {
-             public int read(byte[] buf, int off, int len) throws IOException {
-               int value = super.read(buf, off, len);
-               if (readingCall != null) {
-                 readingCall.touch();
-               }
-               return value;
-             }
-           }));
+         (new PingInputStream(socket.getInputStream())));
       this.out = new DataOutputStream
-        (new BufferedOutputStream
-         (new FilterOutputStream(socket.getOutputStream()) {
-             public void write(byte[] buf, int o, int len) throws IOException {
-               out.write(buf, o, len);
-               if (writingCall != null) {
-                 writingCall.touch();
-               }
-             }
-           }));
+        (new BufferedOutputStream(socket.getOutputStream()));
       writeHeader();
-      notify();
+      
+      // update last activity time
+      touch();
+      
+      // start the receiver thread after the socket connection has been set up
+      start();
+      } catch (IOException e) {
+        close(e);
+        throw e;
+      }
     }
 
     private synchronized void writeHeader() throws IOException {
@@ -220,104 +257,70 @@
       out.write(buf.getData(), 0, bufLen);
     }
     
+    /* Block until there is a response to read or this connection should
+     * shut down.
+     * We keep waiting while no calls are outstanding (calls is empty) and
+     * the connection is not closed, but never longer than the time left
+     * until maxIdleTime ms have passed since lastActivity.
+     *
+     * Return true if at least one call is outstanding and the connection is
+     * not closed; false otherwise (idle for too long, or closed).
+     */
     private synchronized boolean waitForWork() {
-      //wait till someone signals us to start reading RPC response or
-      //close the connection. If we are idle long enough (blocked in wait),
-      //the ConnectionCuller thread will wake us up and ask us to close the
-      //connection. 
-      //We need to wait when inUse is 0 or socket is null (it may be null if
-      //the Connection object has been created but the socket connection
-      //has not been setup yet). We stop waiting if we have been asked to close
-      //connection
-      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
+      while (calls.size() == 0 && !isClosed)  {
+        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+        if (timeout<=0) {  // already idle for maxIdleTime: stop waiting
+          break;
+        }
         try {
-          wait();
+          wait(timeout);  // notified by sendParam() and close(IOException)
         } catch (InterruptedException e) {}
       }
-      return !shouldCloseConnection;
+      return calls.size() != 0 && !isClosed;
     }
 
-    private synchronized void incrementRef() {
-      inUse++;
-    }
-
-    private synchronized void decrementRef() {
-      lastActivity = System.currentTimeMillis();
-      inUse--;
-    }
-
-    public synchronized boolean isIdle() {
-      //check whether the connection is in use or just created
-      if (inUse != 0) return false;
-      long currTime = System.currentTimeMillis();
-      if (currTime - lastActivity > maxIdleTime)
-        return true;
-      return false;
-    }
-
     public InetSocketAddress getRemoteAddress() {
       return remoteId.getAddress();
     }
 
-    public void setCloseConnection() {
-      shouldCloseConnection = true;
+    /* Ping the server once pingInterval ms have passed since the last I/O. */
+    synchronized private void sendPing() throws IOException {
+      if (System.currentTimeMillis() - lastActivity >= pingInterval) {
+        touch();
+        synchronized (out) {  // same lock as sendParam(): never split a request
+          out.writeInt(-1);   // server reads a 4-byte length; -1 marks a ping
+          out.flush();
+        }
+      }
     }
 
     public void run() {
+      LOG.info(getName() + ": starting");
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ": starting");
+      IOException error = null;
       try {
         while (running) {
-          int id;
           //wait here for work - read connection or close connection
           if (waitForWork() == false)
             break;
-          try {
-            id = in.readInt();                    // try to read an id
-          } catch (SocketTimeoutException e) {
-            continue;
-          }
-
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " got value #" + id);
-
-          Call call = calls.remove(id);
-          boolean isError = in.readBoolean();     // read if error
-          if (isError) {
-            call.setResult(null, WritableUtils.readString(in),
-                           WritableUtils.readString(in));
-          } else {
-            Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
-            try {
-              waitForEndSimulation();
-              readingCall = call;
-              value.readFields(in);                 // read value
-            } finally {
-              readingCall = null;
-            }
-            call.setResult(value, null, null);
-          }
-          call.callComplete();                   // deliver result to caller
-          //received the response. So decrement the ref count
-          decrementRef();
+          receiveResponse();
         }
-      } catch (EOFException eof) {
-        // This is what happens when the remote side goes down
-      } catch (Exception e) {
-        LOG.info(StringUtils.stringifyException(e));
+      } catch (IOException e) {
+        error = e;
       } finally {
+        if (error != null && !running) { // get here because stop is called
+          error = new IOException();
+          error.initCause(new InterruptedException());
+        }
         //If there was no exception thrown in this method, then the only
         //way we reached here is by breaking out of the while loop (after
         //waitForWork). And if we took that route to reach here, we have 
         //already removed the connection object in the ConnectionCuller thread.
         //We don't want to remove this again as some other thread might have
         //actually put a new Connection object in the table in the meantime.
-        synchronized (connections) {
-          if (connections.get(remoteId) == this) {
-            connections.remove(remoteId);
-          }
-        }
-        close();
+        close(error);
+        LOG.info(getName() + ": stopped");
       }
     }
 
@@ -326,48 +329,107 @@
      * threads.
      */
     public void sendParam(Call call) throws IOException {
-      boolean error = true;
       try {
-        calls.put(call.id, call);
+        touch();
+        synchronized (this) {
+          calls.put(call.id, call);
+          if (calls.size() == 1) {
+            notify();
+          }
+        }
         synchronized (out) {
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
-          try {
-            writingCall = call;
-            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
-                                                         //data to be written
-            d.writeInt(call.id);
-            call.param.write(d);
-            byte[] data = d.getData();
-            int dataLength = d.getLength();
+          DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+          //data to be written
+          d.writeInt(call.id);
+          call.param.write(d);
+          byte[] data = d.getData();
+          int dataLength = d.getLength();
 
-            out.writeInt(dataLength);      //first put the data length
-            out.write(data, 0, dataLength);//write the data
-            out.flush();
-          } finally {
-            writingCall = null;
-          }
+          out.writeInt(dataLength);      //first put the data length
+          out.write(data, 0, dataLength);//write the data
+          out.flush();
         }
-        error = false;
-      } finally {
-        if (error) {
-          synchronized (connections) {
-            if (connections.get(remoteId) == this)
-              connections.remove(remoteId);
+      } catch(IOException e) {
+        close(e);
+        throw e;
+      }
+    }  
+
+    private void receiveResponse() throws IOException {
+      touch();
+      // Reads one response: call id, error flag, then two strings or a value.
+      int id = in.readInt();                    // try to read an id
+
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + " got value #" + id);
+      // NOTE(review): remove() yields null for an unknown id; used unchecked below
+      Call call;
+      synchronized (this) {
+        call = calls.remove(id);
+      }
+      boolean isError = in.readBoolean();     // read if error
+      if (isError) {
+        call.setException(new RemoteException( WritableUtils.readString(in),
+                       WritableUtils.readString(in)));
+      } else {
+        Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
+        waitForEndSimulation();
+        value.readFields(in);                 // read value
+        call.setValue(value);
+      }
+    }
+    
+    /** Close the connection once; pending calls are completed with an exception. */
+    private synchronized void close(IOException cause) {
+      if (isClosed) {  // already closed: nothing left to do
+        return;
+      }
+      
+      // release the resources
+      close();
+
+      // complete every pending call with an exception
+      if (cause == null) {
+        if(!calls.isEmpty()) {
+          LOG.warn(
+              "A connection is closed for no cause and calls are not empty");
+          // no cause was given, so make one up for the waiting callers
+          cause = new IOException("Unexpected closed connection");
+          for(Call c : calls.values()) {
+            c.setException(cause);
           }
-          close();                                // close on error
         }
+      } else {
+        // log the info
+        LOG.warn("closing ipc connection: " + remoteId.address + " " +
+            StringUtils.stringifyException(cause));
+
+        // fail the pending calls (they are not removed from the table)
+        for(Call c: calls.values()) {
+          c.setException(cause); // local exception
+        }
       }
-    }  
+      
+      isClosed = true;
+      notify(); // wake up the connection receiver thread
+    }
+    
+    /* Release this connection's resources, whether or not an error occurred */
+    private synchronized void close() {
+      // first drop this connection from the table, if it is still the entry there
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
+        }
+      }
 
-    /** Close the connection. */
-    public void close() {
-      //socket may be null if the connection could not be established to the
-      //server in question, and the culler asked us to close the connection
-      if (socket == null) return;
-      try {
-        socket.close();                           // close socket
-      } catch (IOException e) {}
+      // close both streams, then the socket
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      IOUtils.closeSocket(socket);
+      
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ": closing");
     }
@@ -385,7 +447,7 @@
     }
 
     /** Deliver result to result collector. */
-    public void callComplete() {
+    protected void callComplete() {
       results.callComplete(this);
     }
   }
@@ -410,58 +472,17 @@
     }
   }
 
-  private class ConnectionCuller extends Thread {
-
-    public static final int MIN_SLEEP_TIME = 1000;
-
-    public void run() {
-
-      LOG.debug(getName() + ": starting");
-
-      while (running) {
-        try {
-          Thread.sleep(MIN_SLEEP_TIME);
-        } catch (InterruptedException ie) {}
-
-        synchronized (connections) {
-          Iterator i = connections.values().iterator();
-          while (i.hasNext()) {
-            Connection c = (Connection)i.next();
-            if (c.isIdle()) { 
-              //We don't actually close the socket here (i.e., don't invoke
-              //the close() method). We leave that work to the response receiver
-              //thread. The reason for that is since we have taken a lock on the
-              //connections table object, we don't want to slow down the entire
-              //system if we happen to talk to a slow server.
-              i.remove();
-              synchronized (c) {
-                c.setCloseConnection();
-                c.notify();
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
   public Client(Class valueClass, Configuration conf, 
       SocketFactory factory) {
     this.valueClass = valueClass;
-    this.timeout = conf.getInt("ipc.client.timeout", 10000);
-    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
+    this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 300000); //5 mins
     this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
     this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+    this.pingInterval = conf.getInt("ipc.ping.interval", 60000); // 1 min
     this.conf = conf;
     this.socketFactory = factory;
-    this.connectionCullerThread = new ConnectionCuller();
-    connectionCullerThread.setDaemon(true);
-    connectionCullerThread.setName(valueClass.getName() + " Connection Culler");
-    LOG.debug(valueClass.getName() + 
-              "Connection culler maxidletime= " + maxIdleTime + "ms");
-    connectionCullerThread.start();
   }
 
   /**
@@ -478,15 +499,8 @@
   public void stop() {
     LOG.info("Stopping client");
     running = false;
-    connectionCullerThread.interrupt();
-    try {
-      connectionCullerThread.join();
-    } catch(InterruptedException e) {}
   }
 
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception. */
@@ -502,16 +516,14 @@
     Call call = new Call(param);
     synchronized (call) {
       connection.sendParam(call);                 // send the parameter
-      long wait = timeout;
       do {
-        call.wait(wait);                       // wait for the result
-        wait = timeout - (System.currentTimeMillis() - call.lastActivity);
-      } while (!call.done && wait > 0);
+        try {
+          call.wait();                           // wait for the result
+        } catch (InterruptedException ignored) {}
+      } while (!call.done);
 
       if (call.error != null) {
-        throw new RemoteException(call.errorClass, call.error);
-      } else if (!call.done) {
-        throw new SocketTimeoutException("timed out waiting for rpc response");
+        throw call.error;
       } else {
         return call.value;
       }
@@ -539,15 +551,13 @@
           results.size--;                         //  wait for one fewer result
         }
       }
-      try {
-        results.wait(timeout);                    // wait for all results
-      } catch (InterruptedException e) {}
+      do {
+        try {
+          results.wait();                    // wait for all results
+        } catch (InterruptedException e) {}
+      } while (results.count != results.size);
 
-      if (results.count == 0) {
-        throw new IOException("no responses");
-      } else {
-        return results.values;
-      }
+      return results.values;
     }
   }
 
@@ -567,9 +577,7 @@
       if (connection == null) {
         connection = new Connection(remoteId);
         connections.put(remoteId, connection);
-        connection.start();
       }
-      connection.incrementRef();
     }
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
