Index: module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java =================================================================== --- module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java (revision 632005) +++ module-nio/src/test/java/org/apache/http/nio/protocol/TestAsyncNHttpHandlers.java (working copy) @@ -1271,5 +1271,380 @@ } } } + + /** + * This test executes a series of delayed GETs, ensuring the + * {@link NHttpResponseTrigger} works correctly. + */ + public void testDelayedHttpGets() throws Exception { + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + final ByteSequence requestData = new ByteSequence(); + requestData.rnd(reqNo); + + List responseData = new ArrayList(connNo); + for (int i = 0; i < connNo; i++) { + responseData.add(new ByteSequence()); + } + + NHttpRequestHandler requestHandler = new NHttpRequestHandler() { + + public ConsumingNHttpEntity entityRequest( + final HttpEntityEnclosingRequest request, + final HttpContext context) { + return null; + } + + public void handle(HttpRequest request, final HttpResponse response, + final NHttpResponseTrigger trigger, HttpContext context) + throws HttpException, IOException { + String s = request.getRequestLine().getUri(); + URI uri; + try { + uri = new URI(s); + } catch (URISyntaxException ex) { + throw new HttpException("Invalid request URI: " + s); + } + int index = Integer.parseInt(uri.getQuery()); + final byte[] bytes = requestData.getBytes(index); + new Thread() { + public void run() { + // Wait a bit, to make sure this is delayed. + try { Thread.sleep(10); } catch(InterruptedException ie) {} + // Set the entity after delaying... + NByteArrayEntity entity = new NByteArrayEntity(bytes); + response.setEntity(entity); + trigger.submitResponse(response); + } + }.start(); + } + + }; + + NHttpRequestExecutionHandler requestExecutionHandler = new NHttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("LIST", (ByteSequence) attachment); + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest get = null; + if (i < reqNo) { + get = new BasicHttpRequest("GET", "/?" + i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return get; + } + + public ConsumingNHttpEntity responseEntity( + final HttpResponse response, + final HttpContext context) throws IOException { + return new BufferingNHttpEntity(response.getEntity(), + new HeapByteBufferAllocator()); + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + ByteSequence list = (ByteSequence) context.getAttribute("LIST"); + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + byte[] data = EntityUtils.toByteArray(entity); + list.addBytes(data); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < responseData.size(); i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + responseData.get(i)); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + for (int c = 0; c < responseData.size(); c++) { + ByteSequence receivedPackets = responseData.get(c); + ByteSequence expectedPackets = requestData; + assertEquals(expectedPackets.size(), receivedPackets.size()); + for (int p = 0; p < requestData.size(); p++) { + byte[] expected = requestData.getBytes(p); + byte[] received = receivedPackets.getBytes(p); + + assertEquals(expected.length, received.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], received[i]); + } + } + } + + } + + /** + * This test ensures that HttpExceptions work correctly when immediate. + */ + public void testHttpException() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + + NHttpRequestHandler requestHandler = new SimpleNHttpRequestHandler() { + + public ConsumingNHttpEntity entityRequest( + final HttpEntityEnclosingRequest request, + final HttpContext context) { + return null; + } + + public void handle(HttpRequest request, final HttpResponse response, + HttpContext context) + throws HttpException, IOException { + try { + URI uri = new URI(request.getRequestLine().getUri()); + throw new HttpException("Error: " + uri.getQuery()); + } catch(URISyntaxException uri) { + throw new IOException(); + } + } + + }; + + NHttpRequestExecutionHandler requestExecutionHandler = new NHttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest get = null; + if (i < reqNo) { + get = new BasicHttpRequest("GET", "/?" + i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return get; + } + + public ConsumingNHttpEntity responseEntity( + final HttpResponse response, + final HttpContext context) throws IOException { + return new BufferingNHttpEntity(response.getEntity(), + new HeapByteBufferAllocator()); + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + String data = EntityUtils.toString(entity); + if(!data.equals("Error: " + (i-1))) + throw new IOException(); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < connNo; i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + null); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + } + + /** + * This test ensures that HttpExceptions work correctly when they are delayed by a trigger. + */ + public void testDelayedHttpException() throws Exception { + + final int connNo = 3; + final int reqNo = 20; + final RequestCount requestCount = new RequestCount(connNo * reqNo); + + NHttpRequestHandler requestHandler = new NHttpRequestHandler() { + + public ConsumingNHttpEntity entityRequest( + final HttpEntityEnclosingRequest request, + final HttpContext context) { + return null; + } + public void handle(final HttpRequest request, HttpResponse response, + final NHttpResponseTrigger trigger, HttpContext context) + throws HttpException, IOException { + new Thread() { + public void run() { + try { Thread.sleep(10); } catch(InterruptedException ie) {}; + try { + URI uri = new URI(request.getRequestLine().getUri()); + trigger.handleException(new HttpException("Error: " + uri.getQuery())); + } catch(URISyntaxException uri) { + trigger.handleException(new IOException()); + } + } + }.start(); + } + + }; + + NHttpRequestExecutionHandler requestExecutionHandler = new NHttpRequestExecutionHandler() { + + public void initalizeContext(final HttpContext context, final Object attachment) { + context.setAttribute("REQ-COUNT", new Integer(0)); + context.setAttribute("RES-COUNT", new Integer(0)); + } + + public void finalizeContext(final HttpContext context) { + } + + public HttpRequest submitRequest(final HttpContext context) { + int i = ((Integer) context.getAttribute("REQ-COUNT")).intValue(); + BasicHttpRequest get = null; + if (i < reqNo) { + get = new BasicHttpRequest("GET", "/?" + i); + context.setAttribute("REQ-COUNT", new Integer(i + 1)); + } + return get; + } + + public ConsumingNHttpEntity responseEntity( + final HttpResponse response, + final HttpContext context) throws IOException { + return new BufferingNHttpEntity(response.getEntity(), + new HeapByteBufferAllocator()); + } + + public void handleResponse(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + ExecutionContext.HTTP_CONNECTION); + + int i = ((Integer) context.getAttribute("RES-COUNT")).intValue(); + i++; + context.setAttribute("RES-COUNT", new Integer(i)); + + try { + HttpEntity entity = response.getEntity(); + String data = EntityUtils.toString(entity); + if(!data.equals("Error: " + (i-1))) + throw new IOException(); + requestCount.decrement(); + } catch (IOException ex) { + requestCount.abort(); + return; + } + + if (i < reqNo) { + conn.requestInput(); + } + } + + }; + + NHttpServiceHandler serviceHandler = createHttpServiceHandler( + requestHandler, + null); + + NHttpClientHandler clientHandler = createHttpClientHandler( + requestExecutionHandler); + + this.server.start(serviceHandler); + this.client.start(clientHandler); + + ListenerEndpoint endpoint = this.server.getListenerEndpoint(); + endpoint.waitFor(); + InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress(); + + for (int i = 0; i < connNo; i++) { + this.client.openConnection( + new InetSocketAddress("localhost", serverAddress.getPort()), + null); + } + + requestCount.await(10000); + assertEquals(0, requestCount.getValue()); + + this.client.shutdown(); + this.server.shutdown(); + + } + + } Index: module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java =================================================================== --- module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java (revision 632005) +++ module-nio/src/main/java/org/apache/http/nio/protocol/AsyncNHttpServiceHandler.java (working copy) @@ -302,7 +302,6 @@ } HttpRequest request = connState.getRequest(); - HttpResponse response = null; try { @@ -313,7 +312,7 @@ HttpException httpex = connState.getHttpExepction(); if (httpex != null) { - response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0, + HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0, HttpStatus.SC_INTERNAL_SERVER_ERROR, context); response.setParams( new DefaultedHttpParams(response.getParams(), this.params)); @@ -321,7 +320,7 @@ connState.setResponse(response); } - response = connState.getResponse(); + HttpResponse response = connState.getResponse(); if (response != null) { connState.setHandled(true); sendResponse(conn, request, response); @@ -400,14 +399,13 @@ ver = HttpVersion.HTTP_1_1; } - HttpResponse response = null; + NHttpResponseTrigger trigger = new ResponseTriggerImpl(connState, conn); try { - this.httpProcessor.process(request, context); NHttpRequestHandler handler = connState.getRequestHandler(); if (handler != null) { - response = this.responseFactory.newHttpResponse( + HttpResponse response = this.responseFactory.newHttpResponse( ver, HttpStatus.SC_OK, context); response.setParams( new DefaultedHttpParams(response.getParams(), this.params)); @@ -415,27 +413,19 @@ handler.handle( request, response, - new ResponseTriggerImpl(connState, conn), + trigger, context); } else { - response = this.responseFactory.newHttpResponse(ver, + HttpResponse response = this.responseFactory.newHttpResponse(ver, HttpStatus.SC_NOT_IMPLEMENTED, context); response.setParams( new DefaultedHttpParams(response.getParams(), this.params)); + trigger.submitResponse(response); } } catch (HttpException ex) { - response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0, - HttpStatus.SC_INTERNAL_SERVER_ERROR, context); - response.setParams( - new DefaultedHttpParams(response.getParams(), this.params)); - handleException(ex, response); + trigger.handleException(ex); } - if (response != null) { - connState.setResponse(response); - sendResponse(conn, request, response); - connState.setHandled(true); - } } private void sendResponse( @@ -513,6 +503,7 @@ finishInput(); this.request = null; finishOutput(); + this.handled = false; this.response = null; this.ioex = null; this.httpex = null;