Index: module-nio/src/test/java/org/apache/http/nio/protocol/TestAllProtocol.java =================================================================== --- module-nio/src/test/java/org/apache/http/nio/protocol/TestAllProtocol.java (revision 628295) +++ module-nio/src/test/java/org/apache/http/nio/protocol/TestAllProtocol.java (working copy) @@ -43,6 +43,7 @@ suite.addTest(TestBufferingNHttpHandlers.suite()); suite.addTest(TestThrottlingNHttpHandler.suite()); suite.addTest(TestNIOSSLHttp.suite()); + suite.addTest(TestAsyncNHttpHandlers.suite()); return suite; } Index: module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java =================================================================== --- module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java (revision 0) +++ module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java (revision 0) @@ -0,0 +1,2220 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/nio/protocol/TestBufferingNHttpHandlers.java $ + * $Revision: 617652 $ + * $Date: 2008-02-01 16:18:00 -0500 (Fri, 01 Feb 2008) $ + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. + * + */ + +package org.apache.http.nio.protocol; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.HttpVersion; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.DefaultHttpResponseFactory; +import org.apache.http.message.BasicHttpEntityEnclosingRequest; +import org.apache.http.message.BasicHttpRequest; +import org.apache.http.mockup.ByteSequence; +import org.apache.http.mockup.RequestCount; +import org.apache.http.mockup.ResponseSequence; +import org.apache.http.mockup.SimpleEventListener; +import org.apache.http.mockup.SimpleHttpRequestHandlerResolver; +import org.apache.http.mockup.TestHttpClient; +import org.apache.http.mockup.TestHttpServer; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.NHttpClientHandler; +import org.apache.http.nio.NHttpConnection; +import org.apache.http.nio.NHttpServiceHandler; +import
org.apache.http.nio.entity.ConsumingNHttpEntity; +import org.apache.http.nio.entity.NByteArrayEntity; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.nio.reactor.IOReactorExceptionHandler; +import org.apache.http.nio.reactor.ListenerEndpoint; +import org.apache.http.nio.util.HeapByteBufferAllocator; +import org.apache.http.nio.util.SimpleInputBuffer; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.CoreConnectionPNames; +import org.apache.http.params.CoreProtocolPNames; +import org.apache.http.params.HttpParams; +import org.apache.http.protocol.BasicHttpProcessor; +import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpExpectationVerifier; +import org.apache.http.protocol.HttpRequestHandler; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestExpectContinue; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.http.protocol.RequestUserAgent; +import org.apache.http.protocol.ResponseConnControl; +import org.apache.http.protocol.ResponseContent; +import org.apache.http.protocol.ResponseDate; +import org.apache.http.protocol.ResponseServer; +import org.apache.http.util.EncodingUtils; +import org.apache.http.util.EntityUtils; + +/** + * HttpCore NIO integration tests for async handlers. 
+ * + * @author Oleg Kalnichevski + * @author Sam Berlin + * + * @version $Id: TestAsyncNHttpHandlers.java 617652 2008-02-01 21:18:00Z sebb $ + */ +public class TestAsyncNHttpHandlers extends TestCase { + + // ------------------------------------------------------------ Constructor + public TestAsyncNHttpHandlers(String testName) { + super(testName); + } + + // ------------------------------------------------------------------- Main + public static void main(String args[]) { + String[] testCaseName = { TestAsyncNHttpHandlers.class.getName() }; + junit.textui.TestRunner.main(testCaseName); + } + + // ------------------------------------------------------- TestCase Methods + + public static Test suite() { + return new TestSuite(TestAsyncNHttpHandlers.class); + } + + private TestHttpServer server; + private TestHttpClient client; + + @Override + protected void setUp() throws Exception { + HttpParams serverParams = new BasicHttpParams(); + serverParams + .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) + .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) + .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false) + .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) + .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "TEST-SERVER/1.1"); + + this.server = new TestHttpServer(serverParams); + this.server.setExceptionHandler(new IOReactorExceptionHandler() { + @Override + public boolean handle(IOException ex) { + ex.printStackTrace(); + return false; + } + + @Override + public boolean handle(RuntimeException ex) { + ex.printStackTrace(); + return false; + } + }); + + HttpParams clientParams = new BasicHttpParams(); + clientParams + .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) + .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 2000) + .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) + .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false) + 
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) + .setParameter(CoreProtocolPNames.USER_AGENT, "TEST-CLIENT/1.1"); + + this.client = new TestHttpClient(clientParams); + this.client.setExceptionHandler(new IOReactorExceptionHandler() { + @Override + public boolean handle(IOException ex) { + ex.printStackTrace(); + return false; + } + + @Override + public boolean handle(RuntimeException ex) { + ex.printStackTrace(); + return false; + } + }); + } + + @Override + protected void tearDown() throws Exception { + this.server.shutdown(); + this.client.shutdown(); + } + + private NHttpServiceHandler createHttpServiceHandler( + final HttpRequestHandler requestHandler, + final HttpExpectationVerifier expectationVerifier) { + BasicHttpProcessor httpproc = new BasicHttpProcessor(); + httpproc.addInterceptor(new ResponseDate()); + httpproc.addInterceptor(new ResponseServer()); + httpproc.addInterceptor(new ResponseContent()); + httpproc.addInterceptor(new ResponseConnControl()); + + AsyncNHttpServiceHandler serviceHandler = new AsyncNHttpServiceHandler( + httpproc, + new DefaultHttpResponseFactory(), + new DefaultConnectionReuseStrategy(), + this.server.getParams()); + + serviceHandler.setHandlerResolver( + new SimpleHttpRequestHandlerResolver(requestHandler)); + serviceHandler.setExpectationVerifier(expectationVerifier); + serviceHandler.setEventListener(new SimpleEventListener()); + + return serviceHandler; + } + + private NHttpClientHandler createHttpClientHandler( + final HttpRequestExecutionHandler requestExecutionHandler) { + BasicHttpProcessor httpproc = new BasicHttpProcessor(); + httpproc.addInterceptor(new RequestContent()); + httpproc.addInterceptor(new RequestTargetHost()); + httpproc.addInterceptor(new RequestConnControl()); + httpproc.addInterceptor(new RequestUserAgent()); + httpproc.addInterceptor(new RequestExpectContinue()); + + BufferingHttpClientHandler clientHandler = new BufferingHttpClientHandler( + httpproc, + requestExecutionHandler, + new 
DefaultConnectionReuseStrategy(), + this.client.getParams()); + + clientHandler.setEventListener(new SimpleEventListener()); + return clientHandler; + } + + /** + * This test case executes a series of simple (non-pipelined) GET requests + * over multiple connections. This uses blocking output entities, and + * checks to make sure the handler converts the entity correctly. + */ + public void testSimpleHttpGetsWithBlockingEntities() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + String s = request.getRequestLine().getUri(); + URI uri; + try { + uri = new URI(s); + } catch (URISyntaxException ex) { + throw new HttpException("Invalid request URI: " + s); + } + int index = Integer.parseInt(uri.getQuery()); + byte[] bytes = requestData.getBytes(index); + ByteArrayEntity entity = new ByteArrayEntity(bytes); + response.setEntity(entity); + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest get = null; + if (i < reqNo) { + get = new 
BasicHttpRequest("GET", "/?" + i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return get; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], 
received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) GET requests + * over multiple connections. This uses non-blocking output entities. + */ + public void testSimpleHttpGetsWithNonBlockingEntities() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + String s = request.getRequestLine().getUri(); + URI uri; + try { + uri = new URI(s); + } catch (URISyntaxException ex) { + throw new HttpException("Invalid request URI: " + s); + } + int index = Integer.parseInt(uri.getQuery()); + byte[] bytes = requestData.getBytes(index); + NByteArrayEntity entity = new NByteArrayEntity(bytes); + response.setEntity(entity); + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest get = null; + if (i < reqNo) { + get = new BasicHttpRequest("GET", "/?" 
+ i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return get; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + 
} + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * with content length delimited content over multiple connections. + * It uses legacy handlers that are delegated through a {@link NBlockingHttpRequestHandler}, + * to test that the interaction with {@link AsyncNHttpServiceHandler} and the delegate + * handler works correctly. + */ + public void testSimpleHttpPostsWithContentLengthWithBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity incoming = ((HttpEntityEnclosingRequest) request).getEntity(); + byte[] data = EntityUtils.toByteArray(incoming); + NByteArrayEntity outgoing = new NByteArrayEntity(data); + outgoing.setChunked(false); + response.setEntity(outgoing); + } else { + NStringEntity outgoing = new NStringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + 
BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" + i); + + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + new NBlockingHttpRequestHandler(requestHandler, Executors.newCachedThreadPool()), + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), 
receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * with content length delimited content over multiple connections. + * It uses purely asynchronous handlers. + */ + public void testSimpleHttpPostsWithContentLengthWithNonBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + ConsumingNHttpEntity incoming = (ConsumingNHttpEntity)((HttpEntityEnclosingRequest) request).getEntity(); + incoming.setContentListener(new ConsumingNHttpEntity.ContentListener() { + final SimpleInputBuffer input = new SimpleInputBuffer(2048, new HeapByteBufferAllocator()); + @Override + public int contentAvailable(ContentDecoder decoder, + IOControl ioctrl) throws IOException { + int read = input.consumeContent(decoder); + if(decoder.isCompleted()) { + byte[] b = new byte[input.length()]; + input.read(b); + response.setEntity(new NByteArrayEntity(b)); + } + return read; + } + @Override + public void finish() { + input.reset(); + } + }); + } else { + NStringEntity outgoing = new NStringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + 
HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" + i); + + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 
0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * with chunk coded content content over multiple connections. This tests + * with blocking handlers & blocking entities. 
+ */ + public void testSimpleHttpPostsChunkedWithBlockingHandlers() throws Exception { + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity incoming = ((HttpEntityEnclosingRequest) request).getEntity(); + byte[] data = EntityUtils.toByteArray(incoming); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + response.setEntity(outgoing); + } else { + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" 
+ i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + new NBlockingHttpRequestHandler(requestHandler, Executors.newCachedThreadPool()), + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + if (requestCount.isAborted()) { + System.out.println("Test case aborted"); + } + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), 
receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * with chunk coded content content over multiple connections. This tests + * with nonblocking handlers & nonblocking entities. + */ + public void testSimpleHttpPostsChunkedWithNonBlockingHandlers() throws Exception { + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + ConsumingNHttpEntity incoming = (ConsumingNHttpEntity)((HttpEntityEnclosingRequest) request).getEntity(); + incoming.setContentListener(new ConsumingNHttpEntity.ContentListener() { + final SimpleInputBuffer input = new SimpleInputBuffer(2048, new HeapByteBufferAllocator()); + @Override + public int contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException { + int read = input.consumeContent(decoder); + if(decoder.isCompleted()) { + byte[] data = new byte[input.length()]; + input.read(data); + NByteArrayEntity outgoing = new NByteArrayEntity(data); + outgoing.setChunked(true); + response.setEntity(outgoing); + } + return read; + } + @Override + public void finish() { + input.reset(); + } + }); + } else { + NStringEntity outgoing = new 
NStringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" + i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + 
endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + if (requestCount.isAborted()) { + System.out.println("Test case aborted"); + } + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) HTTP/1.0 + * POST requests over multiple persistent connections. This tests with blocking + * handlers & entities. 
+ */ + public void testSimpleHttpPostsHTTP10WithBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity incoming = ((HttpEntityEnclosingRequest) request).getEntity(); + byte[] data = EntityUtils.toByteArray(incoming); + + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(false); + response.setEntity(outgoing); + } else { + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + // Set protocol level to HTTP/1.0 + this.client.getParams().setParameter( + CoreProtocolPNames.PROTOCOL_VERSION, HttpVersion.HTTP_1_0); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" 
+ i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + new NBlockingHttpRequestHandler(requestHandler, Executors.newCachedThreadPool()), + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] 
received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) HTTP/1.0 + * POST requests over multiple persistent connections. This tests with nonblocking + * handlers & entities. + */ + public void testSimpleHttpPostsHTTP10WithNonBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + ConsumingNHttpEntity incoming = (ConsumingNHttpEntity)((HttpEntityEnclosingRequest) request).getEntity(); + incoming.setContentListener(new ConsumingNHttpEntity.ContentListener() { + final SimpleInputBuffer input = new SimpleInputBuffer(2048, new HeapByteBufferAllocator()); + @Override + public int contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException { + int read = input.consumeContent(decoder); + if(decoder.isCompleted()) { + byte[] data = new byte[input.length()]; + input.read(data); + NByteArrayEntity outgoing = new NByteArrayEntity(data); + outgoing.setChunked(false); + response.setEntity(outgoing); + } + return read; + } + @Override + public void finish() { + input.reset(); + } + }); + } else { + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + // Set protocol level to HTTP/1.0 + this.client.getParams().setParameter( 
+ CoreProtocolPNames.PROTOCOL_VERSION, HttpVersion.HTTP_1_0); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" + i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress 
= (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * over multiple connections using the 'expect: continue' handshake. + * This test uses blocking handlers & entities. 
+ */ + public void testHttpPostsWithExpectContinueWithBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity incoming = ((HttpEntityEnclosingRequest) request).getEntity(); + byte[] data = EntityUtils.toByteArray(incoming); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + response.setEntity(outgoing); + } else { + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + // Activate 'expect: continue' handshake + this.client.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" 
+ i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + new NBlockingHttpRequestHandler(requestHandler, Executors.newCachedThreadPool()), + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = 
requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * over multiple connections using the 'expect: continue' handshake. This test + * uses nonblocking handlers & entities. + */ + public void testHttpPostsWithExpectContinueWithNonBlockingHandlers() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + if (request instanceof HttpEntityEnclosingRequest) { + ConsumingNHttpEntity incoming = (ConsumingNHttpEntity)((HttpEntityEnclosingRequest) request).getEntity(); + incoming.setContentListener(new ConsumingNHttpEntity.ContentListener() { + final SimpleInputBuffer input = new SimpleInputBuffer(2048, new HeapByteBufferAllocator()); + @Override + public int contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException { + int read = input.consumeContent(decoder); + if(decoder.isCompleted()) { + byte[] data = new byte[input.length()]; + input.read(data); + NByteArrayEntity outgoing = new NByteArrayEntity(data); + outgoing.setChunked(true); + response.setEntity(outgoing); + } + return read; + } + @Override + public void finish() { + input.reset(); + } + }); + } else { + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + } + + }; + + // Activate 
'expect: continue' handshake + this.client.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/?" + i); + byte[] data = requestData.getBytes(i); + ByteArrayEntity outgoing = new ByteArrayEntity(data); + outgoing.setChunked(true); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = 
this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * over multiple connections that do not meet the target server expectations. + * This test uses blocking entities. 
+ */ + public void testHttpPostsWithExpectationVerificationWithBlockingEntity() throws Exception { + + final int reqNo = 3; + final RequestCount requestCount = new RequestCount(reqNo); + final ResponseSequence responses = new ResponseSequence(); + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + StringEntity outgoing = new StringEntity("No content"); + response.setEntity(outgoing); + } + + }; + + HttpExpectationVerifier expectationVerifier = new HttpExpectationVerifier() { + + public void verify( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException { + Header someheader = request.getFirstHeader("Secret"); + if (someheader != null) { + int secretNumber; + try { + secretNumber = Integer.parseInt(someheader.getValue()); + } catch (NumberFormatException ex) { + response.setStatusCode(HttpStatus.SC_BAD_REQUEST); + return; + } + if (secretNumber < 2) { + response.setStatusCode(HttpStatus.SC_EXPECTATION_FAILED); + ByteArrayEntity outgoing = new ByteArrayEntity( + EncodingUtils.getAsciiBytes("Wrong secret number")); + response.setEntity(outgoing); + } + } + } + + }; + + // Activate 'expect: continue' handshake + this.client.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ResponseSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + 
BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/"); + post.addHeader("Secret", Integer.toString(i)); + ByteArrayEntity outgoing = new ByteArrayEntity( + EncodingUtils.getAsciiBytes("No content")); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ResponseSequence list = (ResponseSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + HttpEntity entity = response.getEntity(); + if (entity != null) { + try { + entity.consumeContent(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + } + + list.addResponse(response); + requestCount.decrement(); + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + expectationVerifier); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responses); + + requestCount.await(1000); + + this.client.shutdown(); + this.server.shutdown(); + + assertEquals(reqNo, responses.size()); + HttpResponse response = responses.getResponse(0); + assertEquals(HttpStatus.SC_EXPECTATION_FAILED, response.getStatusLine().getStatusCode()); + response = responses.getResponse(1); + assertEquals(HttpStatus.SC_EXPECTATION_FAILED, 
response.getStatusLine().getStatusCode()); + response = responses.getResponse(2); + assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode()); + } + + /** + * This test case executes a series of simple (non-pipelined) POST requests + * over multiple connections that do not meet the target server expectations. + * This test uses nonblocking entities. + */ + public void testHttpPostsWithExpectationVerificationWithNonBlockingEntity() throws Exception { + + final int reqNo = 3; + final RequestCount requestCount = new RequestCount(reqNo); + final ResponseSequence responses = new ResponseSequence(); + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + NStringEntity outgoing = new NStringEntity("No content"); + response.setEntity(outgoing); + } + + }; + + HttpExpectationVerifier expectationVerifier = new HttpExpectationVerifier() { + + public void verify( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException { + Header someheader = request.getFirstHeader("Secret"); + if (someheader != null) { + int secretNumber; + try { + secretNumber = Integer.parseInt(someheader.getValue()); + } catch (NumberFormatException ex) { + response.setStatusCode(HttpStatus.SC_BAD_REQUEST); + return; + } + if (secretNumber < 2) { + response.setStatusCode(HttpStatus.SC_EXPECTATION_FAILED); + ByteArrayEntity outgoing = new ByteArrayEntity( + EncodingUtils.getAsciiBytes("Wrong secret number")); + response.setEntity(outgoing); + } + } + } + + }; + + // Activate 'expect: continue' handshake + this.client.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true); + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + 
context.setAttribute("LIST", (ResponseSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpEntityEnclosingRequest post = null; + if (i < reqNo) { + post = new BasicHttpEntityEnclosingRequest("POST", "/"); + post.addHeader("Secret", Integer.toString(i)); + ByteArrayEntity outgoing = new ByteArrayEntity( + EncodingUtils.getAsciiBytes("No content")); + post.setEntity(outgoing); + + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return post; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ResponseSequence list = (ResponseSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + HttpEntity entity = response.getEntity(); + if (entity != null) { + try { + entity.consumeContent(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + } + + list.addResponse(response); + requestCount.decrement(); + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + expectationVerifier); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responses); + + 
requestCount.await(1000); + + this.client.shutdown(); + this.server.shutdown(); + + assertEquals(reqNo, responses.size()); + HttpResponse response = responses.getResponse(0); + assertEquals(HttpStatus.SC_EXPECTATION_FAILED, response.getStatusLine().getStatusCode()); + response = responses.getResponse(1); + assertEquals(HttpStatus.SC_EXPECTATION_FAILED, response.getStatusLine().getStatusCode()); + response = responses.getResponse(2); + assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode()); + } + + /** + * This test case executes a series of simple (non-pipelined) HEAD requests + * over multiple connections. + */ + public void testSimpleHttpHeadsWithBlockingEntities() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo * 2); + + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData1 = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData1.add(new ResponseSequence()); + } + List responseData2 = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData2.add(new ResponseSequence()); + } + + final String[] method = new String[1]; + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + String s = request.getRequestLine().getUri(); + URI uri; + try { + uri = new URI(s); + } catch (URISyntaxException ex) { + throw new HttpException("Invalid request URI: " + s); + } + int index = Integer.parseInt(uri.getQuery()); + + byte[] data = requestData.getBytes(index); + ByteArrayEntity entity = new ByteArrayEntity(data); + response.setEntity(entity); + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) 
{ + context.setAttribute("LIST", (ResponseSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest request = null; + if (i < reqNo) { + request = new BasicHttpRequest(method[0], "/?" + i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return request; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ResponseSequence list = (ResponseSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + list.addResponse(response); + requestCount.decrement(); + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + method[0] = "GET"; + + for (int i = 0; i < responseData1.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData1.get(i)); + } + + requestCount.await(connNo * reqNo, 10000); + assertEquals(connNo * reqNo, requestCount.getValue()); + + method[0] = "HEAD"; + + for (int i = 0; i < responseData2.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + 
responseData2.get(i)); + } + + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < connNo; c++) { + ResponseSequence getResponses = responseData1.get(c); + ResponseSequence headResponses = responseData2.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), headResponses.size()); + assertEquals(expectedPackets.size(), getResponses.size()); + for (int p = 0; p < requestData.size(); p++) { + HttpResponse getResponse = getResponses.getResponse(p); + HttpResponse headResponse = headResponses.getResponse(p); + assertEquals(null, headResponse.getEntity()); + + Header[] getHeaders = getResponse.getAllHeaders(); + Header[] headHeaders = headResponse.getAllHeaders(); + assertEquals(getHeaders.length, headHeaders.length); + for (int j = 0; j < getHeaders.length; j++) { + if ("Date".equals(getHeaders[j].getName())) { + continue; + } + assertEquals(getHeaders[j].toString(), headHeaders[j].toString()); + } + } + } + } + + /** + * This test case executes a series of simple (non-pipelined) HEAD requests + * over multiple connections. With nonblocking entities. 
+ */ + public void testSimpleHttpHeadsWithNonBlockingEntities() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo * 2); + + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData1 = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData1.add(new ResponseSequence()); + } + List responseData2 = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData2.add(new ResponseSequence()); + } + + final String[] method = new String[1]; + + HttpRequestHandler requestHandler = new HttpRequestHandler() { + + public void handle( + final HttpRequest request, + final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + + String s = request.getRequestLine().getUri(); + URI uri; + try { + uri = new URI(s); + } catch (URISyntaxException ex) { + throw new HttpException("Invalid request URI: " + s); + } + int index = Integer.parseInt(uri.getQuery()); + + byte[] data = requestData.getBytes(index); + NByteArrayEntity entity = new NByteArrayEntity(data); + response.setEntity(entity); + } + + }; + + HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ResponseSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest request = null; + if (i < reqNo) { + request = new BasicHttpRequest(method[0], "/?" 
+ i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return request; + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ResponseSequence list = (ResponseSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + list.addResponse(response); + requestCount.decrement(); + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + method[0] = "GET"; + + for (int i = 0; i < responseData1.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData1.get(i)); + } + + requestCount.await(connNo * reqNo, 10000); + assertEquals(connNo * reqNo, requestCount.getValue()); + + method[0] = "HEAD"; + + for (int i = 0; i < responseData2.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData2.get(i)); + } + + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < connNo; c++) { + ResponseSequence getResponses = responseData1.get(c); + ResponseSequence headResponses = responseData2.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), headResponses.size()); + assertEquals(expectedPackets.size(), getResponses.size()); + for (int p 
= 0; p < requestData.size(); p++) { + HttpResponse getResponse = getResponses.getResponse(p); + HttpResponse headResponse = headResponses.getResponse(p); + assertEquals(null, headResponse.getEntity()); + + Header[] getHeaders = getResponse.getAllHeaders(); + Header[] headHeaders = headResponse.getAllHeaders(); + assertEquals(getHeaders.length, headHeaders.length); + for (int j = 0; j < getHeaders.length; j++) { + if ("Date".equals(getHeaders[j].getName())) { + continue; + } + assertEquals(getHeaders[j].toString(), headHeaders[j].toString()); + } + } + } + } + +} Property changes on: module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/ContentEncoderChannel.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/ContentEncoderChannel.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/ContentEncoderChannel.java (revision 0) @@ -0,0 +1,33 @@ +package org.apache.http.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.apache.http.nio.ContentEncoder; + +/** + * A {@link WritableByteChannel} that delegates to a {@link ContentEncoder}. + * Attempts to close this channel are ignored, and isOpen always returns true. 
+ * + * @author Sam Berlin + */ +public class ContentEncoderChannel implements WritableByteChannel { + + private final ContentEncoder contentEncoder; + + public ContentEncoderChannel(ContentEncoder contentEncoder) { + this.contentEncoder = contentEncoder; + } + + public int write(ByteBuffer src) throws IOException { + return contentEncoder.write(src); + } + + public void close() {} + + public boolean isOpen() { + return true; + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/ContentEncoderChannel.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/protocol/BlockingHandler.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/protocol/BlockingHandler.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/protocol/BlockingHandler.java (revision 0) @@ -0,0 +1,53 @@ +package org.apache.http.nio.protocol; + +import java.io.IOException; + +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.nio.entity.ConsumingNHttpEntity; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpRequestHandler; + +/** + * Allows an {@link HttpRequestHandler} to handle + * {@link ConsumingNHttpEntity ConsumingNHttpEntities} and notify when a + * response should be sent. This is necessary to allow for multi-threaded + * interaction between notifying that content is available in one thread and reading it + * from another. This ensures that a response is sent only after the request + * has been used and a response entity is prepared.
+ * + * @see AsyncNHttpServiceHandler + * + * @author Sam Berlin + */ +public interface BlockingHandler { + + /** + * Analog to {@link HttpRequestHandler#handle(HttpRequest, HttpResponse, HttpContext)}, except + * it provides an additional {@link ResponseListener} that must be notified when the handling + * is done. + */ + void handle(HttpRequest request, HttpResponse response, + HttpContext context, ResponseListener responseListener) + throws HttpException, IOException; + + /** A listener to notify when a response has been prepared. */ + public static interface ResponseListener { + /** Notification that a response is now ready to be sent. */ + void responseReady(); + + /** + * Notification that an exception occurred while processing the request + * or building the response entity. + */ + void handleException(HttpException httpException); + + /** + * Notification that an exception occurred while processing the request + * or building the response entity. + */ + void handleException(IOException ioxException); + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/protocol/BlockingHandler.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java (revision 0) @@ -0,0 +1,604 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/BufferingHttpServiceHandler.java $ + * $Revision: 613298 $ + * $Date: 2008-01-18 17:09:22 -0500 (Fri, 18 Jan 2008) $ + * + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ * + */ + +package org.apache.http.nio.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.http.ConnectionReuseStrategy; +import org.apache.http.HttpConnection; +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseFactory; +import org.apache.http.HttpStatus; +import org.apache.http.HttpVersion; +import org.apache.http.MethodNotSupportedException; +import org.apache.http.ProtocolException; +import org.apache.http.ProtocolVersion; +import org.apache.http.UnsupportedHttpVersionException; +import org.apache.http.impl.nio.DefaultNHttpServerConnection; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.NHttpServerConnection; +import org.apache.http.nio.NHttpServiceHandler; +import org.apache.http.nio.entity.BasicConsumingNHttpEntity; +import org.apache.http.nio.entity.ConsumingNHttpEntity; +import org.apache.http.nio.entity.NBlockingProducingNHttpEntity; +import org.apache.http.nio.entity.NByteArrayEntity; +import org.apache.http.nio.entity.NHttpEntity; +import org.apache.http.nio.entity.ProducingNHttpEntity; +import org.apache.http.nio.entity.ConsumingNHttpEntity.ContentListener; +import org.apache.http.nio.protocol.BlockingHandler.ResponseListener; +import org.apache.http.nio.util.ByteBufferAllocator; +import org.apache.http.nio.util.HeapByteBufferAllocator; +import org.apache.http.params.DefaultedHttpParams; +import org.apache.http.params.HttpParams; +import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpProcessor; +import org.apache.http.protocol.HttpRequestHandler; +import org.apache.http.util.EncodingUtils; + +/** + * HTTP service handler implementation that works 
with + * {@link NHttpEntity NHttpEntities}. The contents of HTTP headers are stored in + * memory, HTTP entities are streamed directly from the NHttpEntity to the + * underlying channel (and vice versa). + *

+ * When using this {@link NHttpServiceHandler}, it is important to ensure + * that entities supplied for writing implement {@link ProducingNHttpEntity}. + * Doing so will allow the entity to be written out asynchronously. + * If entities supplied for writing do not implement + * ProducingNHttpEntity, a delegate is added that buffers the entire + * contents in memory. Additionally, the buffering might take place in the I/O + * thread, which could cause I/O to block temporarily. For best results, ensure + * that all entities set on {@link HttpResponse HttpResponses} from + * {@link HttpRequestHandler HttpRequestHandlers} implement + * ProducingNHttpEntity. + *

+ * If incoming requests have entities, HttpRequestHandlers must not call + * {@link HttpEntity#getContent()}. Doing so will throw an + * {@link UnsupportedOperationException}. Instead, handlers must expect that + * incoming entities implement {@link ConsumingNHttpEntity} and install a + * {@link ContentListener} on those entities. The ContentListener will be + * notified when new data is available for reading. After all data has been + * read, the response will automatically be sent. + *

+ * To support legacy HttpRequestHandlers that do use getContent(), you can + * wrap the handler within a {@link NBlockingHttpRequestHandler}. Doing so + * will allow the handler to be processed on a new thread, in a blocking + * manner. + *

+ * + * @author Oleg Kalnichevski + * @author Sam Berlin + * @author Steffen Pingel + * + */ +public class AsyncNHttpServiceHandler extends NHttpServiceHandlerBase + implements NHttpServiceHandler { + + public AsyncNHttpServiceHandler(final HttpProcessor httpProcessor, + final HttpResponseFactory responseFactory, + final ConnectionReuseStrategy connStrategy, + final ByteBufferAllocator allocator, final HttpParams params) { + super(httpProcessor, responseFactory, connStrategy, allocator, params); + } + + public AsyncNHttpServiceHandler(final HttpProcessor httpProcessor, + final HttpResponseFactory responseFactory, + final ConnectionReuseStrategy connStrategy, final HttpParams params) { + this(httpProcessor, responseFactory, connStrategy, + new HeapByteBufferAllocator(), params); + } + + + public void connected(final NHttpServerConnection conn) { + HttpContext context = conn.getContext(); + + ServerConnState connState = new ServerConnState(); + context.setAttribute(CONN_STATE, connState); + + if (this.eventListener != null) { + this.eventListener.connectionOpen(conn); + } + } + + public void requestReceived(final NHttpServerConnection conn) { + HttpContext context = conn.getContext(); + + HttpRequest request = conn.getHttpRequest(); + request.setParams(new DefaultedHttpParams(request.getParams(), this.params)); + + ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE); + + // Update connection state + connState.resetInput(); + connState.setRequest(request); + + ProtocolVersion ver = request.getRequestLine().getProtocolVersion(); + // TODO: should this be greaterEquals? 
+ if (!ver.lessEquals(HttpVersion.HTTP_1_1)) { + // Downgrade protocol version if greater than HTTP/1.1 + ver = HttpVersion.HTTP_1_1; + } + + HttpResponse response; + + try { + + if (request instanceof HttpEntityEnclosingRequest) { + if (((HttpEntityEnclosingRequest) request).expectContinue()) { + response = this.responseFactory.newHttpResponse( + ver, HttpStatus.SC_CONTINUE, context); + response.setParams( + new DefaultedHttpParams(response.getParams(), this.params)); + + if (this.expectationVerifier != null) { + try { + this.expectationVerifier.verify(request, response, context); + } catch (HttpException ex) { + response = this.responseFactory.newHttpResponse( + HttpVersion.HTTP_1_0, + HttpStatus.SC_INTERNAL_SERVER_ERROR, + context); + response.setParams( + new DefaultedHttpParams(response.getParams(), this.params)); + handleException(ex, response); + } + } + + if (response.getStatusLine().getStatusCode() < 200) { + // Send 1xx response indicating the server expectations + // have been met + conn.submitResponse(response); + } else { + // The request does not meet the server expectations + conn.resetInput(); + connState.resetInput(); + sendResponse(conn, response); + } + } + // Request content is expected. + // Wait until the request content is fully received + } else { + // No request content is expected.
+ // Process request right away + conn.suspendInput(); + processRequest(conn, request); + sendResponse(conn, connState.getResponse()); + } + + } catch (IOException ex) { + shutdownConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalIOException(ex, conn); + } + } catch (HttpException ex) { + closeConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalProtocolException(ex, conn); + } + } + + } + + public void closed(final NHttpServerConnection conn) { + ServerConnState connState = (ServerConnState) conn.getContext().getAttribute(CONN_STATE); + connState.resetOutput(); + + if (this.eventListener != null) { + this.eventListener.connectionClosed(conn); + } + } + + public void exception(final NHttpServerConnection conn, final HttpException httpex) { + if (conn.isResponseSubmitted()) { + closeConnection(conn, httpex); // TODO: Is this correct? + if (eventListener != null) { + eventListener.fatalProtocolException(httpex, conn); + } + return; + } + + HttpContext context = conn.getContext(); + try { + HttpResponse response = this.responseFactory.newHttpResponse( + HttpVersion.HTTP_1_0, HttpStatus.SC_INTERNAL_SERVER_ERROR, context); + response.setParams( + new DefaultedHttpParams(response.getParams(), this.params)); + handleException(httpex, response); + response.setEntity(null); + sendResponse(conn, response); + + } catch (IOException ex) { + shutdownConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalIOException(ex, conn); + } + } catch (HttpException ex) { + closeConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalProtocolException(ex, conn); + } + } + } + + public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) { + HttpRequest request = conn.getHttpRequest(); + ServerConnState connState = (ServerConnState) conn.getContext().getAttribute(CONN_STATE); + HttpResponse response = connState.getResponse(); + + try { + 
if(response == null) { + HttpEntityEnclosingRequest entityReq = (HttpEntityEnclosingRequest) request; + // TODO: Can entityReq.getEntity() ever be null here? What to do if it is? + BasicConsumingNHttpEntity entity = new BasicConsumingNHttpEntity(entityReq.getEntity()); + entityReq.setEntity(entity); + connState.setConsumingEntity(entity); + processRequest(conn, request); + // If no listener was installed, install a listener that skips the content. + if(entity.getContentListener() == null) { + entity.setContentListener(new SkipContentListener(allocator)); + } + } + + ConsumingNHttpEntity entity = connState.getConsumingEntity(); + entity.consumeContent(decoder, conn); + + if (decoder.isCompleted()) { + conn.suspendInput(); + // If there is a blocking handler, it will notify us + // when the response is ready -- otherwise we may be sending + // it before they have set an entity. + if(!connState.isHandlerBlocking()) + sendResponse(conn, connState.getResponse()); + } + + } catch (IOException ex) { + shutdownConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalIOException(ex, conn); + } + } catch (HttpException ex) { + closeConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalProtocolException(ex, conn); + } + } + } + + public void responseReady(final NHttpServerConnection conn) { + if (conn.isOpen()) { + conn.requestInput(); + // make sure any buffered requests are processed (NOTE(review): the casts below assume conn is a DefaultNHttpServerConnection -- confirm no other implementation reaches this handler) + if (((DefaultNHttpServerConnection)conn).hasBufferedInput()) { + ((DefaultNHttpServerConnection)conn).consumeInput(this); + } + } + } + + public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) { + HttpContext context = conn.getContext(); + HttpResponse response = conn.getHttpResponse(); + + ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE); + + try { + ProducingNHttpEntity entity = connState.getProducingEntity(); + entity.produceContent(encoder, conn); + + if
(encoder.isCompleted()) { + connState.resetOutput(); + if (!this.connStrategy.keepAlive(response, context)) { + conn.close(); + } + } + + } catch (IOException ex) { + shutdownConnection(conn, ex); + if (this.eventListener != null) { + this.eventListener.fatalIOException(ex, conn); + } + } + } + + @Override + protected void closeConnection(final HttpConnection conn, final Throwable cause) { + ServerConnState connState = (ServerConnState) ((NHttpServerConnection)conn).getContext().getAttribute(CONN_STATE); + connState.resetOutput(); + + super.closeConnection(conn, cause); + } + + @Override + protected void shutdownConnection(HttpConnection conn, Throwable cause) { + ServerConnState connState = (ServerConnState) ((NHttpServerConnection)conn).getContext().getAttribute(CONN_STATE); + connState.resetOutput(); + + super.shutdownConnection(conn, cause); + } + + /** + * Fills in the given error response for a protocol exception: picks a status + * code from the exception type and attaches the exception text as a + * plain-text US-ASCII entity. + */ + private void handleException(final HttpException ex, final HttpResponse response) { + int code = HttpStatus.SC_INTERNAL_SERVER_ERROR; + if (ex instanceof MethodNotSupportedException) { + code = HttpStatus.SC_NOT_IMPLEMENTED; + } else if (ex instanceof UnsupportedHttpVersionException) { + code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED; + } else if (ex instanceof ProtocolException) { + code = HttpStatus.SC_BAD_REQUEST; + } + response.setStatusCode(code); + + // An HttpException may carry no message; fall back to toString() + // because EncodingUtils.getAsciiBytes() rejects a null argument. + String message = ex.getMessage(); + if (message == null) { + message = ex.toString(); + } + byte[] msg = EncodingUtils.getAsciiBytes(message); + NByteArrayEntity entity = new NByteArrayEntity(msg); + entity.setContentType("text/plain; charset=US-ASCII"); + response.setEntity(entity); + } + + private void processRequest( + final NHttpServerConnection conn, + final HttpRequest request) throws IOException, HttpException { + + HttpContext context = conn.getContext(); + ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE); + ProtocolVersion ver = request.getRequestLine().getProtocolVersion(); + + // TODO: Should this be 'greaterEquals'?
-- It looks like it's going + // to upgrade HTTP 1.0 to HTTP 1.1, not downgrade 1.2 to 1.1. + if (!ver.lessEquals(HttpVersion.HTTP_1_1)) { + // Downgrade protocol version if greater than HTTP/1.1 + ver = HttpVersion.HTTP_1_1; + } + + HttpResponse response = this.responseFactory.newHttpResponse( + ver, + HttpStatus.SC_OK, + conn.getContext()); + response.setParams( + new DefaultedHttpParams(response.getParams(), this.params)); + + context.setAttribute(ExecutionContext.HTTP_REQUEST, request); + context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn); + context.setAttribute(ExecutionContext.HTTP_RESPONSE, response); + + try { + + this.httpProcessor.process(request, context); + + HttpRequestHandler handler = null; + if (this.handlerResolver != null) { + String requestURI = request.getRequestLine().getUri(); + handler = this.handlerResolver.lookup(requestURI); + } + + connState.setHandlerBlocking(false); + if (handler != null) { + if(handler instanceof BlockingHandler && request instanceof HttpEntityEnclosingRequest && ((HttpEntityEnclosingRequest)request).getEntity() != null) { + connState.setHandlerBlocking(true); + ((BlockingHandler)handler).handle(request, response, context, new BlockingResponseListener(conn, this)); + } else { + handler.handle(request, response, context); + } + } else { + response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED); + } + + } catch (HttpException ex) { + response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0, + HttpStatus.SC_INTERNAL_SERVER_ERROR, context); + response.setParams( + new DefaultedHttpParams(response.getParams(), this.params)); + handleException(ex, response); + } + + connState.setResponse(response); + } + + private void sendResponse(final NHttpServerConnection conn, + HttpResponse response) throws IOException, HttpException { + HttpContext context = conn.getContext(); + + ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE); + + this.httpProcessor.process(response, context); + + 
if (!canResponseHaveBody(connState.getRequest(), response)) { + response.setEntity(null); + } + + HttpEntity entity = response.getEntity(); + if (entity != null) { + if(entity instanceof ProducingNHttpEntity) + connState.setProducingEntity((ProducingNHttpEntity)entity); + else + connState.setProducingEntity(new NBlockingProducingNHttpEntity(entity, allocator)); + } else { + connState.resetOutput(); + if (!this.connStrategy.keepAlive(response, context)) { + conn.close(); + } + } + + connState.setResponse(null); + conn.submitResponse(response); + + } + + static class ServerConnState { + private volatile HttpRequest request; + private volatile HttpResponse response; + private volatile ConsumingNHttpEntity consumingEntity; + private volatile ProducingNHttpEntity producingEntity; + private volatile boolean handlerBlocking; + + public boolean isHandlerBlocking() { + return handlerBlocking; + } + + public void setHandlerBlocking(boolean blocking) { + this.handlerBlocking = blocking; + } + + public HttpResponse getResponse() { + return this.response; + } + + public void setResponse(HttpResponse response) { + this.response = response; + } + + public HttpRequest getRequest() { + return this.request; + } + + public ConsumingNHttpEntity getConsumingEntity() { + return consumingEntity; + } + + public void setConsumingEntity(ConsumingNHttpEntity entity) { + this.consumingEntity = entity; + } + + public ProducingNHttpEntity getProducingEntity() { + return this.producingEntity; + } + + public void setProducingEntity(ProducingNHttpEntity entity) { + this.producingEntity = entity; + } + + public void setRequest(final HttpRequest request) { + this.request = request; + } + + public void resetInput() { + this.request = null; + handlerBlocking = false; + if(consumingEntity != null) { + consumingEntity.finish(); + consumingEntity = null; + } + } + + public void resetOutput() { + response = null; + handlerBlocking = false; + if(producingEntity != null) { + producingEntity.finish(); + 
producingEntity = null; + } + } + } + + static class SkipContentListener implements ContentListener { + private final ByteBuffer buffer; + public SkipContentListener(ByteBufferAllocator allocator) { + this.buffer = allocator.allocate(2048); + } + + @Override + public int contentAvailable(ContentDecoder decoder, IOControl ioctrl) + throws IOException { + int totalRead = 0; + int lastRead; + do { + buffer.clear(); + lastRead = decoder.read(buffer); + if(lastRead > 0) + totalRead += lastRead; + } while(lastRead > 0); + return totalRead; + } + + @Override + public void finish() {} + } + + /** + * A response listener to help handle legacy HttpRequestHandlers that + * perform the handle in a new thread. + */ + static class BlockingResponseListener implements ResponseListener { + private final NHttpServerConnection conn; + private final AsyncNHttpServiceHandler handler; + + public BlockingResponseListener(NHttpServerConnection conn, AsyncNHttpServiceHandler handler) { + this.conn = conn; + this.handler = handler; + } + + @Override + public void handleException(HttpException httpException) { + HttpContext context = conn.getContext(); + HttpResponse response = handler.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0, + HttpStatus.SC_INTERNAL_SERVER_ERROR, context); + response.setParams( + new DefaultedHttpParams(response.getParams(), handler.params)); + handler.handleException(httpException, response); + ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE); + connState.setResponse(response); + responseReady(); + } + + @Override + public void handleException(IOException ioxException) { + handler.shutdownConnection(conn, ioxException); + if (handler.eventListener != null) { + handler.eventListener.fatalIOException(ioxException, conn); + } + } + + @Override + public void responseReady() { + ServerConnState connState = (ServerConnState) conn.getContext().getAttribute(CONN_STATE); + try { + handler.sendResponse(conn, connState.getResponse()); + } 
catch (IOException ex) { + handler.shutdownConnection(conn, ex); + if (handler.eventListener != null) { + handler.eventListener.fatalIOException(ex, conn); + } + } catch (HttpException ex) { + handler.closeConnection(conn, ex); + if (handler.eventListener != null) { + handler.eventListener.fatalProtocolException(ex, conn); + } + } + } + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/protocol/NBlockingHttpRequestHandler.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/protocol/NBlockingHttpRequestHandler.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/protocol/NBlockingHttpRequestHandler.java (revision 0) @@ -0,0 +1,118 @@ +package org.apache.http.nio.protocol; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.entity.ConsumingNHttpEntity; +import org.apache.http.nio.entity.ContentBufferEntity; +import org.apache.http.nio.entity.ConsumingNHttpEntity.ContentListener; +import org.apache.http.nio.util.ByteBufferAllocator; +import org.apache.http.nio.util.ContentInputBuffer; +import org.apache.http.nio.util.HeapByteBufferAllocator; +import org.apache.http.nio.util.SharedInputBuffer; +import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpRequestHandler; + +/** + * A {@link HttpRequestHandler} that is intended for use with + * {@link AsyncNHttpServiceHandler} and 
pre-existing HttpRequestHandlers that + * use {@link HttpEntity#getContent()}. Normally, calling getContent() on an + * entity from AsyncNHttpServiceHandler would throw an + * {@link UnsupportedOperationException}. This class will intercept the entity + * and allow getContent() to be called. However, a portion of the entity may be + * buffered in memory if classes do not quickly consume the content. + *

+ * Use this class sparingly. For best results, rewrite the handler to work with + * {@link ConsumingNHttpEntity}, and avoid using this class entirely. + * + * @author Sam Berlin + */ +public class NBlockingHttpRequestHandler implements HttpRequestHandler, BlockingHandler { + + private final HttpRequestHandler delegate; + private final Executor executor; + private final ByteBufferAllocator allocator; + + public NBlockingHttpRequestHandler(HttpRequestHandler delegate, + Executor executor) { + this(delegate, executor, new HeapByteBufferAllocator()); + } + + public NBlockingHttpRequestHandler(HttpRequestHandler delegate, + Executor executor, ByteBufferAllocator allocator) { + this.delegate = delegate; + this.executor = executor; + this.allocator = allocator; + } + + @Override + public void handle(HttpRequest request, HttpResponse response, + HttpContext context) throws HttpException, IOException { + delegate.handle(request, response, context); + } + + @Override + public void handle(final HttpRequest request, final HttpResponse response, + final HttpContext context, final ResponseListener responseListener) + throws HttpException, IOException { + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + if (entity instanceof ConsumingNHttpEntity) { + ConsumingNHttpEntity consumingEntity = (ConsumingNHttpEntity) entity; + IOControl ioctrl = (IOControl)context.getAttribute(ExecutionContext.HTTP_CONNECTION); + ContentReader reader = new ContentReader(allocator, ioctrl); + consumingEntity.setContentListener(reader); + entity = new ContentBufferEntity(consumingEntity, reader.getInputBuffer()); + ((HttpEntityEnclosingRequest)request).setEntity(entity); + executor.execute(new Runnable() { + public void run() { + try { + delegate.handle(request, response, context); + responseListener.responseReady(); + } catch (HttpException e) { + responseListener.handleException(e); + } catch (IOException e) { + 
responseListener.handleException(e); + } + } + }); + return; + } + } + // If we fell through, because no entity.. + delegate.handle(request, response, context); + } + + static class ContentReader implements ContentListener { + private SharedInputBuffer inputBuffer; + + public ContentReader(ByteBufferAllocator allocator, IOControl ioctrl) { + inputBuffer = new SharedInputBuffer(2048, ioctrl, allocator); + } + + @Override + public int contentAvailable(ContentDecoder decoder, IOControl ioctrl) + throws IOException { + int read = inputBuffer.consumeContent(decoder); + return read; + } + + @Override + public void finish() { + inputBuffer.shutdown(); + inputBuffer = null; + } + + public ContentInputBuffer getInputBuffer() { + return inputBuffer; + } + } +} Property changes on: module-nio/src/main/java/org/apache/http/nio/protocol/NBlockingHttpRequestHandler.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/ContentDecoderChannel.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/ContentDecoderChannel.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/ContentDecoderChannel.java (revision 0) @@ -0,0 +1,36 @@ +package org.apache.http.nio; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +/** + * A {@link ReadableByteChannel} that delegates to a {@link ContentDecoder}. + * Attempts to close this channel are ignored, and isOpen always returns true. 
+ * + * @author Sam Berlin + */ +public class ContentDecoderChannel implements ReadableByteChannel { + + private final ContentDecoder decoder; + + public ContentDecoderChannel(ContentDecoder decoder) { + this.decoder = decoder; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return decoder.read(dst); + } + + @Override + public void close() {} + + @Override + public boolean isOpen() { + return true; + } + + + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/ContentDecoderChannel.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/ContentBufferEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/ContentBufferEntity.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/entity/ContentBufferEntity.java (working copy) @@ -36,6 +36,7 @@ import org.apache.http.entity.BasicHttpEntity; import org.apache.http.nio.util.ContentInputBuffer; +@Deprecated public class ContentBufferEntity extends BasicHttpEntity { /** The wrapped entity. */ Index: module-nio/src/main/java/org/apache/http/nio/entity/ProducingNHttpEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/ProducingNHttpEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/ProducingNHttpEntity.java (revision 0) @@ -0,0 +1,22 @@ +package org.apache.http.nio.entity; + +import java.io.IOException; + +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; + +/** + * An {@link NHttpEntity} that writes content to a {@link ContentEncoder}. + * + * @author Sam Berlin + */ +public interface ProducingNHttpEntity extends NHttpEntity { + + /** + * Notification that content should be written to the encoder. 
+ * When all content is finished, this MUST call {@link ContentEncoder#complete()}. + * Failure to do so could result in the entity never being written. + */ + void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException; + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/ProducingNHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/ByteArrayNIOEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/ByteArrayNIOEntity.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/entity/ByteArrayNIOEntity.java (working copy) @@ -1,7 +1,7 @@ /* - * $HeadURL:$ - * $Revision:$ - * $Date:$ + * $HeadURL$ + * $Revision$ + * $Date$ * * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one @@ -45,10 +45,11 @@ * * @author Oleg Kalnichevski * - * @version $Revision:$ + * @version $Revision$ * * @since 4.0 */ +@Deprecated public class ByteArrayNIOEntity extends ByteArrayEntity implements HttpNIOEntity { public ByteArrayNIOEntity(final byte[] b) { Index: module-nio/src/main/java/org/apache/http/nio/entity/ContentInputStream.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/ContentInputStream.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/entity/ContentInputStream.java (working copy) @@ -36,6 +36,7 @@ import org.apache.http.nio.util.ContentInputBuffer; +@Deprecated public class ContentInputStream extends InputStream { private final ContentInputBuffer buffer; Index: module-nio/src/main/java/org/apache/http/nio/entity/ContentOutputStream.java =================================================================== --- 
module-nio/src/main/java/org/apache/http/nio/entity/ContentOutputStream.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/entity/ContentOutputStream.java (working copy) @@ -36,6 +36,7 @@ import org.apache.http.nio.util.ContentOutputBuffer; +@Deprecated public class ContentOutputStream extends OutputStream { private final ContentOutputBuffer buffer; Index: module-nio/src/main/java/org/apache/http/nio/entity/NByteArrayEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/NByteArrayEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/NByteArrayEntity.java (revision 0) @@ -0,0 +1,87 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/entity/ByteArrayNIOEntity.java $ + * $Revision: 602520 $ + * $Date: 2007-12-08 12:42:26 -0500 (Sat, 08 Dec 2007) $ + * + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.nio.entity; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.AsyncNHttpServiceHandler; + +/** + * An entity whose content is retrieved from a byte array. + * This entity is intended for use only as an {@link NHttpEntity}. + * Blocking methods are not supported. + * + * @author Sam Berlin + * + * @version $Revision: 602520 $ + * + * @see AsyncNHttpServiceHandler + * @since 4.0 + */ +public class NByteArrayEntity extends AbstractNHttpEntity implements ProducingNHttpEntity { + + protected final ByteBuffer buffer; + + public NByteArrayEntity(final byte[] b) { + this.buffer = ByteBuffer.wrap(b); + } + + @Override + public void finish() { + buffer.rewind(); + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioctrl) + throws IOException { + encoder.write(buffer); + if(!buffer.hasRemaining()) + encoder.complete(); + } + + @Override + public long getContentLength() { + return buffer.limit(); + } + + @Override + public boolean isRepeatable() { + return true; + } + + + + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/NByteArrayEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/ConsumingNHttpEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/ConsumingNHttpEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/ConsumingNHttpEntity.java (revision 0) @@ -0,0 
+1,48 @@ +package org.apache.http.nio.entity; + +import java.io.IOException; + +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; + +/** + * A non-blocking entity that allows content to be consumed from a decoder. + * + * If external code wants to be notified when more content is available, it + * should install a ContentListener on the entity via + * {@link #setContentListener(org.apache.http.nio.entity.ConsumingNHttpEntity.ContentListener)}. + * When content becomes available, implementations must notify the listener. + *

+ * All blocking methods throw an {@link UnsupportedOperationException}. + * + * @author Sam Berlin + */ +public interface ConsumingNHttpEntity extends NHttpEntity { + + /** Notification that content is available for the entity to consume. */ + int consumeContent(ContentDecoder decoder, IOControl ioctrl) + throws IOException; + + /** + * Installs a listener on this entity that will be notified when content is + * available for consumption. + * + * If data is already available, the listener will be immediately notified. + */ + void setContentListener(ContentListener listener); + + /** + * A listener for available data on a non-blocking {@link ConsumingNHttpEntity}. + */ + public static interface ContentListener { + /** + * Notification that content is available to be read from the decoder. + * Implementations must return the amount of data read from the decoder. + */ + int contentAvailable(ContentDecoder decoder, IOControl ioctrl) throws IOException; + + /** Notification that this entity has finished reading data. */ + void finish(); + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/ConsumingNHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/AbstractNHttpEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/AbstractNHttpEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/AbstractNHttpEntity.java (revision 0) @@ -0,0 +1,37 @@ +package org.apache.http.nio.entity; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.http.entity.AbstractHttpEntity; + +/** + * An abstract implementation of {@link NHttpEntity} that stubs out all blocking + * methods, throwing an {@link UnsupportedOperationException}.
+ */ +public abstract class AbstractNHttpEntity extends AbstractHttpEntity implements NHttpEntity { + + @Override + public final InputStream getContent() throws IOException, IllegalStateException { + throw new UnsupportedOperationException("Does not support blocking methods"); + } + + @Override + public final boolean isStreaming() { + throw new UnsupportedOperationException("Does not support blocking methods"); + } + + @Override + public final void writeTo(OutputStream arg0) throws IOException { + throw new UnsupportedOperationException("Does not support blocking methods"); + } + + @Override + public final void consumeContent() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException("Does not support blocking methods"); + } + + + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/AbstractNHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/BasicConsumingNHttpEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/BasicConsumingNHttpEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/BasicConsumingNHttpEntity.java (revision 0) @@ -0,0 +1,71 @@ +package org.apache.http.nio.entity; + +import java.io.IOException; + +import org.apache.http.HttpEntity; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; + +/** + * A default implementation of {@link ConsumingNHttpEntity}. + * Blocking output methods will throw {@link UnsupportedOperationException}. 
+ * + * @author Sam Berlin + */ +public class BasicConsumingNHttpEntity extends AbstractNHttpEntity implements ConsumingNHttpEntity { + + private ContentListener contentListener; + private long contentLength; + + public BasicConsumingNHttpEntity(HttpEntity httpEntity) { + setChunked(httpEntity.isChunked()); + setContentEncoding(httpEntity.getContentEncoding()); + setContentLength(httpEntity.getContentLength()); + setContentType(httpEntity.getContentType()); + } + + /** + * Specifies the length of the content. + * + * @param len the number of bytes in the content, or + * a negative number to indicate an unknown length + */ + public void setContentLength(long len) { + this.contentLength = len; + } + + @Override + public int consumeContent(ContentDecoder decoder, IOControl ioctrl) + throws IOException { + if(contentListener != null) { + return contentListener.contentAvailable(decoder, ioctrl); + } else { + throw new IllegalStateException("consuming content with no listener!"); + } + } + + @Override + public void setContentListener(ContentListener listener) { + this.contentListener = listener; + } + + public ContentListener getContentListener() { + return contentListener; + } + + @Override + public void finish() { + if(contentListener != null) + contentListener.finish(); + } + + @Override + public long getContentLength() { + return contentLength; + } + + @Override + public boolean isRepeatable() { + return false; + } +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/BasicConsumingNHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/NStringEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/NStringEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/NStringEntity.java (revision 0) @@ -0,0 +1,98 @@ +/* + * $HeadURL: 
http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/entity/StringNIOEntity.java $ + * $Revision: 602520 $ + * $Date: 2007-12-08 12:42:26 -0500 (Sat, 08 Dec 2007) $ + * + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.nio.entity; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.AsyncNHttpServiceHandler; +import org.apache.http.protocol.HTTP; + +/** + * An entity whose content is retrieved from a string. + * This entity is intended for use only as an {@link NHttpEntity}. + * Blocking methods are not supported. 
+ * + * @author Sam Berlin + * + * @version $Revision: 602520 $ + * + * @see AsyncNHttpServiceHandler + * @since 4.0 + */ +public class NStringEntity extends AbstractNHttpEntity implements ProducingNHttpEntity { + + protected final ByteBuffer buffer; + + public NStringEntity(final String s, String charset) + throws UnsupportedEncodingException { + if (s == null) { + throw new IllegalArgumentException("Source string may not be null"); + } + if (charset == null) { + charset = HTTP.DEFAULT_CONTENT_CHARSET; + } + this.buffer = ByteBuffer.wrap(s.getBytes(charset)); + setContentType(HTTP.PLAIN_TEXT_TYPE + HTTP.CHARSET_PARAM + charset); + } + + public NStringEntity(final String s) throws UnsupportedEncodingException { + this(s, null); + } + + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public long getContentLength() { + return this.buffer.limit(); + } + + @Override + public void finish() { + buffer.rewind(); + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioctrl) + throws IOException { + encoder.write(buffer); + if(!buffer.hasRemaining()) + encoder.complete(); + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/NStringEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/HttpNIOEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/HttpNIOEntity.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/entity/HttpNIOEntity.java (working copy) @@ -36,6 +36,7 @@ import org.apache.http.HttpEntity; +@Deprecated public interface HttpNIOEntity extends HttpEntity { ReadableByteChannel getChannel() throws IOException; Index: module-nio/src/main/java/org/apache/http/nio/entity/NHttpEntity.java =================================================================== --- 
module-nio/src/main/java/org/apache/http/nio/entity/NHttpEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/NHttpEntity.java (revision 0) @@ -0,0 +1,19 @@ +package org.apache.http.nio.entity; + +import org.apache.http.HttpEntity; +import org.apache.http.nio.protocol.AsyncNHttpServiceHandler; + +/** + * A {@link HttpEntity} intended for use with non-blocking requests. + * Incoming entities are expected to implement {@link ConsumingNHttpEntity}, + * and outgoing entities are expected to implement {@link ProducingNHttpEntity}. + * + * @see AsyncNHttpServiceHandler + * + * @author Sam Berlin + */ +public interface NHttpEntity extends HttpEntity { + + void finish(); + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/NHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/NFileEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/NFileEntity.java (revision 0) +++ module-nio/src/main/java/org/apache/http/nio/entity/NFileEntity.java (revision 0) @@ -0,0 +1,113 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/entity/FileNIOEntity.java $ + * $Revision: 602520 $ + * $Date: 2007-12-08 12:42:26 -0500 (Sat, 08 Dec 2007) $ + * + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.nio.entity; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; + +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.ContentEncoderChannel; +import org.apache.http.nio.FileContentEncoder; +import org.apache.http.nio.IOControl; + +/** + * An entity whose content is retrieved from a file. + * This entity is intended for use only as an {@link NHttpEntity}. + * Blocking methods are not supported.
+ * + * @author Sam Berlin + * + * @version $Revision: 602520 $ + * + * @since 4.0 + */ +public class NFileEntity extends AbstractNHttpEntity implements ProducingNHttpEntity { + + private final File file; + private FileChannel fileChannel; + private long idx = -1; + private boolean useFileChannels; + + public NFileEntity(final File file, final String contentType, boolean useFileChannels) { + if (file == null) { + throw new IllegalArgumentException("File may not be null"); + } + this.file = file; + this.useFileChannels = useFileChannels; + setContentType(contentType); + } + + @Override + public void finish() { + try { + if(fileChannel != null) + fileChannel.close(); + } catch(IOException ignored) {} + fileChannel = null; + } + + @Override + public long getContentLength() { + return file.length(); + } + + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioctrl) + throws IOException { + if(fileChannel == null) { + FileInputStream in = new FileInputStream(file); + fileChannel = in.getChannel(); + idx = 0; + } + + long transferred; + if(useFileChannels && encoder instanceof FileContentEncoder) { + transferred = ((FileContentEncoder)encoder).transfer(fileChannel, idx, Long.MAX_VALUE); + } else { + transferred = fileChannel.transferTo(idx, Long.MAX_VALUE, new ContentEncoderChannel(encoder)); + } + + if(transferred > 0) + idx += transferred; + + if(transferred == -1) + encoder.complete(); + } + +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/NFileEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/entity/NBlockingProducingNHttpEntity.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/entity/NBlockingProducingNHttpEntity.java (revision 0) +++ 
module-nio/src/main/java/org/apache/http/nio/entity/NBlockingProducingNHttpEntity.java (revision 0) @@ -0,0 +1,77 @@ +package org.apache.http.nio.entity; + +import java.io.IOException; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.AsyncNHttpServiceHandler; +import org.apache.http.nio.util.ByteBufferAllocator; +import org.apache.http.nio.util.SimpleOutputBuffer; + +/** + * A {@link ProducingNHttpEntity} that delegates to a blocking + * {@link HttpEntity}. This is useful when the only entity available is a + * blocking one, but you want to make it work in a non-blocking manner. + *

+ * The entire output is buffered in memory. + * + * @see AsyncNHttpServiceHandler + * + * @author Sam Berlin + */ +public class NBlockingProducingNHttpEntity extends AbstractNHttpEntity implements + ProducingNHttpEntity { + + private final HttpEntity entity; + private final ByteBufferAllocator allocator; + private SimpleOutputBuffer outputBuffer; + + public NBlockingProducingNHttpEntity(HttpEntity entity, ByteBufferAllocator allocator) { + this.entity = entity; + this.allocator = allocator; + } + + @Override + public void finish() { + } + + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException { + if(outputBuffer == null) { + outputBuffer = new SimpleOutputBuffer(2048, allocator); + entity.writeTo(new ContentOutputStream(outputBuffer)); + } + + outputBuffer.produceContent(encoder); + if(!outputBuffer.hasData()) + encoder.complete(); + } + + @Override + public long getContentLength() { + return entity.getContentLength(); + } + + @Override + public boolean isRepeatable() { + return entity.isRepeatable(); + } + + @Override + public Header getContentEncoding() { + return entity.getContentEncoding(); + } + + @Override + public Header getContentType() { + return entity.getContentType(); + } + + @Override + public boolean isChunked() { + return entity.isChunked(); + } +} Property changes on: module-nio/src/main/java/org/apache/http/nio/entity/NBlockingProducingNHttpEntity.java ___________________________________________________________________ Name: svn:executable + * Index: module-nio/src/main/java/org/apache/http/nio/util/ContentInputBuffer.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/util/ContentInputBuffer.java (revision 628295) +++ module-nio/src/main/java/org/apache/http/nio/util/ContentInputBuffer.java (working copy) @@ -35,6 +35,7 @@ import org.apache.http.nio.ContentDecoder; +@Deprecated public interface ContentInputBuffer { int 
consumeContent(ContentDecoder decoder) throws IOException; Index: module-nio/src/examples/org/apache/http/examples/nio/AsyncNHttpFileServer.java =================================================================== --- module-nio/src/examples/org/apache/http/examples/nio/AsyncNHttpFileServer.java (revision 0) +++ module-nio/src/examples/org/apache/http/examples/nio/AsyncNHttpFileServer.java (revision 0) @@ -0,0 +1,243 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/examples/nio/NHttpFileServer.java $ + * $Revision: 610763 $ + * $Date: 2008-01-10 07:01:13 -0500 (Thu, 10 Jan 2008) $ + * + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ *
+ */
+package org.apache.http.examples.nio;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.nio.channels.FileChannel;
+
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.nio.DefaultServerIOEventDispatch;
+import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentDecoderChannel;
+import org.apache.http.nio.FileContentDecoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.NHttpConnection;
+import org.apache.http.nio.NHttpServiceHandler;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.entity.NFileEntity;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.nio.entity.ConsumingNHttpEntity.ContentListener;
+import org.apache.http.nio.protocol.AsyncNHttpServiceHandler;
+import org.apache.http.nio.protocol.EventListener;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.ListeningIOReactor;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpRequestHandler;
+import org.apache.http.protocol.HttpRequestHandlerRegistry;
+import org.apache.http.protocol.ResponseConnControl;
+import org.apache.http.protocol.ResponseContent;
+import org.apache.http.protocol.ResponseDate;
+import org.apache.http.protocol.ResponseServer;
+
+/**
+ * A version of {@link NHttpFileServer} that uses {@link AsyncNHttpServiceHandler}
+ * instead of implementing its own {@link NHttpServiceHandler}. This makes the example
+ * very analogous to {@link NHttpServer}, except it is fully asynchronous and does
+ * not buffer the contents of the file in memory.
+ * <p>
+ * Usage: the first argument is the document root directory; an optional second
+ * argument of <code>disableFileChannels</code> (case-insensitive) turns off the use
+ * of file channels for both serving files and receiving request bodies.
+ */
+public class AsyncNHttpFileServer {
+
+    /**
+     * Configures the HTTP parameters, protocol interceptors and request handler,
+     * then listens on port 8080 and runs the I/O reactor with two worker threads
+     * until it terminates.
+     *
+     * @param args <code>args[0]</code> is the document root (required);
+     *             <code>args[1]</code> may be <code>disableFileChannels</code>
+     * @throws Exception if the server cannot be set up
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("Please specify document root directory");
+            System.exit(1);
+        }
+        boolean useFileChannels = true;
+        if (args.length >= 2) {
+            String s = args[1];
+            if (s.equalsIgnoreCase("disableFileChannels")) {
+                useFileChannels = false;
+            }
+        }
+
+        HttpParams params = new BasicHttpParams();
+        params
+            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 20000)
+            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
+            .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
+            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
+            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpComponents/1.1");
+
+        BasicHttpProcessor httpproc = new BasicHttpProcessor();
+        httpproc.addInterceptor(new ResponseDate());
+        httpproc.addInterceptor(new ResponseServer());
+        httpproc.addInterceptor(new ResponseContent());
+        httpproc.addInterceptor(new ResponseConnControl());
+
+        AsyncNHttpServiceHandler handler = new AsyncNHttpServiceHandler(
+                httpproc,
+                new DefaultHttpResponseFactory(),
+                new DefaultConnectionReuseStrategy(),
+                params);
+
+        // Set up request handlers: every request URI is served by the file handler
+        HttpRequestHandlerRegistry registry = new HttpRequestHandlerRegistry();
+        registry.register("*", new HttpFileHandler(args[0], useFileChannels));
+
+        handler.setHandlerResolver(registry);
+
+        // Provide an event logger
+        handler.setEventListener(new EventLogger());
+
+        IOEventDispatch ioEventDispatch = new DefaultServerIOEventDispatch(handler, params);
+        ListeningIOReactor ioReactor = new DefaultListeningIOReactor(2, params);
+        try {
+            ioReactor.listen(new InetSocketAddress(8080));
+            ioReactor.execute(ioEventDispatch);
+        } catch (InterruptedIOException ex) {
+            System.err.println("Interrupted");
+        } catch (IOException e) {
+            System.err.println("I/O error: " + e.getMessage());
+        }
+        System.out.println("Shutdown");
+    }
+
+    /**
+     * Request handler that maps the request URI onto a file below the document
+     * root and answers with the file content, or with a small HTML error page.
+     */
+    static class HttpFileHandler implements HttpRequestHandler {
+
+        private final String docRoot;
+        private final boolean useFileChannels;
+
+        /**
+         * @param docRoot directory the request URIs are resolved against
+         * @param useFileChannels whether file entities and request body listeners
+         *        should use file channel transfers
+         */
+        public HttpFileHandler(final String docRoot, boolean useFileChannels) {
+            this.docRoot = docRoot;
+            this.useFileChannels = useFileChannels;
+        }
+
+        /**
+         * Attaches a {@link FileWriteListener} to any request entity and fills in
+         * the response: 200 with the file, 404 if it does not exist, or 403 if it
+         * is unreadable, a directory, or resolves to a location outside the
+         * document root.
+         *
+         * @throws IOException if the paths cannot be canonicalized or the
+         *         temporary file for the request body cannot be created
+         */
+        public void handle(
+                final HttpRequest request,
+                final HttpResponse response,
+                final HttpContext context) throws HttpException, IOException {
+
+            if (request instanceof HttpEntityEnclosingRequest) {
+                ConsumingNHttpEntity entity = (ConsumingNHttpEntity)
+                        ((HttpEntityEnclosingRequest) request).getEntity();
+                // An entity enclosing request is not guaranteed to carry an entity
+                if (entity != null) {
+                    entity.setContentListener(new FileWriteListener(useFileChannels));
+                }
+            }
+
+            String target = request.getRequestLine().getUri();
+            // Canonicalize both paths so that "../" sequences (also URL-encoded
+            // ones) and symbolic links cannot be used to leave the document root.
+            final File root = new File(this.docRoot).getCanonicalFile();
+            final File file = new File(root, URLDecoder.decode(target, "UTF-8")).getCanonicalFile();
+
+            if (!isWithin(root, file)) {
+                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
+                response.setEntity(htmlMessage("Access denied"));
+            } else if (!file.exists()) {
+                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
+                response.setEntity(htmlMessage("File " + file.getPath() + " not found"));
+            } else if (!file.canRead() || file.isDirectory()) {
+                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
+                response.setEntity(htmlMessage("Access denied"));
+            } else {
+                response.setStatusCode(HttpStatus.SC_OK);
+                response.setEntity(new NFileEntity(file, "text/html", useFileChannels));
+            }
+        }
+
+        /** Tells whether the canonical candidate is the canonical root or lies below it. */
+        private static boolean isWithin(final File root, final File candidate) {
+            if (candidate.equals(root)) {
+                return true;
+            }
+            String prefix = root.getPath();
+            // A file system root such as "/" already ends with the separator
+            if (!prefix.endsWith(File.separator)) {
+                prefix = prefix + File.separator;
+            }
+            return candidate.getPath().startsWith(prefix);
+        }
+
+        /** Wraps a message into a minimal UTF-8 HTML page entity. */
+        private static NStringEntity htmlMessage(final String message) throws IOException {
+            NStringEntity entity = new NStringEntity(
+                    "<html><body><h1>" + message + "</h1></body></html>", "UTF-8");
+            entity.setContentType("text/html; charset=UTF-8");
+            return entity;
+        }
+    }
+
+    /**
+     * Content listener that streams an incoming request body into a newly
+     * created temporary file.
+     */
+    static class FileWriteListener implements ContentListener {
+        private final File file;
+        private final FileOutputStream outputFile;
+        private final FileChannel fileChannel;
+        private final boolean useFileChannels;
+        // Position in the temporary file at which the next chunk is written
+        private long idx = 0;
+
+        /**
+         * @param useFileChannels whether to use the decoder's direct file transfer
+         *        when the decoder is a {@link FileContentDecoder}
+         * @throws IOException if the temporary file cannot be created or opened
+         */
+        public FileWriteListener(boolean useFileChannels) throws IOException {
+            this.file = File.createTempFile("tmp", ".tmp", null);
+            // The channel is written to, so it has to come from an output stream;
+            // a channel obtained from a FileInputStream is read-only.
+            this.outputFile = new FileOutputStream(file);
+            this.fileChannel = outputFile.getChannel();
+            this.useFileChannels = useFileChannels;
+        }
+
+        /**
+         * Moves the content currently available from the decoder into the
+         * temporary file, starting at the current write position.
+         *
+         * @return the number of bytes transferred by this call, narrowed to int
+         */
+        @Override
+        public int contentAvailable(ContentDecoder decoder, IOControl ioctrl)
+                throws IOException {
+            long transferred;
+            if (useFileChannels && decoder instanceof FileContentDecoder) {
+                transferred = ((FileContentDecoder) decoder).transfer(fileChannel, idx, Long.MAX_VALUE);
+            } else {
+                transferred = fileChannel.transferFrom(new ContentDecoderChannel(decoder), idx, Long.MAX_VALUE);
+            }
+
+            if (transferred > 0) {
+                idx += transferred;
+            }
+            return (int) transferred;
+        }
+
+        /**
+         * Closes the temporary file. Failures are deliberately ignored: this is
+         * best-effort cleanup and there is nothing useful left to do with them.
+         */
+        @Override
+        public void finish() {
+            try {
+                outputFile.close();
+            } catch (IOException ignored) {
+                // best-effort close
+            }
+            try {
+                fileChannel.close();
+            } catch (IOException ignored) {
+                // best-effort close
+            }
+        }
+    }
+
+    /**
+     * Event listener that prints connection life cycle events to standard output
+     * and fatal errors to standard error.
+     */
+    static class EventLogger implements EventListener {
+
+        public void connectionOpen(final NHttpConnection conn) {
+            System.out.println("Connection open: " + conn);
+        }
+
+        public void connectionTimeout(final NHttpConnection conn) {
+            System.out.println("Connection timed out: " + conn);
+        }
+
+        public void connectionClosed(final NHttpConnection conn) {
+            System.out.println("Connection closed: " + conn);
+        }
+
+        public void fatalIOException(final IOException ex, final NHttpConnection conn) {
+            System.err.println("I/O error: " + ex.getMessage());
+        }
+
+        public void fatalProtocolException(final HttpException ex, final NHttpConnection conn) {
+            System.err.println("HTTP error: " + ex.getMessage());
+        }
+
+    }
+}

Property changes on: module-nio/src/examples/org/apache/http/examples/nio/AsyncNHttpFileServer.java
___________________________________________________________________
Name: svn:executable
   + *