--- module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java (revision 4) +++ module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java Tue Jan 23 14:14:10 PST 2007 @@ -38,8 +38,12 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.nio.channels.ClosedChannelException; import java.util.Iterator; import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOEventDispatch; @@ -56,6 +60,7 @@ private final HttpParams params; private final Selector selector; + private final Set newConnections = new HashSet(); private long lastTimeoutCheck; @@ -80,6 +85,11 @@ if (this.closed) { break; } + + synchronized(this.newConnections) { + processConnections(); + } + if (readyCount > 0) { processEvents(this.selector.selectedKeys()); } @@ -96,6 +106,27 @@ } } + private void processConnections() throws ClosedChannelException + { + Iterator iter = this.newConnections.iterator(); + + while (iter.hasNext()) { + ConnectInfo info = (ConnectInfo) iter.next(); + + iter.remove(); + + SelectionKey key = info.socketChannel.register(this.selector, 0); + SessionRequestImpl sessionRequest = new SessionRequestImpl( + info.remoteAddress, info.localAddress, info.attachment, key); + sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params)); + + SessionRequestHandle requestHandle = new SessionRequestHandle(sessionRequest); + key.attach(requestHandle); + key.interestOps(SelectionKey.OP_CONNECT); + } + + } + private void processEvents(final Set selectedKeys) throws IOException { for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) { @@ -169,6 +200,7 @@ final SocketAddress remoteAddress, final SocketAddress localAddress, final Object attachment) throws IOException { + SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); if (localAddress != null) { @@ -175,18 +207,31 @@ socketChannel.socket().bind(localAddress); } socketChannel.connect(remoteAddress); - SelectionKey key = socketChannel.register(this.selector, 0); + ConnectInfo info = new ConnectInfo(); - SessionRequestImpl sessionRequest = new SessionRequestImpl( - remoteAddress, localAddress, attachment, key); - sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params)); + info.remoteAddress = remoteAddress; + info.localAddress = localAddress; + info.attachment = attachment; + info.socketChannel = socketChannel; - SessionRequestHandle requestHandle = new SessionRequestHandle(sessionRequest); - key.attach(requestHandle); - key.interestOps(SelectionKey.OP_CONNECT); - return sessionRequest; + synchronized(this.newConnections) { + this.newConnections.add(info); - } + } + + this.selector.wakeup(); + + return null; + } + + private static class ConnectInfo + { + public SocketAddress remoteAddress; + public SocketAddress localAddress; + public Object attachment; + public SocketChannel socketChannel; + } + public void shutdown() throws IOException { if (this.closed) { return;