Index: httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java =================================================================== --- httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (revision 1534273) +++ httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (working copy) @@ -64,7 +64,7 @@ private final Queue requestQueue; private final Set endpoints; - private final Set pausedEndpoints; + private final Set pausedEndpoints; private volatile boolean paused; @@ -84,7 +84,7 @@ super(config, threadFactory); this.requestQueue = new ConcurrentLinkedQueue(); this.endpoints = Collections.synchronizedSet(new HashSet()); - this.pausedEndpoints = new HashSet(); + this.pausedEndpoints = new HashSet(); } /** @@ -199,9 +199,10 @@ } } - private ListenerEndpointImpl createEndpoint(final SocketAddress address) { + private ListenerEndpointImpl createEndpoint(final SocketAddress address, final int backlog) { return new ListenerEndpointImpl( address, + backlog, new ListenerEndpointClosedCallback() { public void endpointClosed(final ListenerEndpoint endpoint) { @@ -212,9 +213,13 @@ } public ListenerEndpoint listen(final SocketAddress address) { + return listen(address, 0); + } + + public ListenerEndpoint listen(final SocketAddress address, final int backlog) { Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0, "I/O reactor has been shut down"); - final ListenerEndpointImpl request = createEndpoint(address); + final ListenerEndpointImpl request = createEndpoint(address, backlog); this.requestQueue.add(request); this.selector.wakeup(); return request; @@ -234,7 +239,7 @@ final ServerSocket socket = serverChannel.socket(); socket.setReuseAddress(this.config.isSoReuseAddress()); serverChannel.configureBlocking(false); - socket.bind(address); + socket.bind(address, request.getBacklog()); } catch (final IOException ex) { closeChannel(serverChannel); request.failed(ex); @@ -285,7 +290,7 @@ for (final ListenerEndpointImpl endpoint : this.endpoints) { if (!endpoint.isClosed()) { endpoint.close(); - this.pausedEndpoints.add(endpoint.getAddress()); + this.pausedEndpoints.add(endpoint); } } this.endpoints.clear(); @@ -297,8 +302,8 @@ return; } this.paused = false; - for (final SocketAddress address: this.pausedEndpoints) { - final ListenerEndpointImpl request = createEndpoint(address); + for (final ListenerEndpointImpl endpoint: this.pausedEndpoints) { + final ListenerEndpointImpl request = createEndpoint(endpoint.getAddress(), endpoint.getBacklog()); this.requestQueue.add(request); } this.pausedEndpoints.clear(); Index: httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java =================================================================== --- httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java (revision 1534273) +++ httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java (working copy) @@ -50,14 +50,17 @@ private volatile SocketAddress address; private volatile IOException exception; + private final int backlog; private final ListenerEndpointClosedCallback callback; public ListenerEndpointImpl( final SocketAddress address, + final int backlog, final ListenerEndpointClosedCallback callback) { super(); Args.notNull(address, "Address"); this.address = address; + this.backlog = backlog; this.callback = callback; } @@ -65,6 +68,10 @@ return this.address; } + public int getBacklog() { + return backlog; + } + public boolean isCompleted() { return this.completed; } Index: httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListenerEndpoint.java =================================================================== --- httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListenerEndpoint.java (revision 1534273) +++ httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListenerEndpoint.java (working copy) @@ -46,6 +46,13 @@ SocketAddress getAddress(); /** + * Returns the socket backlog size of this endpoint. + * + * @return socket backlog size. + */ + int getBacklog(); + + /** * Returns an instance of {@link IOException} thrown during initialization * of this endpoint or null, if initialization was successful. * Index: httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java =================================================================== --- httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java (revision 1534273) +++ httpcore-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java (working copy) @@ -45,8 +45,11 @@ * connections and propagates I/O activity notifications to the I/O event * dispatcher. *

+ * An invocation of this method is equivalent to the following: + * listen(address, 0); + *

* {@link ListenerEndpoint#waitFor()} can be used to wait for the - * listener to be come ready to accept incoming connections. + * listener to be come ready to accept incoming connections. *

* {@link ListenerEndpoint#close()} can be used to shut down * the listener even before it is fully initialized. @@ -57,6 +60,24 @@ ListenerEndpoint listen(SocketAddress address); /** + * Opens a new listener endpoint with the given socket address. Once + * the endpoint is fully initialized it starts accepting incoming + * connections and propagates I/O activity notifications to the I/O event + * dispatcher. + *

+ * {@link ListenerEndpoint#waitFor()} can be used to wait for the + * listener to be come ready to accept incoming connections. + *

+ * {@link ListenerEndpoint#close()} can be used to shut down + * the listener even before it is fully initialized. + * + * @param address the socket address to listen on. + * @param backlog the socket backlog size. + * @return listener endpoint. + */ + ListenerEndpoint listen(SocketAddress address, int backlog); + + /** * Suspends the I/O reactor preventing it from accepting new connections on * all active endpoints. *