Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java	(working copy)
@@ -31,8 +31,15 @@
 
 package org.apache.http.impl.conn;
 
+import java.io.IOException;
 import java.lang.ref.WeakReference;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.Test;
 import junit.framework.TestSuite;
@@ -42,21 +49,25 @@
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.HttpVersion;
-import org.apache.http.conn.ConnectionPoolTimeoutException;
-import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.conn.scheme.SchemeRegistry;
 import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.conn.ClientConnectionRequest;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
 import org.apache.http.conn.ManagedClientConnection;
 import org.apache.http.conn.params.ConnPerRouteBean;
 import org.apache.http.conn.params.HttpConnectionManagerParams;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.scheme.PlainSocketFactory;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.conn.scheme.SocketFactory;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
 import org.apache.http.localserver.ServerTestBase;
 import org.apache.http.message.BasicHttpRequest;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.ExecutionContext;
 import org.apache.http.util.EntityUtils;
-import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
 
 
 /**
@@ -440,6 +451,199 @@
 
         assertNull("TSCCM not garbage collected", wref.get());
     }
+
+    /**
+     * Aborts a connection while the socket factory is stalled just
+     * before connecting the socket.
+     */
+    public void testAbortDuringConnecting() throws Exception {
+        checkAbortWhileOpening(WaitPolicy.BEFORE_CONNECT);
+    }
+
+    /**
+     * Aborts a connection while the socket factory is stalled just
+     * before creating the socket.
+     */
+    public void testAbortBeforeSocketCreate() throws Exception {
+        checkAbortWhileOpening(WaitPolicy.BEFORE_CREATE);
+    }
+
+    /**
+     * Aborts a connection while the socket factory is stalled just
+     * after the socket has been connected.
+     */
+    public void testAbortAfterSocketConnect() throws Exception {
+        checkAbortWhileOpening(WaitPolicy.AFTER_CONNECT);
+    }
+
+    /**
+     * Shared scenario for the abort tests. A single-connection manager
+     * hands out a connection whose socket factory stalls at the given
+     * point; a second thread aborts the connection while
+     * <code>open</code> is stalled. Opening is expected to fail with a
+     * <code>SocketException</code>, the aborted connection must still
+     * count against the pool until it is released, and the connection
+     * obtained afterwards must not be open.
+     *
+     * @param policy    the point at which the socket factory stalls
+     *
+     * @throws Exception   if the scenario fails unexpectedly
+     */
+    private void checkAbortWhileOpening(WaitPolicy policy) throws Exception {
+        HttpParams mgrpar = defaultParams.copy();
+        HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1);
+
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+        final StallingSocketFactory stallingSocketFactory =
+            new StallingSocketFactory(connectLatch,
+                                      PlainSocketFactory.getSocketFactory());
+        stallingSocketFactory.setWaitPolicy(policy);
+        Scheme scheme = new Scheme("http", stallingSocketFactory, 80);
+        SchemeRegistry registry = new SchemeRegistry();
+        registry.register(scheme);
+
+        ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry);
+
+        final HttpHost target = getServerHttp();
+        final HttpRoute route = new HttpRoute(target, null, false);
+
+        final ManagedClientConnection conn = getConnection(mgr, route);
+        assertTrue(conn instanceof AbstractClientConnAdapter);
+
+        // the aborting thread records any failure here for the main thread
+        final AtomicReference<Throwable> throwRef =
+            new AtomicReference<Throwable>();
+        Thread abortingThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    // wait until open() has reached the stall point
+                    stallingSocketFactory.waitForState();
+                    conn.abortConnection();
+                    // let the stalled open() continue
+                    connectLatch.countDown();
+                } catch (Throwable e) {
+                    throwRef.set(e);
+                }
+            }
+        });
+        abortingThread.start();
+
+        try {
+            conn.open(route, httpContext, defaultParams);
+            fail("expected SocketException");
+        } catch (SocketException expected) {
+            // expected
+        }
+
+        abortingThread.join(5000);
+        if (throwRef.get() != null) {
+            throw new RuntimeException(throwRef.get());
+        }
+
+        assertFalse(conn.isOpen());
+
+        // check that there are no connections available
+        try {
+            // this should fail quickly, connection has not been released
+            getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS);
+            fail("ConnectionPoolTimeoutException should have been thrown");
+        } catch (ConnectionPoolTimeoutException e) {
+            // expected
+        }
+
+        // return it back to the manager
+        ((AbstractClientConnAdapter) conn).releaseConnection();
+
+        // the connection is expected to be released back to the manager
+        ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS);
+        assertFalse("connection should have been closed", conn2.isOpen());
+
+        mgr.releaseConnection(conn2);
+        mgr.shutdown();
+    }
+
+
+    /**
+     * Socket factory that delegates to another factory but can stall at
+     * a point selected by a {@link WaitPolicy}. When the stall point is
+     * reached it signals {@link #waitForState} and then waits up to one
+     * second for the latch passed to the constructor.
+     */
+    private static class StallingSocketFactory implements SocketFactory {
+        private final CountDownLatch connectContinueLatch;
+        private final CountDownLatch waitingForConnectLatch = new CountDownLatch(1);
+        private final SocketFactory delegate;
+        private volatile WaitPolicy waitPolicy;
+
+        public StallingSocketFactory(CountDownLatch connectContinueLatch,
+                SocketFactory delegate) {
+            this.connectContinueLatch = connectContinueLatch;
+            this.delegate = delegate;
+        }
+
+        void setWaitPolicy(WaitPolicy waitPolicy) {
+            this.waitPolicy = waitPolicy;
+        }
+
+        /** Waits up to one second for the factory to reach its stall point. */
+        void waitForState() throws InterruptedException {
+            if (!waitingForConnectLatch.await(1, TimeUnit.SECONDS)) {
+                throw new RuntimeException("waited too long");
+            }
+        }
+
+        public Socket connectSocket(Socket sock, String host, int port,
+                InetAddress localAddress, int localPort, HttpParams params)
+                throws IOException, UnknownHostException,
+                ConnectTimeoutException {
+            if (waitPolicy == WaitPolicy.BEFORE_CONNECT) {
+                latch();
+            }
+
+            Socket socket = delegate.connectSocket(sock, host, port, localAddress,
+                    localPort, params);
+
+            if (waitPolicy == WaitPolicy.AFTER_CONNECT) {
+                latch();
+            }
+
+            return socket;
+        }
+
+        public Socket createSocket() throws IOException {
+            if (waitPolicy == WaitPolicy.BEFORE_CREATE) {
+                latch();
+            }
+
+            return delegate.createSocket();
+        }
+
+        public boolean isSecure(Socket sock) throws IllegalArgumentException {
+            return delegate.isSecure(sock);
+        }
+
+        /**
+         * Signals that the stall point has been reached, then waits up
+         * to one second for permission to continue.
+         */
+        private void latch() {
+            waitingForConnectLatch.countDown();
+            try {
+                if (!connectContinueLatch.await(1, TimeUnit.SECONDS)) {
+                    throw new RuntimeException("waited too long!");
+                }
+            } catch (InterruptedException e) {
+                // keep the interrupt status visible to the calling thread
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** The points at which a {@link StallingSocketFactory} can stall. */
+    private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT }
+
+
 
 
 } // class TestTSCCMWithServer
Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMNoServer.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMNoServer.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMNoServer.java	(working copy)
@@ -30,6 +30,11 @@
 
 package org.apache.http.impl.conn;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.Test;
@@ -40,6 +45,7 @@
 import org.apache.http.HttpVersion;
 import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.conn.ClientConnectionRequest;
+import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.http.conn.ConnectionPoolTimeoutException;
 import org.apache.http.conn.ManagedClientConnection;
 import org.apache.http.conn.params.ConnPerRouteBean;
Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java	(working copy)
@@ -72,7 +72,7 @@
     protected final ClientConnectionOperator connOperator;
 
     /** The underlying connection being pooled or used. */
-    protected volatile OperatedClientConnection connection;
+    protected final OperatedClientConnection connection;
 
     /** The route for which this entry gets allocated. */
     //@@@ currently accessed from connection manager(s) as attribute
Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java	(working copy)
@@ -102,7 +102,7 @@
     }
 
 
-    public void opening(Socket sock, HttpHost target) {
+    public void opening(Socket sock, HttpHost target) throws IOException {
         assertNotOpen();
         this.socket = sock;
         this.targetHost = target;