Index: module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java =================================================================== --- module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (revision 6) +++ module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java Mon Aug 18 13:29:45 PDT 2008 @@ -39,6 +39,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.nio.channels.ClosedChannelException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -54,14 +55,14 @@ public abstract class AbstractIOReactor implements IOReactor { private volatile IOReactorStatus status; - + private final Object shutdownMutex; private final long selectTimeout; private final Selector selector; private final Set sessions; private final Queue closedSessions; private final Queue newChannels; - + public AbstractIOReactor(long selectTimeout) throws IOReactorException { super(); if (selectTimeout <= 0) { @@ -81,23 +82,23 @@ } protected abstract void acceptable(SelectionKey key); - + protected abstract void connectable(SelectionKey key); protected abstract void readable(SelectionKey key); protected abstract void writable(SelectionKey key); - + protected abstract void timeoutCheck(SelectionKey key, long now); protected abstract void validate(Set keys); - + protected abstract void keyCreated(SelectionKey key, IOSession session); - + protected abstract IOSession keyCancelled(SelectionKey key); - + protected abstract void sessionClosed(IOSession session); - + public IOReactorStatus getStatus() { return this.status; } @@ -109,13 +110,13 @@ this.newChannels.add(channelEntry); this.selector.wakeup(); } - + protected void execute() throws InterruptedIOException, IOReactorException { this.status = IOReactorStatus.ACTIVE; try { for (;;) { - + int readyCount; try { readyCount = this.selector.select(this.selectTimeout); @@ -124,7 +125,7 @@ } catch (IOException ex) { throw new IOReactorException("Unexpected selector failure", ex); } - + if (this.status == IOReactorStatus.SHUT_DOWN) { // Hard shut down. Exit select loop immediately break; @@ -136,15 +137,15 @@ closeSessions(); closeNewChannels(); } - - // Process selected I/O events + + // Process selected I/O events if (readyCount > 0) { processEvents(this.selector.selectedKeys()); } - + // Validate active channels validate(this.selector.keys()); - + // Process closed sessions processClosedSessions(); @@ -152,18 +153,18 @@ if (this.status == IOReactorStatus.ACTIVE) { processNewChannels(); } - + // Exit select loop if graceful shutdown has been completed - if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0 + if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0 && this.sessions.isEmpty()) { break; } - + } - + // Close remaining active channels and the selector itself closeActiveChannels(); - + } catch (ClosedSelectorException ex) { } finally { synchronized (this.shutdownMutex) { @@ -172,13 +173,13 @@ } } } - + private void processEvents(final Set selectedKeys) { for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) { - + SelectionKey key = it.next(); processEvent(key); - + } selectedKeys.clear(); } @@ -209,13 +210,20 @@ private void processNewChannels() throws IOReactorException { ChannelEntry entry; while ((entry = this.newChannels.poll()) != null) { - + SocketChannel channel; SelectionKey key; try { channel = entry.getChannel(); channel.configureBlocking(false); key = channel.register(this.selector, 0); + } catch (ClosedChannelException ex) { + SessionRequestImpl sessionRequest = entry.getSessionRequest(); + if (sessionRequest != null) { + sessionRequest.failed(ex); + } + return; + } catch (IOException ex) { throw new IOReactorException("Failure registering channel " + "with the selector", ex); @@ -226,9 +234,9 @@ public void sessionClosed(IOSession session) { closedSessions.add(session); } - + }); - + int timeout = 0; try { timeout = channel.socket().getSoTimeout(); @@ -237,14 +245,14 @@ // as the protocol layer is expected to overwrite // this value anyways } - + session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment()); session.setSocketTimeout(timeout); this.sessions.add(session); try { keyCreated(key, session); - + SessionRequestImpl sessionRequest = entry.getSessionRequest(); if (sessionRequest != null) { sessionRequest.completed(session); @@ -273,7 +281,7 @@ } } } - + protected void closeNewChannels() throws IOReactorException { ChannelEntry entry; while ((entry = this.newChannels.poll()) != null) { @@ -288,7 +296,7 @@ } } } - + protected void closeActiveChannels() throws IOReactorException { Set keys = this.selector.keys(); for (Iterator it = keys.iterator(); it.hasNext(); ) { @@ -306,7 +314,7 @@ } catch (IOException ignore) { } } - + public void gracefulShutdown() { if (this.status != IOReactorStatus.ACTIVE) { // Already shutting down @@ -315,7 +323,7 @@ this.status = IOReactorStatus.SHUTTING_DOWN; this.selector.wakeup(); } - + public void hardShutdown() throws IOReactorException { if (this.status == IOReactorStatus.SHUT_DOWN) { // Already shut down @@ -325,7 +333,7 @@ closeNewChannels(); closeActiveChannels(); } - + public void awaitShutdown(long timeout) throws InterruptedException { synchronized (this.shutdownMutex) { long deadline = System.currentTimeMillis() + timeout; @@ -341,7 +349,7 @@ } } } - + public void shutdown(long gracePeriod) throws IOReactorException { if (this.status != IOReactorStatus.INACTIVE) { gracefulShutdown(); @@ -354,9 +362,9 @@ hardShutdown(); } } - + public void shutdown() throws IOReactorException { shutdown(1000); } - + }