Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java =================================================================== --- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java (revision 658286) +++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java (working copy) @@ -31,8 +31,15 @@ package org.apache.http.impl.conn; +import java.io.IOException; import java.lang.ref.WeakReference; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.Test; import junit.framework.TestSuite; @@ -42,21 +49,28 @@ import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.HttpVersion; -import org.apache.http.conn.ConnectionPoolTimeoutException; -import org.apache.http.conn.routing.HttpRoute; -import org.apache.http.conn.scheme.SchemeRegistry; import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.ClientConnectionOperator; import org.apache.http.conn.ClientConnectionRequest; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ManagedClientConnection; +import org.apache.http.conn.OperatedClientConnection; import org.apache.http.conn.params.ConnPerRouteBean; import org.apache.http.conn.params.HttpConnectionManagerParams; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.scheme.PlainSocketFactory; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.conn.scheme.SocketFactory; +import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; import org.apache.http.localserver.ServerTestBase; import org.apache.http.message.BasicHttpRequest; import org.apache.http.params.HttpParams; import org.apache.http.protocol.BasicHttpContext; import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; -import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; /** @@ -440,6 +454,364 @@ assertNull("TSCCM not garbage collected", wref.get()); } + + public void testAbortDuringConnecting() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CONNECT, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected SocketException"); + } catch(SocketException expected) {} + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(0, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortBeforeSocketCreate() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CREATE, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected exception"); + } catch(IOException expected) { + assertEquals("Connection already shutdown", expected.getMessage()); + } + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(0, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortAfterSocketConnect() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.AFTER_CONNECT, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected SocketException"); + } catch(SocketException expected) {} + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(1, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortAfterOperatorOpen() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final AtomicReference operatorRef = new AtomicReference(); + + ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(mgrpar, supportedSchemes) { + @Override + protected ClientConnectionOperator createConnectionOperator( + SchemeRegistry schreg) { + operatorRef.set(new StallingOperator(connectLatch, WaitPolicy.AFTER_OPEN, super.createConnectionOperator(schreg))); + return operatorRef.get(); + } + }; + assertNotNull(operatorRef.get()); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + operatorRef.get().waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected exception"); + } catch(IOException iox) { + assertEquals("Request aborted", iox.getMessage()); + } + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(1, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + private static class LatchSupport { + private final CountDownLatch continueLatch; + private final CountDownLatch waitLatch = new CountDownLatch(1); + protected final WaitPolicy waitPolicy; + + LatchSupport(CountDownLatch continueLatch, WaitPolicy waitPolicy) { + this.continueLatch = continueLatch; + this.waitPolicy = waitPolicy; + } + + void waitForState() throws InterruptedException { + if(!waitLatch.await(1, TimeUnit.SECONDS)) + throw new RuntimeException("waited too long"); + } + + void latch() { + waitLatch.countDown(); + try { + if (!continueLatch.await(1, TimeUnit.SECONDS)) + throw new RuntimeException("waited too long!"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class StallingOperator extends LatchSupport implements ClientConnectionOperator { + private final ClientConnectionOperator delegate; + + public StallingOperator(CountDownLatch continueLatch, + WaitPolicy waitPolicy, ClientConnectionOperator delegate) { + super(continueLatch, waitPolicy); + this.delegate = delegate; + } + + public OperatedClientConnection createConnection() { + return delegate.createConnection(); + } + + public void openConnection(OperatedClientConnection conn, + HttpHost target, InetAddress local, HttpContext context, + HttpParams params) throws IOException { + delegate.openConnection(conn, target, local, context, params); + if(waitPolicy == WaitPolicy.AFTER_OPEN) + latch(); + } + + public void updateSecureConnection(OperatedClientConnection conn, + HttpHost target, HttpContext context, HttpParams params) + throws IOException { + delegate.updateSecureConnection(conn, target, context, params); + } + } + + private static class StallingSocketFactory extends LatchSupport implements SocketFactory { + private final SocketFactory delegate; + + public StallingSocketFactory(CountDownLatch continueLatch, + WaitPolicy waitPolicy, SocketFactory delegate) { + super(continueLatch, waitPolicy); + this.delegate = delegate; + } + + public Socket connectSocket(Socket sock, String host, int port, + InetAddress localAddress, int localPort, HttpParams params) + throws IOException, UnknownHostException, + ConnectTimeoutException { + if(waitPolicy == WaitPolicy.BEFORE_CONNECT) + latch(); + + Socket socket = delegate.connectSocket(sock, host, port, localAddress, + localPort, params); + + if(waitPolicy == WaitPolicy.AFTER_CONNECT) + latch(); + + return socket; + } + + public Socket createSocket() throws IOException { + if(waitPolicy == WaitPolicy.BEFORE_CREATE) + latch(); + + return delegate.createSocket(); + } + + public boolean isSecure(Socket sock) throws IllegalArgumentException { + return delegate.isSecure(sock); + } + } + + private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN } + + } // class TestTSCCMWithServer Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java =================================================================== --- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java (revision 658286) +++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java (working copy) @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpException; @@ -104,6 +105,9 @@ /** The request listening thread, while listening. */ protected volatile Thread listenerThread; + + /** The number of connections this accepted. */ + private final AtomicInteger acceptedConnections = new AtomicInteger(0); /** @@ -161,8 +165,14 @@ "LocalTestServer/1.1"); return params; } + + /** + * Returns the number of connections this test server has accepted. + */ + public int getAcceptedConnectionCount() { + return acceptedConnections.get(); + } - /** * {@link #register Registers} a set of default request handlers. *
@@ -332,6 +342,7 @@
         protected void accept() throws IOException {
             // Set up HTTP connection
             Socket socket = servicedSocket.accept();
+            acceptedConnections.incrementAndGet();
             DefaultHttpServerConnection conn =
                 new DefaultHttpServerConnection();
             conn.bind(socket, serverParams);
Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java	(working copy)
@@ -72,7 +72,7 @@
     protected final ClientConnectionOperator connOperator;
 
     /** The underlying connection being pooled or used. */
-    protected volatile OperatedClientConnection connection;
+    protected final OperatedClientConnection connection;
 
     /** The route for which this entry gets allocated. */
     //@@@ currently accessed from connection manager(s) as attribute
@@ -167,10 +167,18 @@
              route.getLocalAddress(),
              context, params);
 
+        RouteTracker localTracker = tracker; // capture volatile        
+
+        // If this tracker was reset while connecting,
+        // fail early.
+        if(localTracker == null) {
+            throw new IOException("Request aborted");
+        }
+
         if (proxy == null)
-            this.tracker.connectTarget(this.connection.isSecure());
+            localTracker.connectTarget(this.connection.isSecure());
         else
-            this.tracker.connectProxy(proxy, this.connection.isSecure());
+            localTracker.connectProxy(proxy, this.connection.isSecure());
 
     } // open
 
@@ -300,6 +308,9 @@
 
     /**
      * Resets tracked route.
+     * 
+     * If {@link #open(HttpRoute, HttpContext, HttpParams)} is in progress,
+     * this will cause that open to possibly throw an {@link IOException}.
      */
     protected void resetTrackedRoute() { 
         tracker = null;
Index: C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java
===================================================================
--- C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java	(revision 658286)
+++ C:/Documents and Settings/Sam/workspace/httpcomponents-client/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java	(working copy)
@@ -79,6 +79,9 @@
 
     /** Whether this connection is secure. */
     private boolean connSecure;
+    
+    /** True if this connection was shutdown. */
+    private volatile boolean shutdown;
 
 
     // public default constructor
@@ -102,10 +105,18 @@
     }
 
 
-    public void opening(Socket sock, HttpHost target) {
-        assertNotOpen();
+    public void opening(Socket sock, HttpHost target) throws IOException {
+        assertNotOpen();        
         this.socket = sock;
         this.targetHost = target;
+        
+        // Check for shutdown after assigning socket
+        // so that 
+        if(shutdown) {
+            socket.close(); // Allow this to throw...
+            // ...but if it doesn't, explicitly throw one ourselves.
+            throw new IOException("Connection already shutdown");
+        }
     }
 
     
@@ -127,14 +138,17 @@
      * socket that is being connected to a remote address will be closed. 
      * That will interrupt a thread that is blocked on connecting 
      * the socket.
+     * If the connection is not yet open, this will prevent the connection
+     * from being opened.
      *
      * @throws IOException      in case of a problem
      */
     @Override
     public void shutdown() throws IOException {
         LOG.debug("Connection shut down");
+        shutdown = true;
         
-        super.shutdown();
+        super.shutdown();        
         Socket sock = this.socket; // copy volatile attribute
         if (sock != null)
             sock.close();