Index: java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java =================================================================== RCS file: /home/cvspublic/jakarta-commons/httpclient/src/java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java,v retrieving revision 1.12 diff -u -r1.12 MultiThreadedHttpConnectionManager.java --- java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java 6 Mar 2003 07:49:03 -0000 1.12 +++ java/org/apache/commons/httpclient/MultiThreadedHttpConnectionManager.java 2 Apr 2003 04:23:38 -0000 @@ -85,6 +85,7 @@ * @author Michael Becke * @author Eric Johnson * @author Mike Bowler + * @author Carl A. Dunham * * @since 2.0 */ @@ -95,14 +96,15 @@ private static final Log LOG = LogFactory.getLog(MultiThreadedHttpConnectionManager.class); // ----------------------------------------------------- Instance Variables - /** - * Map where keys are {@link HostConfiguration}s and values are {@link - * HostConnectionPool}s - */ - private final Map mapHosts = new HashMap(); - /** Maximum number of connections allowed */ - private int maxConnections = 2; // Per RFC 2616 sec 8.1.4 + /** Maximum number of connections allowed per host */ + private int maxHostConnections = 2; // Per RFC 2616 sec 8.1.4 + + /** Maximum number of connections allowed overall */ + private int maxTotalConnections = 20; + + /** Connection Pool */ + private ConnectionPool connectionPool; /** mapping from reference to hostConfiguration */ private Map referenceToHostConfig; @@ -119,6 +121,8 @@ public MultiThreadedHttpConnectionManager() { this.referenceToHostConfig = Collections.synchronizedMap(new HashMap()); + this.connectionPool = new ConnectionPool(); + this.referenceQueue = new ReferenceQueue(); new ReferenceQueueThread().start(); @@ -129,11 +133,11 @@ * Sets the maximum number of connections allowed for a given * HostConfiguration. Per RFC 2616 section 8.1.4, this value defaults to 2. * - * @param maxConnections the number of connections allowed for each + * @param maxHostConnections the number of connections allowed for each * hostConfiguration */ - public void setMaxConnectionsPerHost(int maxConnections) { - this.maxConnections = maxConnections; + public void setMaxConnectionsPerHost(int maxHostConnections) { + this.maxHostConnections = maxHostConnections; } /** @@ -144,7 +148,25 @@ * hostConfiguration. */ public int getMaxConnectionsPerHost() { - return maxConnections; + return maxHostConnections; + } + + /** + * Sets the maximum number of connections allowed in the system. + * + * @param maxTotalConnections the maximum number of connections allowed + */ + public void setMaxTotalConnections(int maxTotalConnections) { + this.maxTotalConnections = maxTotalConnections; + } + + /** + * Gets the maximum number of connections allowed in the system. + * + * @return The maximum number of connections allowed + */ + public int getMaxTotalConnections() { + return maxTotalConnections; } /** @@ -182,12 +204,7 @@ + hostConfiguration + ", timeout = " + timeout); } - // we get the connection pool with a clone of the hostConfiguration - // so that it cannot be changed once the connecton has been retrieved - final HttpConnection conn - = getConnection(getConnectionPool(new HostConfiguration(hostConfiguration)), - hostConfiguration, timeout - ); + final HttpConnection conn = doGetConnection(hostConfiguration, timeout); // wrap the connection in an adapter so we can ensure it is used // only once @@ -197,9 +214,9 @@ /** * Gets a connection or waits if one is not available. A connection is * available if one exists that is not being used or if fewer than - * maxConnections have been created in the connectionPool. + * maxHostConnections have been created in the connectionPool, and fewer + * than maxTotalConnections have been created in all connectionPools. * - * @param connectionPool The connection pool to use. * @param hostConfiguration The host configuration. * @param timeout the number of milliseconds to wait for a connection, 0 to * wait indefinitely @@ -209,97 +226,119 @@ * @throws HttpException if a connection does not become available in * 'timeout' milliseconds */ - private HttpConnection getConnection(HostConnectionPool connectionPool, - HostConfiguration hostConfiguration, long timeout) throws HttpException { + private HttpConnection doGetConnection(HostConfiguration hostConfiguration, + long timeout) throws HttpException { HttpConnection connection = null; synchronized (connectionPool) { - // keep trying until a connection is available, should happen at - // most twice + // we clone the hostConfiguration + // so that it cannot be changed once the connection has been retrieved + + hostConfiguration = new HostConfiguration(hostConfiguration); + HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration); + WaitingThread waitingThread = null; + + boolean useTimeout = (timeout > 0); + long timeToWait = timeout; + long startWait = 0; + long endWait = 0; + while (connection == null) { - if (connectionPool.freeConnections.size() > 0) { - connection = (HttpConnection) connectionPool - .freeConnections.removeFirst(); + // happen to have a free connection with the right specs + // + if (hostPool.freeConnections.size() > 0) { + connection = connectionPool.getFreeConnection(hostConfiguration); + + // have room to make more + // + } else if ((hostPool.numConnections < maxHostConnections) + && (connectionPool.numConnections < maxTotalConnections)) { + + connection = connectionPool.createConnection(hostConfiguration); + + // have room to add host connection, and there is at least one free + // connection that can be liberated to make overall room + // + } else if ((hostPool.numConnections < maxHostConnections) + && (connectionPool.freeConnections.size() > 0)) { + + connectionPool.deleteLeastUsedConnection(); + connection = connectionPool.createConnection(hostConfiguration); + + // otherwise, we have to wait for one of the above conditions to + // become true + // } else { - // get number of connections hostConfig - if (connectionPool.numConnections < maxConnections) { - // Create a new connection - connection = new HttpConnection(hostConfiguration); - connection.setHttpConnectionManager(this); - connectionPool.numConnections++; - - // add a weak reference to this connection - referenceToHostConfig.put(new WeakReference(connection, referenceQueue), - hostConfiguration); - - } else { - - TimeoutThread threadTimeout = new TimeoutThread(); - threadTimeout.setTimeout(timeout); - threadTimeout.setWakeupThread(Thread.currentThread()); - threadTimeout.start(); - - try { - LOG.debug( - "HttpConnectionManager.getConnection: waiting for " - + "connection from " + connectionPool - ); - connectionPool.wait(); - // we were woken up before the timeout occurred, so - // there should be a connection available - threadTimeout.interrupt(); - } catch (InterruptedException e) { - throw new HttpException("Timeout waiting for connection."); - } + // todo: keep track of which hostConfigurations have waiting + // threads, so they avoid being sacrificed before necessary + try { + + if (useTimeout && timeToWait <= 0) { + throw new HttpException("Timeout waiting for connection"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for a connection "); + } + + if (waitingThread == null) { + waitingThread = new WaitingThread(); + waitingThread.hostConnectionPool = hostPool; + waitingThread.thread = Thread.currentThread(); + } + + if (useTimeout) { + startWait = System.currentTimeMillis(); + } + + hostPool.waitingThreads.addLast(waitingThread); + connectionPool.waitingThreads.addLast(waitingThread); + connectionPool.wait(timeToWait); + + // we have not been interrupted so we need to remove ourselves from the + // wait queue + hostPool.waitingThreads.remove(waitingThread); + connectionPool.waitingThreads.remove(waitingThread); + } catch (InterruptedException e) { + // do nothing + } finally { + if (useTimeout) { + endWait = System.currentTimeMillis(); + timeToWait -= (endWait - startWait); + } } } } - } - return connection; } /** - * Get the pool (list) of connections available for the given hostConfig. + * Gets the number of connections in use for this configuration. * - * @param hostConfiguration the configuraton for the connection pool - * @return a pool (list) of connections available for the given config + * @param hostConfiguration the key that connections are tracked on + * @return the number of connections in use */ - private HostConnectionPool getConnectionPool(HostConfiguration hostConfiguration) { - LOG.trace("enter HttpConnectionManager.getConnections(String)"); - - // Look for a list of connections for the given config - HostConnectionPool listConnections = null; - synchronized (mapHosts) { - listConnections = (HostConnectionPool) mapHosts.get(hostConfiguration); - if (listConnections == null) { - // First time for this config - listConnections = new HostConnectionPool(); - mapHosts.put(hostConfiguration, listConnections); - } + public int getConnectionsInUse(HostConfiguration hostConfiguration) { + synchronized (connectionPool) { + HostConnectionPool hostPool = connectionPool.getHostPool(hostConfiguration); + return hostPool.numConnections; } - return listConnections; } /** - * Get the number of connections in use for this configuration. - * - * @param hostConfiguration the key that connections are tracked on - * @return the number of connections in use + * Gets the total number of connections in use. + * + * @return the total number of connections in use */ - public int getConnectionsInUse(HostConfiguration hostConfiguration) { - LOG.trace("enter HttpConnectionManager.getConnectionsInUse(String)"); - - HostConnectionPool connectionPool = getConnectionPool(hostConfiguration); + public int getConnectionsInUse() { synchronized (connectionPool) { return connectionPool.numConnections; } - } /** @@ -323,43 +362,247 @@ // make sure that the response has been read. SimpleHttpConnectionManager.finishLastResponse(conn); + connectionPool.freeConnection(conn); + } + + /** + * Gets the host configuration for a connection. + * @param conn the connection to get the configuration of + * @return a new HostConfiguration + */ + private HostConfiguration configurationForConnection(HttpConnection conn) { + HostConfiguration connectionConfiguration = new HostConfiguration(); connectionConfiguration.setHost(conn.getHost(), - conn.getPort(), conn.getProtocol()); + conn.getPort(), conn.getProtocol()); if (conn.getProxyHost() != null) { connectionConfiguration.setProxy(conn.getProxyHost(), conn.getProxyPort()); } - if (LOG.isDebugEnabled()) { - LOG.debug("HttpConnectionManager.releaseConnection: Release connection for " - + connectionConfiguration); + return connectionConfiguration; + } + + + /** + * Global Connection Pool, including per-host pools + */ + private class ConnectionPool { + + /** The list of free connections */ + private LinkedList freeConnections = new LinkedList(); + + /** The list of WaitingThreads waiting for a connection */ + private LinkedList waitingThreads = new LinkedList(); + + /** + * Map where keys are {@link HostConfiguration}s and values are {@link + * HostConnectionPool}s + */ + private final Map mapHosts = new HashMap(); + + /** The number of created connections */ + private int numConnections = 0; + + /** + * Creates a new connection and returns is for use of the calling method. + * + * @param hostConfiguration the configuration for the connection + * @return a new connection or null if none are available + */ + private synchronized HttpConnection createConnection(HostConfiguration hostConfiguration) { + HttpConnection connection = null; + + HostConnectionPool hostPool = getHostPool(hostConfiguration); + + if ((hostPool.numConnections < maxHostConnections) + && (numConnections < maxTotalConnections)) { + + connection = new HttpConnection(hostConfiguration); + connection.setHttpConnectionManager(MultiThreadedHttpConnectionManager.this); + connectionPool.numConnections++; + hostPool.numConnections++; + + // add a weak reference to this connection + referenceToHostConfig.put(new WeakReference(connection, referenceQueue), + hostConfiguration); + } + return connection; } + + /** + * Get the pool (list) of connections available for the given hostConfig. + * + * @param hostConfiguration the configuraton for the connection pool + * @return a pool (list) of connections available for the given config + */ + public synchronized HostConnectionPool getHostPool(HostConfiguration hostConfiguration) { + LOG.trace("enter HttpConnectionManager.ConnectionPool.getHostPool(HostConfiguration)"); - final HostConnectionPool listConnections = getConnectionPool(connectionConfiguration); - synchronized (listConnections) { - // Put the connect back in the available list and notify a waiter - listConnections.freeConnections.addFirst(conn); - if (listConnections.numConnections == 0) { - // for some reason this connection pool didn't already exist - LOG.error("connection pool not found for: " - + connectionConfiguration); - listConnections.numConnections = 1; + // Look for a list of connections for the given config + HostConnectionPool listConnections = (HostConnectionPool) + mapHosts.get(hostConfiguration); + if (listConnections == null) { + // First time for this config + listConnections = new HostConnectionPool(); + mapHosts.put(hostConfiguration, listConnections); } - listConnections.notify(); + + return listConnections; + } + + /** + * If available, get a free connection for this host + * + * @param hostConfiguration the configuraton for the connection pool + * @return an available connection for the given config + */ + public synchronized HttpConnection getFreeConnection(HostConfiguration hostConfiguration) { + + HttpConnection connection = null; + + HostConnectionPool hostPool = getHostPool(hostConfiguration); + + if (hostPool.freeConnections.size() > 0) { + connection = (HttpConnection) hostPool.freeConnections.removeFirst(); + freeConnections.remove(connection); + } + return connection; + } + + /** + * Close and delete an old, unused connection to make room for a new one. + */ + public synchronized void deleteLeastUsedConnection() { + + HttpConnection connection = (HttpConnection) freeConnections.removeFirst(); + + if (connection != null) { + HostConfiguration connectionConfiguration = configurationForConnection(connection); + + if (LOG.isDebugEnabled()) { + LOG.debug("Reclaiming unused connection for hostConfig: " + + connectionConfiguration); + } + + connection.close(); + + HostConnectionPool hostPool = getHostPool(connectionConfiguration); + + hostPool.freeConnections.remove(connection); + hostPool.numConnections--; + numConnections--; + } + } + + /** + * Notifies a waiting thread that a connection for the given configuration is + * available. + * @param configuration the host config to use for notifying + * @see #notifyWaitingThread(HostConnectionPool) + */ + public synchronized void notifyWaitingThread(HostConfiguration configuration) { + notifyWaitingThread(getHostPool(configuration)); + } + + /** + * Notifies a waiting thread that a connection for the given configuration is + * available. This will wake a thread witing in tis hostPool or if there is not + * one a thread in the ConnectionPool will be notified. + * + * @param hostPool the host pool to use for notifying + */ + public synchronized void notifyWaitingThread(HostConnectionPool hostPool) { + + // find the thread we are going to notify, we want to ensure that each + // waiting thread is only interrupted once so we will remove it from + // all wait queues before interrupting it + WaitingThread waitingThread = null; + + if (hostPool.waitingThreads.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notifying thread waiting on hostPool"); + } + waitingThread = (WaitingThread) hostPool.waitingThreads.removeFirst(); + waitingThreads.remove(waitingThread); + } else if (waitingThreads.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notifying next waiting thread"); + } + waitingThread = (WaitingThread) waitingThreads.removeFirst(); + waitingThread.hostConnectionPool.waitingThreads.remove(waitingThread); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Notifying no-one, there are no waiting threads"); + } + + if (waitingThread != null) { + waitingThread.thread.interrupt(); + } + } + + /** + * Marks the given connection as free. + * @param conn a connection that is no longer being used + */ + public void freeConnection(HttpConnection conn) { + + HostConfiguration connectionConfiguration = configurationForConnection(conn); + + if (LOG.isDebugEnabled()) { + LOG.debug("Freeing connection: " + conn); + } + + synchronized (this) { + HostConnectionPool hostPool = getHostPool(connectionConfiguration); + + // Put the connect back in the available list and notify a waiter + hostPool.freeConnections.add(conn); + if (hostPool.numConnections == 0) { + // for some reason this connection pool didn't already exist + LOG.error("host connection pool not found for: " + + connectionConfiguration); + hostPool.numConnections = 1; + } + + freeConnections.add(conn); + if (numConnections == 0) { + // for some reason this connection pool didn't already exist + LOG.error("connection pool not found for: " + + connectionConfiguration); + numConnections = 1; + } + + notifyWaitingThread(hostPool); + } + } } /** - * A simple struct-link class to combine the connection list and the count + * A simple struct-like class to combine the connection list and the count * of created connections. */ private class HostConnectionPool { /** The list of free connections */ private LinkedList freeConnections = new LinkedList(); + + /** The list of WaitingThreads for this host */ + private LinkedList waitingThreads = new LinkedList(); /** The number of created connections */ private int numConnections = 0; } + + /** + * A simple struct-like class to combine the waiting thread and the connection + * pool it is waiting on. + */ + private class WaitingThread { + /** The thread that is waiting for a connection */ + private Thread thread; + + /** The connection pool the thread is waiting for */ + private HostConnectionPool hostConnectionPool; + } /** * A thread for listening for HttpConnections reclaimed by the garbage @@ -385,13 +628,14 @@ Reference ref = referenceQueue.remove(); if (ref != null) { - HostConfiguration config = (HostConfiguration) - referenceToHostConfig.get(ref); - referenceToHostConfig.remove(ref); - HostConnectionPool connectionPool = getConnectionPool(config); synchronized (connectionPool) { + HostConfiguration config = (HostConfiguration) + referenceToHostConfig.get(ref); + referenceToHostConfig.remove(ref); + HostConnectionPool hostPool = connectionPool.getHostPool(config); + hostPool.numConnections--; connectionPool.numConnections--; - connectionPool.notify(); + connectionPool.notifyWaitingThread(config); } } } catch (InterruptedException e) { @@ -402,76 +646,6 @@ } - } - - /** - * In getConnection, if the maximum number of connections has already been - * reached the call will block. This class is used to help provide a - * timeout facility for this wait. Because Java does not provide a way to - * determine if wait() returned due to a notify() or a timeout, we need an - * outside mechanism to interrupt the waiting thread after the specified - * timeout interval. - */ - private static class TimeoutThread extends Thread { - - /** The timeout in milliseconds. */ - private long timeout = 0; - - /** The thread that will be woken up after the specified timeout. */ - private Thread wakeupThread = null; - - /** - * Set the timeout - * @param timeout The timeout in milliseconds. - */ - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - /** - * Return the timeout value in milliseconds. - * @return long The timeout. - */ - public long getTimeout() { - return timeout; - } - - /** - * Set the thread that will be woken up after the specified timeout. - * @param newWakeupThread The thread to be woken. - */ - public void setWakeupThread(Thread newWakeupThread) { - this.wakeupThread = newWakeupThread; - } - - /** - * Return the thread that will be woken up after the specified timeout. - * @return Thread The thread to be woken. - */ - public Thread getWakeupThread() { - return wakeupThread; - } - - /** - * Start execution. - */ - public void run() { - LOG.trace("TimeoutThread.run()"); - if (timeout == 0) { - return; - } - if (wakeupThread == null) { - return; - } - - try { - sleep(timeout); - wakeupThread.interrupt(); - } catch (InterruptedException e) { - LOG.debug("InterruptedException caught as expected"); - // This is expected - } - } } /** Index: test/org/apache/commons/httpclient/TestHttpConnectionManager.java =================================================================== RCS file: /home/cvspublic/jakarta-commons/httpclient/src/test/org/apache/commons/httpclient/TestHttpConnectionManager.java,v retrieving revision 1.6 diff -u -r1.6 TestHttpConnectionManager.java --- test/org/apache/commons/httpclient/TestHttpConnectionManager.java 5 Mar 2003 04:02:56 -0000 1.6 +++ test/org/apache/commons/httpclient/TestHttpConnectionManager.java 2 Apr 2003 04:23:39 -0000 @@ -221,6 +221,124 @@ } + /** + * Tests the MultiThreadedHttpConnectionManager's ability to reclaim unused + * connections. + */ + public void testConnectionReclaiming() { + + MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + connectionManager.setMaxConnectionsPerHost(1); + connectionManager.setMaxTotalConnections(1); + + HostConfiguration host1 = new HostConfiguration(); + host1.setHost("host1", -1, "http"); + + HostConfiguration host2 = new HostConfiguration(); + host2.setHost("host2", -1, "http"); + + HttpConnection connection = connectionManager.getConnection(host1); + // now release this connection + connection.releaseConnection(); + connection = null; + + try { + // the connection from host1 should be reclaimed + connection = connectionManager.getConnection(host2, 100); + } catch (HttpException e) { + e.printStackTrace(); + fail("a httpConnection should have been available: " + e); + } + } + + /** + * Tests the MultiThreadedHttpConnectionManager's ability to restrict the maximum number + * of connections. + */ + public void testMaxConnections() { + + MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + connectionManager.setMaxConnectionsPerHost(1); + connectionManager.setMaxTotalConnections(2); + + HostConfiguration host1 = new HostConfiguration(); + host1.setHost("host1", -1, "http"); + + HostConfiguration host2 = new HostConfiguration(); + host2.setHost("host2", -1, "http"); + + HttpConnection connection1 = connectionManager.getConnection(host1); + HttpConnection connection2 = connectionManager.getConnection(host2); + + try { + // this should fail quickly since the connection has not been released + connectionManager.getConnection(host2, 100); + fail("a httpConnection should not be available"); + } catch (HttpException e) { + // this should throw an exception + } + + // release one of the connections + connection2.releaseConnection(); + connection2 = null; + + try { + // there should be a connection available now + connection2 = connectionManager.getConnection(host2, 100); + } catch (HttpException e) { + e.printStackTrace(); + fail("a httpConnection should have been available: " + e); + } + } + + public void testHostReusePreference() { + + final MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + connectionManager.setMaxConnectionsPerHost(1); + connectionManager.setMaxTotalConnections(1); + + final HostConfiguration host1 = new HostConfiguration(); + host1.setHost("host1", -1, "http"); + + final HostConfiguration host2 = new HostConfiguration(); + host2.setHost("host2", -1, "http"); + + HttpConnection connection = connectionManager.getConnection(host1); + + GetConnectionThread getHost1 = new GetConnectionThread(host1, connectionManager, 200); + GetConnectionThread getHost2 = new GetConnectionThread(host2, connectionManager, 200); + + getHost2.start(); + getHost1.start(); + + // give the threads some time to startup + try { + Thread.sleep(100); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + + // after the connection to host1 is released it should be given to getHost1 + connection.releaseConnection(); + connection = null; + + try { + getHost1.join(); + getHost2.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + assertNotSame( + "Connection should have been given to someone", + getHost1.getConnection(), + getHost2.getConnection() + ); + assertNotNull("Connection should have been given to host1", getHost1.getConnection()); + assertNull("Connection should NOT have been given to host2", getHost2.getConnection()); + + } + public void testMaxConnectionsPerServer() { MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); @@ -324,12 +442,43 @@ HttpConnection conn1 = mgr.getConnection(hostConfig); HttpConnection conn2 = mgr.getConnection(hostConfig); + HttpConnection conn3 = mgr.getConnection(hostConfig, 1000); fail("Expected an HttpException."); }catch(HttpException e){ //Expected result } + } + + static class GetConnectionThread extends Thread { + + private HostConfiguration hostConfiguration; + private MultiThreadedHttpConnectionManager connectionManager; + private HttpConnection connection; + private long timeout; + + public GetConnectionThread( + HostConfiguration hostConfiguration, + MultiThreadedHttpConnectionManager connectionManager, + long timeout + ) { + this.hostConfiguration = hostConfiguration; + this.connectionManager = connectionManager; + this.timeout = timeout; + } + + public void run() { + try { + connection = connectionManager.getConnection(hostConfiguration, timeout); + } catch (HttpException e) { + } + } + + public HttpConnection getConnection() { + return connection; + } + } }