Index: module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
===================================================================
--- module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (working copy)
@@ -45,6 +45,7 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 
 import org.apache.http.nio.reactor.IOReactor;
 import org.apache.http.nio.reactor.IOReactorException;
@@ -61,6 +62,7 @@
     private final Set sessions;
     private final Queue closedSessions;
     private final Queue newChannels;
+    private final ReactorExecutor executor;
 
     public AbstractIOReactor(long selectTimeout) throws IOReactorException {
         super();
@@ -71,6 +73,7 @@
         this.sessions = Collections.synchronizedSet(new HashSet());
         this.closedSessions = new ConcurrentLinkedQueue();
         this.newChannels = new ConcurrentLinkedQueue();
+        this.executor = new ReactorExecutor();
         try {
             this.selector = Selector.open();
         } catch (IOException ex) {
@@ -137,6 +140,10 @@
                     closeNewChannels();
                 }
 
+                if(this.status == IOReactorStatus.ACTIVE) {
+                    executor.runCommands();
+                }
+
                 // Process selected I/O events
                 if (readyCount > 0) {
                     processEvents(this.selector.selectedKeys());
@@ -221,7 +228,7 @@
                         "with the selector", ex);
             }
 
-            IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
+            IOSession session = new IOSessionImpl(key, executor, new SessionClosedCallback() {
 
                 public void sessionClosed(IOSession session) {
                     closedSessions.add(session);
@@ -359,4 +366,29 @@
         shutdown(1000);
     }
 
+    /**
+     * Executor handed to I/O sessions: commands are queued here and run by
+     * {@link #runCommands()}, which the reactor calls while its status is ACTIVE.
+     */
+    private class ReactorExecutor implements Executor {
+        private final Queue<Runnable> commands = new ConcurrentLinkedQueue<Runnable>();
+
+        /** Queues the command, then wakes the selector so the queue gets drained. */
+        @Override
+        public void execute(Runnable command) {
+            // Enqueue first: if wakeup() came first, the reactor could drain an empty
+            // queue and block on the selector again before the command is added.
+            commands.add(command);
+            selector.wakeup();
+        }
+
+        /** Runs queued commands in FIFO order until the queue is empty. */
+        void runCommands() {
+            Runnable command;
+            while ((command = this.commands.poll()) != null) {
+                command.run();
+            }
+        }
+    }
+
 }
Index: module-nio/src/main/java/org/apache/http/impl/nio/reactor/SSLIOSession.java
===================================================================
--- module-nio/src/main/java/org/apache/http/impl/nio/reactor/SSLIOSession.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/impl/nio/reactor/SSLIOSession.java (working copy)
@@ -36,6 +36,7 @@
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
+import java.util.concurrent.Executor;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -350,6 +351,10 @@
     public ByteChannel channel() {
         return this.channel;
     }
+
+    public Executor getIOExecutor() {
+        return this.session.getIOExecutor();
+    }
 
     public SocketAddress getLocalAddress() {
         return this.session.getLocalAddress();
Index: module-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java
===================================================================
--- module-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/impl/nio/reactor/IOSessionImpl.java (working copy)
@@ -40,6 +40,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.reactor.SessionBufferStatus;
@@ -52,16 +53,19 @@
     private final ByteChannel channel;
     private final SessionClosedCallback callback;
     private final Map attributes;
+    private final Executor ioExecutor;
 
     private SessionBufferStatus bufferStatus;
     private int socketTimeout;
 
-    public IOSessionImpl(final SelectionKey key, final SessionClosedCallback callback) {
+    public IOSessionImpl(final SelectionKey key, final Executor ioExecutor,
+            final SessionClosedCallback callback) {
         super();
         if (key == null) {
             throw new IllegalArgumentException("Selection key may not be null");
         }
         this.key = key;
+        this.ioExecutor = ioExecutor;
         this.channel = (ByteChannel) this.key.channel();
         this.callback = callback;
         this.attributes = Collections.synchronizedMap(new HashMap());
@@ -73,6 +77,10 @@
         return this.channel;
     }
 
+    public Executor getIOExecutor() {
+        return this.ioExecutor;
+    }
+
     public SocketAddress getLocalAddress() {
         Channel channel = this.key.channel();
         if (channel instanceof SocketChannel) {
Index: module-nio/src/main/java/org/apache/http/impl/nio/NHttpConnectionBase.java
===================================================================
--- module-nio/src/main/java/org/apache/http/impl/nio/NHttpConnectionBase.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/impl/nio/NHttpConnectionBase.java (working copy)
@@ -35,6 +35,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.concurrent.Executor;
 
 import org.apache.http.ConnectionClosedException;
 import org.apache.http.Header;
@@ -137,6 +138,11 @@
     public int getStatus() {
         return this.status;
     }
+
+    @Override
+    public Executor getIOExecutor() {
+        return session.getIOExecutor();
+    }
 
     public HttpContext getContext() {
         return this.context;
Index: module-nio/src/main/java/org/apache/http/nio/IOControl.java
===================================================================
--- module-nio/src/main/java/org/apache/http/nio/IOControl.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/nio/IOControl.java (working copy)
@@ -32,15 +32,23 @@
 package org.apache.http.nio;
 
 import java.io.IOException;
+import java.util.concurrent.Executor;
 
 /**
  * Connection input/output control interface. It can be used to request or
  * temporarily suspend event notifications that are triggered when the underlying
- * channel is ready for input / output operations.
+ * channel is ready for input / output operations. It can also be used to
+ * retrieve an executor that will run events within the correct I/O thread.
  *
  * @author Oleg Kalnichevski
  */
 public interface IOControl {
+
+    /**
+     * Returns an Executor that will execute events within
+     * the correct I/O thread.
+     */
+    Executor getIOExecutor();
 
     /**
      * Requests event notifications to be triggered when the underlying
Index: module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java
===================================================================
--- module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java (revision 630026)
+++ module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java (working copy)
@@ -33,6 +33,7 @@
 
 import java.net.SocketAddress;
 import java.nio.channels.ByteChannel;
+import java.util.concurrent.Executor;
 
 public interface IOSession {
 
@@ -44,6 +45,8 @@
 
     ByteChannel channel();
 
+    Executor getIOExecutor();
+
     SocketAddress getRemoteAddress();
 
     SocketAddress getLocalAddress();