From 74e8a1b048e52eac7cd0d22bc5447b7694c3b072 Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 5 Jan 2015 09:06:53 -0800 Subject: [PATCH] HTRACE-51 htraced java REST client (a.k.a java SpanReceiver for htraced) First cut at a receiver for htraced that uses REST. Not finished. Was waiting on a write REST verb to finish. Still missing is the batching up of spans and POSTing them to htraced (async). Adds a new htrace-htraced module intended to house htraced and htraced client(s). Moving htraced here will keep htrace-core clean with Interfaces and basic implementations only; tooling will be out in other modules. Makes jackson a first-class dependency, moving it up to the top-level pom since it is used by both the core and htraced modules. Adds a new receiver, HTracedRESTReceiver, that takes a hostname and port for the remote htraced daemon and a timeout. Adds a non-blocking httpclient used by the receiver to send spans to htraced via REST, modeled on https://github.com/timboudreau/netty-http-client/ (TODO: Reexamine whether we could just include the above client altogether: at first blush it had too many dependencies and did more than we need here but go look again). Non-blocking is probably overkill given it is unlikely we'll ever have more than a single connection going on and we'll be batching up the writing of spans, but it is a nice-to-have. TODO: add http keepalive support and optionally compression. 
--- htrace-core/pom.xml | 2 - htrace-htraced/pom.xml | 133 +++++++++++ .../apache/htrace/impl/HTracedRESTReceiver.java | 151 +++++++++++++ .../java/org/apache/htrace/impl/HttpClient.java | 248 +++++++++++++++++++++ .../impl/HttpClientChannelInboundHandler.java | 153 +++++++++++++ .../htrace/impl/HttpClientChannelInitializer.java | 48 ++++ .../org/apache/htrace/impl/HttpRequestBuilder.java | 78 +++++++ .../org/apache/htrace/impl/HttpRequestInfo.java | 91 ++++++++ .../org/apache/htrace/impl/HttpResponseFuture.java | 121 ++++++++++ .../apache/htrace/impl/HttpResponseHandler.java | 94 ++++++++ .../org/apache/htrace/impl/TestHttpClient.java | 97 ++++++++ pom.xml | 11 + 12 files changed, 1225 insertions(+), 2 deletions(-) create mode 100644 htrace-htraced/pom.xml create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClient.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInboundHandler.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInitializer.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestBuilder.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestInfo.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseFuture.java create mode 100644 htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseHandler.java create mode 100644 htrace-htraced/src/test/java/org/apache/htrace/impl/TestHttpClient.java diff --git a/htrace-core/pom.xml b/htrace-core/pom.xml index 91d906c..b4e1ca3 100644 --- a/htrace-core/pom.xml +++ b/htrace-core/pom.xml @@ -140,12 +140,10 @@ language governing permissions and limitations under the License. 
--> com.fasterxml.jackson.core jackson-core - 2.4.0 com.fasterxml.jackson.core jackson-databind - 2.4.0 diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml new file mode 100644 index 0000000..ab4756e --- /dev/null +++ b/htrace-htraced/pom.xml @@ -0,0 +1,133 @@ + + + + 4.0.0 + + htrace-htraced + jar + + + htrace + org.apache.htrace + 3.1.0 + .. + + + htrace-htraced + HTraced and HTraced clients + http://incubator.apache.org/projects/htrace.html + + + UTF-8 + 4.0.24.Final + + + + + + + maven-assembly-plugin + + true + + + + org.apache.maven.plugins + maven-source-plugin + + + maven-javadoc-plugin + + + maven-compiler-plugin + + + org.apache.maven.plugins + maven-gpg-plugin + + + org.apache.rat + apache-rat-plugin + + + + maven-deploy-plugin + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + + + org.apache.htrace + htrace-core + ${project.version} + provided + + + org.apache.htrace + htrace-core + ${project.version} + tests + test + + + + commons-logging + commons-logging + provided + + + junit + junit + test + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + io.netty + netty-common + ${netty.version} + + + io.netty + netty-transport + ${netty.version} + + + io.netty + netty-codec-http + ${netty.version} + + + + joda-time + joda-time + 2.6 + + + diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java new file mode 100644 index 0000000..8adde80 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.TimeUnit; + +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.joda.time.Duration; + +/** + * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes + * dependencies since this client will be embedded in the process being traced. + * Dependent on the REST defined in rest.go in the htraced REST server. + * For example, a GET on /serverInfo returns the htraced server info. + * + *

Create an instance by doing: + * SpanReceiver receiver = + * HTracedRESTReceiver.builder().hostname(HTRACED_HOSTNAME).port(HTRACED_PORT).build(); + * Then call receiver.receiveSpan(Span); to send spans to htraced. This method + * returns immediately. It sends the spans in background asynchronously being parsimonious about + * resources consumed. + * + *

TODO: How to be more dependent on rest.go so we break if it changes? + */ +public class HTracedRESTReceiver implements SpanReceiver { + // Uses Builder and Fluent patterns as described here: + // http://jlordiales.me/2012/12/13/the-builder-pattern-in-practice/ + private final HttpClient httpClient; + private final String hostname; + private final int port; + + /** + * Builder class. Get instance by calling {@link HTracedRESTReceiver#builder()}. + */ + public static class Builder { + private Duration timeout = new Duration(1000); + private String hostname = "localhost"; + /** + * Default port for htraced. + */ + private int port = 9095; + + private Builder() {} + + public Builder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public Builder port(final int port) { + this.port = port; + return this; + } + + /** + * @param timeout Connection setup and request timeouts. + * @return This + */ + public Builder timeout(final long timeout) { + this.timeout = new Duration(timeout); + return this; + } + + public HTracedRESTReceiver build() { + return new HTracedRESTReceiver(this); + } + } + + private HTracedRESTReceiver(Builder builder) { + this.httpClient = new HttpClient(this.getClass().getSimpleName(), builder.timeout); + this.hostname = builder.hostname; + this.port = builder.port; + } + + /** + * @return New builder instance. + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public void close() throws IOException { + if (this.httpClient != null) this.httpClient.shutdown(); + } + + @Override + public void receiveSpan(Span span) { + // TODO when htraced has a write method. + // Add span to a queue and return immediately. Send spans in background to htraced on a timeout + // and/or size. Start dropping spans if we fail to connect or cannot send. We are running in + // someone elses' process so don't hog resources. 
+ } + + /** + * @param hostname + * @param port + * @param fileREST + * @return URL made from hostname, port, and fileREST + */ + private static URL buildURL(final String hostname, final int port, final String fileREST) { + try { + return new URL("http", hostname, port, fileREST); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + HTracedRESTReceiver receiver = + HTracedRESTReceiver.builder().hostname("www.google.com").port(80).timeout(1000).build(); + try { + URL url = buildURL(receiver.hostname, receiver.port, "/"); + HttpRequest request = HttpRequestBuilder.builderForHttpGET(url).build(); + HttpResponseFuture responseFuture = + receiver.httpClient.submit(request, new HttpResponseHandler(String.class) { + @Override + protected void receive(HttpResponseStatus status, HttpHeaders headers, String str) { + super.receive(status, headers, str); + System.out.println(str); + } + }, new Duration(1000)); + responseFuture.await(10, TimeUnit.SECONDS); + } finally { + receiver.close(); + } + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClient.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClient.java new file mode 100644 index 0000000..0089af9 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClient.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.util.AttributeKey; + +import java.net.ConnectException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.joda.time.Duration; + +/** + * Simple non-blocking httpclient. Minimal dependencies. Create an instance and then pass a netty + * HttpRequest to submit(HttpRequest, ResponseHandler, Duration). Build the http request to pass + * using HttpRequestBuilder. Subclass an instance of ResponseHandler overriding one of its receive + * methods to get the response. We pass back a 'future' that you can check for result or wait on + * (defeating the point of a non-blocking client -- but hey!). For example: + *

+ * HttpClient client = new HttpClient("myHttpClient", new Duration(1000));
+ * URL url = new URL("http://www.google.com:80/");
+ * HttpRequest request = HttpRequestBuilder.builderForHttpGET(url).build();
+ * HttpResponseFuture future =
+ *   client.submit(request, new HttpResponseHandler(String.class) {
+ *     {@literal @}Override
+ *     protected void receive(String str) {
+ *       System.out.println(str);
+ *     }
+ *   }, new Duration(1000));
+ * // Block until google does its thing.
+ * future.await();
+ * 
+ *

TODO: keepalive, compress, chunking, redirect. + */ +class HttpClient { + // Modeled on https://github.com/timboudreau/netty-http-client/ + private final String userAgent; + private final NioEventLoopGroup group; + private Bootstrap bootstrap; + private final Timer timer; + private final Duration timeout; + + /** + * Key for attributes associated with the outstanding netty channel. + * Keep all info related to a channel in a datastructure, a RequestInfo object. + */ + static final AttributeKey REQUEST_INFO_KEY = + AttributeKey.valueOf("requestInfo"); + + /** + * @param userAgent Who is hosting this httpclient; added as prefix on thread names we create and + * passed as part of the http agent string. + * @param timeout Timeout in milliseconds for connection setup (CONNECT_TIMEOUT_MILLIS). Pass null + * or 0 for no timeout. + */ + HttpClient(final String userAgent, final Duration timeout) { + if (userAgent == null || userAgent.length() <= 0) { + throw new IllegalArgumentException("No userAgent!"); + } + this.userAgent = userAgent; + this.timeout = timeout; + // TODO: Make it configurable how many threads we use. + this.group = new NioEventLoopGroup(1, new HttpClientThreadFactory(this.userAgent)); + this.timer = new Timer(userAgent + ".HttpClient.timer." + System.identityHashCode(this)); + } + + /** + * Thread factory so can name our threads we. + */ + private static class HttpClientThreadFactory implements ThreadFactory { + private int index = 0; + private final String host; + + HttpClientThreadFactory(final String host) { + this.host = host; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, this.host + ".HttpClient.NioEventLoopGroup." + index++); + t.setDaemon(true); + return t; + } + } + + @SuppressWarnings("deprecation") + public void shutdown() { + // Shutdown the timer we started up in the constructor. 
+ this.timer.cancel(); + if (this.group != null) { + this.group.shutdownGracefully(0, 10, TimeUnit.SECONDS); + if (!this.group.isTerminated()) { + // Should not get here but try forcing shutdown if we do. + this.group.shutdownNow(); + } + } + } + + private synchronized Bootstrap bootstrap() { + // If we have already bootstrapped this client, return. + if (this.bootstrap != null) return this.bootstrap; + Bootstrap bs = new Bootstrap(); + bs.group(group); + bs.handler(new HttpClientChannelInitializer(new HttpClientChannelInboundHandler(), + 128 * 1024, 8192, 16383)); + // Default settings from https://github.com/timboudreau/netty-http-client/ + bs.option(ChannelOption.TCP_NODELAY, true); + bs.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bs.option(ChannelOption.SO_REUSEADDR, false); + if (this.timeout != null) { + bs.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)timeout.getMillis()); + } + // TODO: Pass other options? SO_KEEPALIVE? + bs.channel(NioSocketChannel.class); + this.bootstrap = bs; + return bs; + } + + /** + * Called on timeout from Timer. + * Shut everything down. + */ + static final class TimeoutTimerTask extends TimerTask { + private final HttpRequestInfo requestInfo; + + public TimeoutTimerTask(HttpRequestInfo requestInfo) { + this.requestInfo = requestInfo; + } + + @Override + public void run() { + if (this.requestInfo.cancelled.get()) return; + if (this.requestInfo.responseHandler != null) { + this.requestInfo.responseHandler.onError(new TimeoutException("" + + this.requestInfo.timeout.getMillis() + "ms")); + } + if (this.requestInfo.responseFuture != null) { + this.requestInfo.responseFuture.onTimeout(this.requestInfo.age()); + } + } + } + + private URL getURL(HttpRequest request) { + // TODO: Make a 'richer' request so we don't have to do this reconstruction of original URL. 
+ try { + return new URL(request.getUri()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + /** + * @param request Build one of these using {@link HttpRequestBuilder} + * @param response Subclass and implement the receive method to get the http response. + * @param timeout Request should complete inside this time; includes connection setup and + * processing of the request + * @return A future, one that is 'fatter' with more facility than ChannelFuture returned by netty. + */ + HttpResponseFuture submit(final HttpRequest request, final HttpResponseHandler response, + final Duration timeout) { + final AtomicBoolean cancelled = new AtomicBoolean(); + HttpResponseFuture responseFuture = new HttpResponseFuture(cancelled); + HttpRequestInfo requestInfo = + new HttpRequestInfo(request, cancelled, responseFuture, response, timeout); + if (timeout != null) { + TimerTask tt = new TimeoutTimerTask(requestInfo); + requestInfo.timerTask(tt); + this.timer.schedule(tt, timeout.getMillis()); + } + // Get URL from the URI str in request. TODO: make it so don't have to do this parse. + URL url = getURL(request); + // Add user agent. TODO: Add other header types. + request.headers().add(HttpHeaders.Names.USER_AGENT, this.userAgent); + // Connect + Bootstrap bs = bootstrap(); + ChannelFuture future = bs.connect(url.getHost().toString(), url.getPort()); + // Shove our request info object into the channel attributes so can get it later + future.channel().attr(REQUEST_INFO_KEY).set(requestInfo); + responseFuture.setFuture(future); + future.addListener(new ChannelFutureListener() { + /** + * Utility + * @param future Future to cancel. 
+ */ + private void cancel(final ChannelFuture future) { + future.cancel(true); + if (future.channel().isOpen()) { + future.channel().close(); + } + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // Error + Throwable cause = future.cause(); + if (cause == null) { + cause = new ConnectException(request.getUri()); + } + if (response != null) response.onError(cause); + cancelled.set(true); + } + if (cancelled.get()) { + cancel(future); + return; + } + future = future.channel().writeAndFlush(request); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (cancelled.get()) cancel(future); + } + }); + } + }); + return responseFuture; + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInboundHandler.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInboundHandler.java new file mode 100644 index 0000000..c1a1a71 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInboundHandler.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +import java.util.Map; + +/** + * Channel response handler. For internal use. It processes the response and makes it available + * to clients via HttpResponseHandler. + */ +@Sharable +class HttpClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + /** + * Key to use keeping response state in the channel + */ + static final AttributeKey RESPONSE_STATE = + AttributeKey.valueOf("responseState"); + + /** + * Data structure of response state (keyed by {@link #RESPONSE_STATE} + */ + static class ResponseState { + final CompositeByteBuf content = Unpooled.compositeBuffer(); + volatile HttpResponse httpResponse; + volatile boolean fullResponseSent; + + boolean hasResponse() { + return httpResponse != null; + } + } + + /** + * Utility for setting {@link ResponseState} in channel attribute. 
+ * @param chc + * @return Current {@link ResponseState} + */ + private ResponseState state(ChannelHandlerContext chc) { + Attribute attribute = chc.channel().attr(RESPONSE_STATE); + ResponseState responseState = attribute.get(); + if (responseState == null) { + responseState = new ResponseState(); + attribute.set(responseState); + } + return responseState; + } + + @Override + public void channelInactive(ChannelHandlerContext chc) throws Exception { + HttpRequestInfo requestInfo = chc.channel().attr(HttpClient.REQUEST_INFO_KEY).get(); + if (!requestInfo.cancelled.get()) { + // From: https://github.com/timboudreau/netty-http-client/ + // Premature close, which is a legitimate way of ending a request + // with no chunks and no Content-Length header according to RFC + sendFullResponse(chc); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) throws Exception { + // TODO: What to do here? + cause.printStackTrace(); + } + + public void channelRead(final ChannelHandlerContext chc, Object obj) throws Exception { + if (checkCancelled(chc)) { + return; + } + final ResponseState state = state(chc); + if (obj instanceof HttpResponse) { + state.httpResponse = (HttpResponse)obj; + } else if (obj instanceof HttpContent) { + HttpContent c = (HttpContent)obj; + c.content().resetReaderIndex(); + if (c.content().readableBytes() > 0) { + state.content.writeBytes(c.content()); + } + state.content.resetReaderIndex(); + boolean last = c instanceof LastHttpContent; + if (!last && state.httpResponse.headers().get(HttpHeaders.Names.CONTENT_LENGTH) != null) { + long contentLength = + Long.parseLong(state.httpResponse.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + last = state.content.readableBytes() >= contentLength; + } + if (last) { + c.content().resetReaderIndex(); + sendFullResponse(chc); + } + } + } + + private boolean checkCancelled(ChannelHandlerContext chc) { + HttpRequestInfo requestInfo = 
chc.channel().attr(HttpClient.REQUEST_INFO_KEY).get(); + boolean cancelled = requestInfo.cancelled.get(); + if (cancelled) { + Channel channel = chc.channel(); + if (channel.isOpen()) channel.close(); + } + return cancelled; + } + + void sendFullResponse(ChannelHandlerContext chc) { + ResponseState state = state(chc); + HttpRequestInfo requestInfo = chc.channel().attr(HttpClient.REQUEST_INFO_KEY).get(); + state.content.resetReaderIndex(); + if (requestInfo != null) { + requestInfo.cancelTimer(); + } + if (requestInfo.responseHandler != null && + !state.fullResponseSent && state.content.readableBytes() > 0) { + state.fullResponseSent = true; + DefaultFullHttpResponse full = + new DefaultFullHttpResponse(state.httpResponse.getProtocolVersion(), + state.httpResponse.getStatus(), state.content); + for (Map.Entry e : state.httpResponse.headers().entries()) { + full.headers().add(e.getKey(), e.getValue()); + } + state.content.resetReaderIndex(); + requestInfo.responseHandler.internalReceive(state.httpResponse.getStatus(), + state.httpResponse.headers(), state.content); + state.content.resetReaderIndex(); + requestInfo.responseFuture.trigger(); + } + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInitializer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInitializer.java new file mode 100644 index 0000000..96351c7 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpClientChannelInitializer.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpClientCodec; + +/** + * HttpClient Channel setup. + */ +class HttpClientChannelInitializer extends ChannelInitializer { + private final ChannelInboundHandlerAdapter handler; + private final int maxChunkSize; + private final int maxInitialLineLength; + + public HttpClientChannelInitializer(ChannelInboundHandlerAdapter handler, int maxChunkSize, + int maxInitialLineLength, int maxHeadersSize) { + this.handler = handler; + this.maxChunkSize = maxChunkSize; + this.maxInitialLineLength = maxInitialLineLength; + } + + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("http-codec", + new HttpClientCodec(maxInitialLineLength, maxChunkSize, maxChunkSize)); + pipeline.addLast("handler", handler); + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestBuilder.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestBuilder.java new file mode 100644 index 0000000..a346fa1 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestBuilder.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; + +import java.io.IOException; +import java.net.URL; + +/** + * Build up a netty (Default*)HttpRequest. Call builderForHttpGET(URL) or + * builderForHttpPOST(URL), etc., and then set attributes like body(Object). 
+ */ +class HttpRequestBuilder { + private ByteBuf body; + private final URL url; + private final HttpMethod method; + + private HttpRequestBuilder(final URL url, final HttpMethod method) { + this.url = url; + if (!this.url.getProtocol().toLowerCase().equals("http")) { + throw new IllegalArgumentException("Only http supported: " + this.url.toString()); + } + this.method = method; + } + + static HttpRequestBuilder builderForHttpGET(final URL url) { + return new HttpRequestBuilder(url, HttpMethod.GET); + } + + static HttpRequestBuilder builderForHttpPOST(final URL url) { + return new HttpRequestBuilder(url, HttpMethod.POST); + } + + HttpRequestBuilder body(Object obj) throws IOException { + if (obj instanceof CharSequence) { + CharSequence seq = (CharSequence)obj; + body(seq.toString().getBytes(CharsetUtil.UTF_8.toString())); + } else if (obj instanceof byte[]) { + body(Unpooled.wrappedBuffer((byte[])obj)); + } else if (obj instanceof ByteBuf) { + this.body = (ByteBuf) obj; + } else { + throw new RuntimeException("Unexpected type: " + obj); + } + return this; + } + + HttpRequest build() { + // TODO: Return a richer HttpRequest, one that at least has an URL in it instead of url-as-str + return this.body == null? + new DefaultHttpRequest(HttpVersion.HTTP_1_1, this.method, this.url.toString()): + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, this.method, this.url.toString(), + this.body); + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestInfo.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestInfo.java new file mode 100644 index 0000000..cf4e737 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpRequestInfo.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import io.netty.handler.codec.http.HttpRequest; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import java.util.TimerTask; + +/** + * Datastructure of channel request info that gets stuffed into netty channel attribute so it is + * avaiable at all points of the processing. + */ +final class HttpRequestInfo { + /** + * The ongoing request. + */ + private final HttpRequest request; + + /** + * Boolean on whether current request has been cancelled. 
+ */ + final AtomicBoolean cancelled; + final HttpResponseFuture responseFuture; + final HttpResponseHandler responseHandler; + final Duration timeout; + final DateTime startTime; + + /** + * Set post construction + */ + volatile TimerTask timer; + + HttpRequestInfo(HttpRequest request, AtomicBoolean cancelled, HttpResponseFuture responseFuture, + HttpResponseHandler responseHandler, Duration timeout, DateTime startTime) { + this.request = request; + this.cancelled = cancelled; + this.responseFuture = responseFuture; + this.responseHandler = responseHandler; + this.timeout = timeout; + this.startTime = startTime; + } + + HttpRequestInfo(HttpRequest request, AtomicBoolean cancelled, HttpResponseFuture responseFuture, + HttpResponseHandler responseHandler, Duration timeout) { + this(request, cancelled, responseFuture, responseHandler, timeout, DateTime.now()); + } + + Duration age() { + return new Duration(this.startTime, DateTime.now()); + } + + boolean isExpired() { + return this.timeout != null? DateTime.now().isAfter(startTime.plus(this.timeout)): false; + } + + TimerTask timerTask(final TimerTask timer) { + this.timer = timer; + return timer; + } + + void cancelTimer() { + if (this.timer != null) this.timer.cancel(); + } + + @Override + public String toString() { + return "url=" + this.request.getUri() + ", request=" + this.request + + ", cancelled=" + cancelled + ", responseFuture=" + responseFuture + + ", responseHandler=" + responseHandler + ", timeout=" + timeout; + } +} \ No newline at end of file diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseFuture.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseFuture.java new file mode 100644 index 0000000..222680d --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseFuture.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import io.netty.channel.ChannelFuture;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.joda.time.Duration;
+
+
+/**
+ * Facility beyond the ChannelFuture returned out of netty.
+ * Returned out of a HttpClient#submit. Does not carry results; it only tells whether the
+ * request is done or not. TODO: make it more amenable to non-blocking usage.
+ */
+class HttpResponseFuture {
+  // TODO: make non-blocking use of this type easier. Over in
+  // https://github.com/timboudreau/netty-http-client/ the future has a 'lastState' and an
+  // extensive 'state' that you can listen in on: it's nice.
+  private final AtomicBoolean cancelled;
+  private volatile ChannelFuture future;
+  // Released exactly once, by trigger(); every await() variant blocks on it.
+  private final CountDownLatch latch = new CountDownLatch(1);
+
+  /**
+   * @param cancelled cancellation flag; {@link #cancel()} flips it from false to true
+   */
+  HttpResponseFuture(AtomicBoolean cancelled) {
+    this.cancelled = cancelled;
+  }
+
+  /**
+   * @param future the netty future that {@link #cancel()} should cancel and whose channel it
+   *     should close
+   */
+  void setFuture(ChannelFuture future) {
+    this.future = future;
+  }
+
+  /**
+   * Releases every thread blocked in one of the await methods. Safe to call more than once.
+   */
+  void trigger() {
+    this.latch.countDown();
+  }
+
+  /**
+   * Tells whether {@link #trigger()} has been called. Use it after
+   * {@link #await(long, TimeUnit)} to find out whether the wait ended because this future was
+   * released or because the timeout elapsed.
+   *
+   * @return true once this future has been released
+   */
+  public boolean isDone() {
+    return this.latch.getCount() == 0;
+  }
+
+  /**
+   * Wait until this future is released. Dangerous without a timeout!
+   * <p>
+   * Note - blocking while waiting for a response defeats the purpose
+   * of using an asynchronous HTTP client; this sort of thing is
+   * sometimes useful in unit tests, but should not be done in production
+   * code. Where possible, find a way to
+   * attach a callback and finish work there, rather than use this
+   * method.
+   *
+   * @return this future
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public HttpResponseFuture await() throws InterruptedException {
+    this.latch.await();
+    return this;
+  }
+
+  /**
+   * Wait, up to a timeout, for the request to be completed. This is really for
+   * use in unit tests - normal users of this library should use callbacks.
+   * <p>
+   * Note - blocking while waiting for a response defeats the purpose
+   * of using an asynchronous HTTP client; this sort of thing is
+   * sometimes useful in unit tests, but should not be done in production
+   * code. Where possible, find a way to
+   * attach a callback and finish work there, rather than use this
+   * method.
+   *
+   * @param l A number of time units
+   * @param tu Time units
+   * @return this future, whether or not the timeout elapsed first; call {@link #isDone()} to
+   *     tell the two outcomes apart
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public HttpResponseFuture await(long l, TimeUnit tu) throws InterruptedException {
+    // The boolean outcome cannot be returned without changing the signature; isDone() exposes it.
+    this.latch.await(l, tu);
+    return this;
+  }
+
+  /**
+   * Cancels the request because its timeout fired.
+   *
+   * @param duration the timeout that elapsed
+   */
+  void onTimeout(Duration duration) {
+    cancel(duration);
+  }
+
+  /**
+   * Cancel the associated request. This will make a best-effort, but cannot
+   * guarantee, that no state changes will be fired after the final Cancelled.
+   *
+   * @return true if it succeeded, false if it was already canceled
+   */
+  public boolean cancel() {
+    return cancel(null);
+  }
+
+  /**
+   * Cancels the request, closes its channel if still open, and releases any waiter.
+   *
+   * @param forTimeout the timeout that caused the cancel, or null for an explicit cancel;
+   *     currently not used
+   * @return true if this call performed the cancel, false if it was already canceled
+   */
+  boolean cancel(Duration forTimeout) {
+    boolean result = cancelled.compareAndSet(false, true);
+    if (result) {
+      // Read the volatile field once so the null check and the calls see the same future.
+      ChannelFuture future = this.future;
+      if (future != null) {
+        future.cancel(true);
+        if (future.channel() != null && future.channel().isOpen()) {
+          future.channel().close();
+        }
+      }
+    }
+    // If anyone blocked on this future, release them.
+    trigger();
+    return result;
+  }
+}
\ No newline at end of file
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseHandler.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseHandler.java
new file mode 100644
index 0000000..5cb48d9
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HttpResponseHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.CharsetUtil;
+
+/**
+ * Handle the http response. Subclass and override one of the receive() and onError methods to get
+ * the actual http response.
+ *
+ * @param <T> how the response body is handed over: String, CharSequence, netty ByteBuf or byte []
+ */
+public abstract class HttpResponseHandler<T> {
+  // Modeled on https://github.com/timboudreau/netty-http-client/
+  // The type is how you want your http response packaged: as a String, a netty ByteBuf or a
+  // byte [].
+  private final Class<T> type;
+
+  /**
+   * Say what type you want response as.
+   * @param type String.class or ByteBuf.class or byte [].class.
+   */
+  public HttpResponseHandler(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Converts the response body to the requested type and dispatches it to the receive()
+   * callbacks; a status code of 400 or above goes to onErrorResponse() as a UTF-8 String.
+   *
+   * @param status response status
+   * @param headers response headers
+   * @param content response body
+   * @throws RuntimeException if the requested type is not one of the supported ones
+   */
+  protected void internalReceive(HttpResponseStatus status, HttpHeaders headers, ByteBuf content) {
+    if (status.code() > 399) {
+      onErrorResponse(status, headers, new String(drain(content), CharsetUtil.UTF_8));
+      return;
+    }
+
+    if (type == ByteBuf.class) {
+      // Hand the buffer over untouched. Draining it first would leave the handler with a
+      // buffer that has no readable bytes.
+      doReceive(status, headers, type.cast(content));
+    } else if (type == String.class || type == CharSequence.class) {
+      doReceive(status, headers, type.cast(new String(drain(content), CharsetUtil.UTF_8)));
+    } else if (type == byte[].class) {
+      doReceive(status, headers, type.cast(drain(content)));
+    } else {
+      throw new RuntimeException("Unsupported type: " + this.type);
+    }
+  }
+
+  /**
+   * Reads all readable bytes out of the buffer, advancing its reader index.
+   */
+  private static byte[] drain(ByteBuf content) {
+    byte [] b = new byte[content.readableBytes()];
+    content.readBytes(b);
+    return b;
+  }
+
+  /**
+   * Invokes all three receive() overloads, most detailed first, so that a subclass can override
+   * whichever one carries the amount of detail it needs.
+   */
+  void doReceive(HttpResponseStatus status, HttpHeaders headers, T obj) {
+    receive(status, headers, obj);
+    receive(status, obj);
+    receive(obj);
+  }
+
+  /** Called with status and body of a non-error response. Does nothing by default. */
+  protected void receive(HttpResponseStatus status, T obj) {
+  }
+
+  /** Called with status, headers and body of a non-error response. Does nothing by default. */
+  protected void receive(HttpResponseStatus status, HttpHeaders headers, T obj) {
+  }
+
+  /** Called with the body of a non-error response. Does nothing by default. */
+  protected void receive(T obj) {
+  }
+
+  /** Called with the body of an error response. Does nothing by default. */
+  protected void onErrorResponse(String content) {
+  }
+
+  /** Called with status and body of an error response; delegates to the body-only overload. */
+  protected void onErrorResponse(HttpResponseStatus status, String content) {
+    onErrorResponse(content);
+  }
+
+  /**
+   * Called with status, headers and body of an error response; delegates to the overload
+   * without headers.
+   */
+  protected void onErrorResponse(HttpResponseStatus status, HttpHeaders headers, String content) {
+    onErrorResponse(status, content);
+  }
+
+  /**
+   * Called with a failure that is not an http error response. Prints the stack trace by default;
+   * override to handle the failure.
+   */
+  protected void onError(Throwable err) {
+    err.printStackTrace();
+  }
+
+  /**
+   * @return the type the response body is delivered as
+   */
+  public Class<T> type() {
+    return type;
+  }
+}
\ No newline at end of file
diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHttpClient.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHttpClient.java
new file mode 100644
index 0000000..93fffe2
--- /dev/null
+++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHttpClient.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import io.netty.handler.codec.http.HttpRequest;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Exercises HttpClient with one successful GET and one request that must time out.
+ */
+public class TestHttpClient {
+  @Rule
+  public TestName name = new TestName();
+
+  // TODO: Make it so we don't depend on network connection. Make it so netty client can talk to
+  // a little netty server.
+  @Test (timeout=10000)
+  public void testGetGoogleHomePage() throws MalformedURLException, InterruptedException {
+    final long timeoutMillis = 10000;
+    Duration timeout = new Duration(timeoutMillis);
+    HttpClient client = new HttpClient(name.getMethodName(), timeout);
+    // A reference to catch the response page in.
+    final AtomicReference<String> reference = new AtomicReference<String>();
+    try {
+      // Add a port to URL otherwise it defaults -1.
+      URL url = new URL("http://www.google.com:80/");
+      HttpRequest request = HttpRequestBuilder.builderForHttpGET(url).build();
+      HttpResponseFuture future =
+        client.submit(request, new HttpResponseHandler<String>(String.class) {
+          @Override
+          protected void receive(String str) {
+            reference.set(str);
+          }
+        }, timeout);
+      // Block until we get our page. The @Test timeout bounds this wait.
+      future.await();
+      // Read the reference once so both assertions look at the same value.
+      String page = reference.get();
+      assertNotNull(page);
+      assertTrue(page.length() > 0);
+    } finally {
+      client.shutdown();
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testTimeout() throws Throwable {
+    final long timeoutMillis = 1000;
+    Duration timeout = new Duration(timeoutMillis);
+    HttpClient client = new HttpClient(name.getMethodName(), timeout);
+    // A reference to catch the error in.
+    final AtomicReference<Throwable> reference = new AtomicReference<Throwable>();
+    try {
+      // Impossible address. Add a port to URL otherwise it defaults -1.
+      URL url = new URL("http://10.0.0.0:80/");
+      HttpRequest request = HttpRequestBuilder.builderForHttpGET(url).build();
+      HttpResponseFuture future =
+        client.submit(request, new HttpResponseHandler<String>(String.class) {
+          @Override
+          protected void receive(String str) {
+            // Print out anything we get but we should not get anything but error.
+            System.out.println(str);
+          }
+
+          @Override
+          protected void onError(Throwable err) {
+            reference.set(err);
+          }
+        }, timeout);
+      future.await();
+      Throwable error = reference.get();
+      assertNotNull(error);
+      // Assert we timed out.
+      assertTrue(error instanceof TimeoutException);
+    } finally {
+      client.shutdown();
+    }
+  }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a85149a..ebb09f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@ language governing permissions and limitations under the License. --> htrace-zipkin htrace-hbase htrace-flume + htrace-htraced
@@ -300,6 +301,16 @@ language governing permissions and limitations under the License.
--> 4.10 test + + com.fasterxml.jackson.core + jackson-core + 2.4.0 + + + com.fasterxml.jackson.core + jackson-databind + 2.4.0 + -- 2.2.1