Index: src/test/org/apache/hadoop/ipc/TestIPC.java
===================================================================
--- src/test/org/apache/hadoop/ipc/TestIPC.java	(revision 596211)
+++ src/test/org/apache/hadoop/ipc/TestIPC.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
 
 import java.util.Random;
 import java.io.IOException;
@@ -74,7 +75,6 @@
       this.client = client;
       this.server = server;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -89,7 +89,7 @@
             break;
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("caught " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
@@ -107,7 +107,6 @@
       this.client = client;
       this.addresses = addresses;
       this.count = count;
-      client.setTimeout(1000);
     }
 
     public void run() {
@@ -125,7 +124,7 @@
             }
           }
         } catch (Exception e) {
-          LOG.fatal("Caught: " + e);
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
         }
       }
Index: src/java/org/apache/hadoop/ipc/RPC.java
===================================================================
--- src/java/org/apache/hadoop/ipc/RPC.java	(revision 596211)
+++ src/java/org/apache/hadoop/ipc/RPC.java	(working copy)
@@ -297,7 +297,7 @@
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
-    throws IOException {
+    throws IOException,InterruptedException {
 
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
Index: src/java/org/apache/hadoop/ipc/Server.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Server.java	(revision 596211)
+++ src/java/org/apache/hadoop/ipc/Server.java	(working copy)
@@ -35,7 +35,6 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 
@@ -66,18 +65,6 @@
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
-  /**
-   * How much time should be allocated for actually running the handler?
-   * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
-   * are ignored when the handler takes them off the queue.
-   */
-  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
-  
-  /**
-   * How many calls/handler are allowed in the queue.
-   */
-  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
-  
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Server");
 
@@ -117,7 +104,7 @@
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private Class paramClass;                       // class of call parameters
+  private Class<?> paramClass;                    // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -131,8 +118,6 @@
   private Configuration conf;
 
   private int timeout;
-  private long maxCallStartAge;
-  private int maxQueueSize;
 
   volatile private boolean running = true;         // true while server runs
   private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
@@ -178,13 +163,11 @@
     private int id;                               // the client's call id
     private Writable param;                       // the parameter passed
     private Connection connection;                // connection to client
-    private long receivedTime;                    // the time received
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
       this.param = param;
       this.connection = connection;
-      this.receivedTime = System.currentTimeMillis();
     }
     
     public String toString() {
@@ -283,10 +266,10 @@
         SelectionKey key = null;
         try {
           selector.select();
-          Iterator iter = selector.selectedKeys().iterator();
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
           
           while (iter.hasNext()) {
-            key = (SelectionKey)iter.next();
+            key = iter.next();
             iter.remove();
             try {
               if (key.isValid()) {
@@ -500,6 +483,10 @@
           firstData = false;
         }
         dataLength = dataLengthBuffer.getInt();
+        // if the length is -1, then it is a ping
+        if (dataLength == -1) {
+          return 0;
+        }
         data = ByteBuffer.allocate(dataLength);
       }
       count = channel.read(data);
@@ -525,10 +512,6 @@
         
       Call call = new Call(id, param, this);
       synchronized (callQueue) {
-        if (callQueue.size() >= maxQueueSize) {
-          Call oldCall = callQueue.removeFirst();
-          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
-        }
         callQueue.addLast(call);              // queue the call
         callQueue.notify();                   // wake up a waiting handler
       }
@@ -573,16 +556,6 @@
             call = callQueue.removeFirst(); // pop the queue
           }
 
-          // throw the message away if it is too old
-          if (System.currentTimeMillis() - call.receivedTime > 
-              maxCallStartAge) {
-            ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
-            LOG.warn(getName()+", call "+call
-                     +": discarded for being too old (" +
-                     (System.currentTimeMillis() - call.receivedTime) + ")");
-            continue;
-          }
-          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                       call.connection);
@@ -639,7 +612,8 @@
    * the number of handler threads that will be used to process calls.
    * 
    */
-  protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf) 
+  protected Server(String bindAddress, int port, Class<?> paramClass, 
+                   int handlerCount, Configuration conf) 
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
@@ -647,8 +621,6 @@
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.timeout = conf.getInt("ipc.client.timeout", 10000);
-    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
-    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
Index: src/java/org/apache/hadoop/ipc/Client.java
===================================================================
--- src/java/org/apache/hadoop/ipc/Client.java	(revision 596211)
+++ src/java/org/apache/hadoop/ipc/Client.java	(working copy)
@@ -29,8 +29,6 @@
 import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
@@ -58,15 +56,16 @@
   /** Should the client send the header on the connection? */
   private static final boolean SEND_HEADER = true;
   private static final byte CURRENT_VERSION = 0;
+  private static final long PING_INTERVAL = 60000;
   
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Client");
   private Hashtable<InetSocketAddress, Connection> connections =
     new Hashtable<InetSocketAddress, Connection>();
 
-  private Class valueClass;                       // class of call values
+  private Class<?> valueClass;                       // class of call values
   private int timeout;// timeout for calls
-  private int counter;                            // counter for call ids
+  private int counter = 0;                        // counter for call ids
   private boolean running = true;                 // true while client runs
   private Configuration conf;
   private int maxIdleTime; //connections will be culled if it was idle for 
@@ -78,40 +77,57 @@
   private class Call {
     int id;                                       // call id
     Writable param;                               // parameter
-    Writable value;                               // value, null if error
-    String error;                                 // exception, null if value
-    String errorClass;                            // class of exception
-    long lastActivity;                            // time of last i/o
-    boolean done;                                 // true when call is done
+    Writable value = null;                        // value, null if error
+    String error = null;                          // exception, null if value
+    String errorClass = null;                     // class of exception
+    boolean done = false;                         // true when call is done
+    IOException localException = null;
 
     protected Call(Writable param) {
       this.param = param;
       synchronized (Client.this) {
         this.id = counter++;
       }
-      touch();
+     }
+
+    synchronized void waitForCompletion() throws InterruptedException {
+      while (!done) {
+        wait();
+      }
     }
 
     /** Called by the connection thread when the call is complete and the
      * value or error string are available.  Notifies by default.  */
-    public synchronized void callComplete() {
+    synchronized void callComplete() {
+      done = true;
       notify();                                 // notify caller
     }
 
-    /** Update lastActivity with the current time. */
-    public synchronized void touch() {
-      lastActivity = System.currentTimeMillis();
+    /**
+     * Mark a local exception that made the call fail
+     * @param error The IOException that caused the problem
+     */
+    synchronized void setLocalError(IOException error) {
+      localException = error;
+      callComplete();
     }
 
-    /** Update lastActivity with the current time. */
-    public synchronized void setResult(Writable value, 
-                                       String errorClass,
-                                       String error) {
-      this.value = value;
+    /**
+     * Mark a remote exception that made the call fail
+     * @param errorClass the class name of the exception
+     * @param error the message from the exception
+     */
+    synchronized void setRemoteError(String errorClass, String error) {
       this.error = error;
-      this.errorClass =errorClass;
-      this.done = true;
+      this.errorClass = errorClass;
+      callComplete();
     }
+
+    /** Mark a nominal result to the call. */
+    synchronized void setResult(Writable value) {
+      this.value = value;
+      callComplete();
+    }
     
   }
 
@@ -125,11 +141,9 @@
     private DataOutputStream out;
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
-    private Call readingCall;
-    private Call writingCall;
     private int inUse = 0;
     private long lastActivity = 0;
-    private boolean shouldCloseConnection = false;
+    private IOException closingCause = null;
 
     public Connection(InetSocketAddress address) throws IOException {
       if (address.isUnresolved()) {
@@ -140,11 +154,8 @@
       this.setDaemon(true);
     }
 
-    public synchronized void setupIOstreams() throws IOException {
-      if (socket != null) {
-        notify();
-        return;
-      }
+    public synchronized void setupIOstreams() 
+                             throws IOException, InterruptedException {
       short failures = 0;
       while (true) {
         try {
@@ -155,7 +166,7 @@
           if (failures == maxRetries) {
             //reset inUse so that the culler gets a chance to throw this
             //connection object out of the table. We don't want to increment
-            //inUse to infinity (everytime getConnection is called inUse is
+            //inUse to infinity (every time getConnection is called inUse is
             //incremented)!
             inUse = 0;
             // set socket to null so that the next call to setupIOstreams
@@ -167,34 +178,13 @@
           failures++;
           LOG.info("Retrying connect to server: " + address + 
                    ". Already tried " + failures + " time(s).");
-          try { 
-            Thread.sleep(1000);
-          } catch (InterruptedException iex){
-          }
+          Thread.sleep(1000);
         }
       }
-      socket.setSoTimeout(timeout);
       this.in = new DataInputStream
-        (new BufferedInputStream
-         (new FilterInputStream(socket.getInputStream()) {
-             public int read(byte[] buf, int off, int len) throws IOException {
-               int value = super.read(buf, off, len);
-               if (readingCall != null) {
-                 readingCall.touch();
-               }
-               return value;
-             }
-           }));
+        (new BufferedInputStream(socket.getInputStream()));
       this.out = new DataOutputStream
-        (new BufferedOutputStream
-         (new FilterOutputStream(socket.getOutputStream()) {
-             public void write(byte[] buf, int o, int len) throws IOException {
-               out.write(buf, o, len);
-               if (writingCall != null) {
-                 writingCall.touch();
-               }
-             }
-           }));
+        (new BufferedOutputStream(socket.getOutputStream()));
       if (SEND_HEADER) {
         out.write(Server.HEADER.array());
         out.write(CURRENT_VERSION);
@@ -202,7 +192,7 @@
       notify();
     }
 
-    private synchronized boolean waitForWork() {
+    private synchronized boolean waitForWork() throws InterruptedException {
       //wait till someone signals us to start reading RPC response or
       //close the connection. If we are idle long enough (blocked in wait),
       //the ConnectionCuller thread will wake us up and ask us to close the
@@ -211,12 +201,10 @@
       //the Connection object has been created but the socket connection
       //has not been setup yet). We stop waiting if we have been asked to close
       //connection
-      while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
-        try {
-          wait();
-        } catch (InterruptedException e) {}
+      while (socket == null && closingCause == null) {
+        wait();
       }
-      return !shouldCloseConnection;
+      return closingCause == null;
     }
 
     private synchronized void incrementRef() {
@@ -241,9 +229,17 @@
       return address;
     }
 
-    public void setCloseConnection() {
-      shouldCloseConnection = true;
+    public synchronized void setCloseConnection(IOException ie) {
+      closingCause = ie;
+      notify();
     }
+    
+    /**
+     * Is this connection being closed?
+     */
+    public synchronized boolean isClosing() {
+      return closingCause != null;
+    }
 
     public void run() {
       if (LOG.isDebugEnabled())
@@ -252,8 +248,9 @@
         while (running) {
           int id;
           //wait here for work - read connection or close connection
-          if (waitForWork() == false)
+          if (!waitForWork()) {
             break;
+          }
           try {
             id = in.readInt();                    // try to read an id
           } catch (SocketTimeoutException e) {
@@ -266,26 +263,24 @@
           Call call = calls.remove(id);
           boolean isError = in.readBoolean();     // read if error
           if (isError) {
-            call.setResult(null, WritableUtils.readString(in),
-                           WritableUtils.readString(in));
+            call.setRemoteError(WritableUtils.readString(in),
+                                WritableUtils.readString(in));
           } else {
             Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
-            try {
-              readingCall = call;
-              value.readFields(in);                 // read value
-            } finally {
-              readingCall = null;
-            }
-            call.setResult(value, null, null);
+            value.readFields(in);                 // read value
+            call.setResult(value);
           }
-          call.callComplete();                   // deliver result to caller
           //received the response. So decrement the ref count
           decrementRef();
         }
       } catch (EOFException eof) {
         // This is what happens when the remote side goes down
+        close(eof);
       } catch (Exception e) {
         LOG.info(StringUtils.stringifyException(e));
+        IOException ie = new IOException("rpc problem");
+        ie.initCause(ie);
+        close(ie);
       } finally {
         //If there was no exception thrown in this method, then the only
         //way we reached here is by breaking out of the while loop (after
@@ -293,12 +288,26 @@
         //already removed the connection object in the ConnectionCuller thread.
         //We don't want to remove this again as some other thread might have
         //actually put a new Connection object in the table in the meantime.
-        synchronized (connections) {
-          if (connections.get(address) == this) {
-            connections.remove(address);
-          }
+        close(closingCause);
+      }
+    }
+
+    /**
+     * Check to make sure the connection to the server is still up. Only sends
+     * pings if there are rpcs in flight and it has been 60 seconds since
+     * we had some activity on the connection.
+     */
+    public synchronized void sendPing() {
+      long now = System.currentTimeMillis();
+      if (out != null && inUse != 0 && now - lastActivity > PING_INTERVAL) {
+        try {
+          out.writeInt(-1);
+          out.close();
+          lastActivity = now;
+        } catch (IOException ie) {
+          setCloseConnection(ie);
+          LOG.info("RPC connection ping failed", ie);
         }
-        close();
       }
     }
 
@@ -306,51 +315,59 @@
      * Note: this is not called from the Connection thread, but by other
      * threads.
      */
-    public void sendParam(Call call) throws IOException {
-      boolean error = true;
+    public synchronized void sendParam(Call call) throws IOException {
+      if (isClosing()) {
+        throw new IOException("starting rpc call to closed connection");
+      }
+      calls.put(call.id, call);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + " sending #" + call.id);
+      }
+      DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+      //data to be written
+      d.writeInt(call.id);
+      call.param.write(d);
+      byte[] data = d.getData();
+      int dataLength = d.getLength();
+
       try {
-        calls.put(call.id, call);
-        synchronized (out) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " sending #" + call.id);
-          try {
-            writingCall = call;
-            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
-                                                         //data to be written
-            d.writeInt(call.id);
-            call.param.write(d);
-            byte[] data = d.getData();
-            int dataLength = d.getLength();
-
-            out.writeInt(dataLength);      //first put the data length
-            out.write(data, 0, dataLength);//write the data
-            out.flush();
-          } finally {
-            writingCall = null;
-          }
-        }
-        error = false;
-      } finally {
-        if (error) {
-          synchronized (connections) {
-            if (connections.get(address) == this)
-              connections.remove(address);
-          }
-          close();                                // close on error
-        }
+        out.writeInt(dataLength);      //first put the data length
+        out.write(data, 0, dataLength);//write the data
+        out.flush();
+        lastActivity = System.currentTimeMillis();
+      } catch (IOException ie) {
+        setCloseConnection(ie);
+        throw ie;
       }
     }  
 
     /** Close the connection. */
-    public void close() {
+    public synchronized void close(IOException cause) {
+      LOG.warn("closing ipc connection: " + address + " " + 
+               StringUtils.stringifyException(cause));
+      for(Call c: calls.values()) {
+        c.setLocalError(cause);
+      }
       //socket may be null if the connection could not be established to the
       //server in question, and the culler asked us to close the connection
-      if (socket == null) return;
-      try {
-        socket.close();                           // close socket
-      } catch (IOException e) {}
-      if (LOG.isDebugEnabled())
+      if (socket != null) {
+        if (in != null) {
+          try {
+            in.close();
+          } catch (IOException ie) {}
+        }
+        if (out != null) {
+          try {
+            out.close();
+          } catch (IOException ie) {}
+        }
+        try {
+          socket.close();                           // close socket
+        } catch (IOException e) {}
+      }
+      if (LOG.isDebugEnabled()) {
         LOG.debug(getName() + ": closing");
+      }
     }
   }
 
@@ -402,22 +419,26 @@
       while (running) {
         try {
           Thread.sleep(MIN_SLEEP_TIME);
-        } catch (InterruptedException ie) {}
+        } catch (InterruptedException ie) {
+          return;
+        }
 
         synchronized (connections) {
-          Iterator i = connections.values().iterator();
+          Iterator<Connection> i = connections.values().iterator();
           while (i.hasNext()) {
             Connection c = (Connection)i.next();
-            if (c.isIdle()) { 
-              //We don't actually close the socket here (i.e., don't invoke
-              //the close() method). We leave that work to the response receiver
-              //thread. The reason for that is since we have taken a lock on the
-              //connections table object, we don't want to slow down the entire
-              //system if we happen to talk to a slow server.
-              i.remove();
-              synchronized (c) {
-                c.setCloseConnection();
-                c.notify();
+            synchronized (c) {
+              if (c.isIdle()) {
+                //We don't actually close the socket here (i.e., don't invoke
+                //the close() method). We leave that work to the response receiver
+                //thread. The reason for that is since we have taken a lock on the
+                //connections table object, we don't want to slow down the entire
+                //system if we happen to talk to a slow server.
+                i.remove();
+                c.setCloseConnection(new SocketTimeoutException
+                                     ("idle connection"));                 
+              } else {
+                c.sendPing();
               }
             }
           }
@@ -428,7 +449,7 @@
 
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
-  public Client(Class valueClass, Configuration conf, 
+  public Client(Class<?> valueClass, Configuration conf, 
       SocketFactory factory) {
     this.valueClass = valueClass;
     this.timeout = conf.getInt("ipc.client.timeout", 10000);
@@ -460,9 +481,6 @@
     running = false;
   }
 
-  /** Sets the timeout used for network i/o. */
-  public void setTimeout(int timeout) { this.timeout = timeout; }
-
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception. */
@@ -472,16 +490,14 @@
     Call call = new Call(param);
     synchronized (call) {
       connection.sendParam(call);                 // send the parameter
-      long wait = timeout;
-      do {
-        call.wait(wait);                       // wait for the result
-        wait = timeout - (System.currentTimeMillis() - call.lastActivity);
-      } while (!call.done && wait > 0);
+      call.waitForCompletion();                   // wait for the result
 
       if (call.error != null) {
         throw new RemoteException(call.errorClass, call.error);
-      } else if (!call.done) {
-        throw new SocketTimeoutException("timed out waiting for rpc response");
+      } else if (call.localException != null) {
+        IOException err = new IOException("rpc problem");
+        err.initCause(call.localException);
+        throw err;
       } else {
         return call.value;
       }
@@ -493,7 +509,7 @@
    * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
-    throws IOException {
+    throws IOException,InterruptedException {
     if (addresses.length == 0) return new Writable[0];
 
     ParallelResults results = new ParallelResults(params.length);
@@ -509,9 +525,7 @@
           results.size--;                         //  wait for one fewer result
         }
       }
-      try {
-        results.wait(timeout);                    // wait for all results
-      } catch (InterruptedException e) {}
+      results.wait(timeout);                    // wait for all results
 
       if (results.count == 0) {
         throw new IOException("no responses");
@@ -524,14 +538,16 @@
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress address)
-    throws IOException {
+    throws IOException,InterruptedException {
     Connection connection;
+    boolean newConnection = false;
     synchronized (connections) {
       connection = connections.get(address);
-      if (connection == null) {
+      if (connection == null || connection.isClosing()) {
         connection = new Connection(address);
         connections.put(address, connection);
         connection.start();
+        newConnection = true;
       }
       connection.incrementRef();
     }
@@ -539,7 +555,9 @@
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
     //entire system down.
-    connection.setupIOstreams();
+    if (newConnection) {
+      connection.setupIOstreams();
+    }
     return connection;
   }
 
