--- module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java (revision 3) +++ module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Thu Mar 29 18:54:01 PDT 2007 @@ -53,30 +53,32 @@ import org.apache.http.params.HttpConnectionParams; import org.apache.http.params.HttpParams; -public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor +public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor implements ConnectingIOReactor { public static int TIMEOUT_CHECK_INTERVAL = 1000; - + private volatile boolean closed = false; - + private final HttpParams params; private final Selector selector; private final SessionRequestQueue requestQueue; + private final int timeoutCheckInterval; - + private long lastTimeoutCheck; - + public DefaultConnectingIOReactor( - int workerCount, + int workerCount, final ThreadFactory threadFactory, - final HttpParams params) throws IOReactorException { - super(TIMEOUT_CHECK_INTERVAL, workerCount, threadFactory); + final HttpParams params, int timeoutCheckInterval) throws IOReactorException { + super(timeoutCheckInterval, workerCount, threadFactory); if (params == null) { throw new IllegalArgumentException("HTTP parameters may not be null"); } this.params = params; this.requestQueue = new SessionRequestQueue(); this.lastTimeoutCheck = System.currentTimeMillis(); + this.timeoutCheckInterval = timeoutCheckInterval; try { this.selector = Selector.open(); } catch (IOException ex) { @@ -85,12 +87,12 @@ } public DefaultConnectingIOReactor( - int workerCount, + int workerCount, final HttpParams params) throws IOReactorException { - this(workerCount, null, params); + this(workerCount, null, params, TIMEOUT_CHECK_INTERVAL); } - - public void execute(final IOEventDispatch eventDispatch) + + public void execute(final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException { if (eventDispatch == null) { throw new IllegalArgumentException("Event dispatcher may not be null"); @@ -99,7 +101,7 @@ for (;;) { int readyCount; try { - readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL); + readyCount = this.selector.select(this.timeoutCheckInterval); } catch (InterruptedIOException ex) { throw ex; } catch (IOException ex) { @@ -109,15 +111,15 @@ if (this.closed) { break; } - + processSessionRequests(); - + if (readyCount > 0) { processEvents(this.selector.selectedKeys()); } - + long currentTime = System.currentTimeMillis(); - if( (currentTime - this.lastTimeoutCheck) >= TIMEOUT_CHECK_INTERVAL) { + if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) { this.lastTimeoutCheck = currentTime; Set keys = this.selector.keys(); if (keys != null) { @@ -127,27 +129,27 @@ verifyWorkers(); } } - + private void processEvents(final Set selectedKeys) { for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) { - + SelectionKey key = (SelectionKey) it.next(); processEvent(key); - + } selectedKeys.clear(); } private void processEvent(final SelectionKey key) { try { - + if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); // Get request handle SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment(); SessionRequestImpl sessionRequest = requestHandle.getSessionRequest(); - + // Finish connection process try { channel.finishConnect(); @@ -158,14 +160,14 @@ if (channel.isConnected()) { try { prepareSocket(channel.socket()); - ChannelEntry entry = new ChannelEntry(channel, sessionRequest); + ChannelEntry entry = new ChannelEntry(channel, sessionRequest); addChannel(entry); } catch (IOException ex) { sessionRequest.failed(ex); } } } - + } catch (CancelledKeyException ex) { key.attach(null); } @@ -179,13 +181,13 @@ socket.setSoLinger(linger > 0, linger); } } - + private void processTimeouts(final Set keys) { long now = System.currentTimeMillis(); for (Iterator it = keys.iterator(); it.hasNext();) { SelectionKey key = (SelectionKey) it.next(); Object attachment = key.attachment(); - + if (attachment instanceof SessionRequestHandle) { SessionRequestHandle handle = (SessionRequestHandle) key.attachment(); SessionRequestImpl sessionRequest = handle.getSessionRequest(); @@ -196,12 +198,12 @@ } } } - + } } public SessionRequest connect( - final SocketAddress remoteAddress, + final SocketAddress remoteAddress, final SocketAddress localAddress, final Object attachment, final SessionRequestCallback callback) { @@ -209,13 +211,13 @@ SessionRequestImpl sessionRequest = new SessionRequestImpl( remoteAddress, localAddress, attachment, callback); sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params)); - + this.requestQueue.push(sessionRequest); this.selector.wakeup(); - + return sessionRequest; } - + private void validateAddress(final SocketAddress address) throws UnknownHostException { if (address == null) { return; @@ -227,7 +229,7 @@ } } } - + private void processSessionRequests() throws IOReactorException { SessionRequestImpl request; while ((request = this.requestQueue.pop()) != null) { @@ -244,7 +246,7 @@ try { validateAddress(request.getLocalAddress()); validateAddress(request.getRemoteAddress()); - + if (request.getLocalAddress() != null) { socketChannel.socket().bind(request.getLocalAddress()); } @@ -253,7 +255,7 @@ request.failed(ex); return; } - + SelectionKey key; try { key = socketChannel.register(this.selector, 0); @@ -263,7 +265,7 @@ "with the selector", ex); } - SessionRequestHandle requestHandle = new SessionRequestHandle(request); + SessionRequestHandle requestHandle = new SessionRequestHandle(request); key.attach(requestHandle); key.interestOps(SelectionKey.OP_CONNECT); } @@ -279,5 +281,5 @@ // Stop the workers stopWorkers(500); } - + }