diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java index b8af10d..81dd383 100644 --- a/htrace-core/src/main/java/org/apache/htrace/Span.java +++ b/htrace-core/src/main/java/org/apache/htrace/Span.java @@ -36,6 +36,18 @@ import java.util.Map; @JsonSerialize(using = Span.SpanSerializer.class) public interface Span { public static final long ROOT_SPAN_ID = 0x74ace; + // Json serialization field names + public static final String TRACE_ID_FIELD = "i"; + public static final String SPAN_ID_FIELD = "s"; + public static final String BEGIN_FIELD = "b"; + public static final String END_FIELD = "e"; + public static final String DESCRIPTION_FIELD = "d"; + public static final String PROCESS_ID_FIELD = "r"; + public static final String PARENT_IDS_FIELD = "p"; + public static final String KV_ANNOTATIONS_FIELD = "n"; + public static final String TIMELINE_ANNOTATIONS_FIELD = "t"; + public static final String TIMELINE_ANNOTATION_TIME_FIELD = "t"; + public static final String TIMELINE_ANNOTATION_MESSAGE_FIELD = "m"; /** * The block has completed, stop the clock @@ -131,20 +143,20 @@ public interface Span { public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider) throws IOException { jgen.writeStartObject(); - jgen.writeStringField("i", String.format("%016x", span.getTraceId())); - jgen.writeStringField("s", String.format("%016x", span.getSpanId())); - jgen.writeNumberField("b", span.getStartTimeMillis()); - jgen.writeNumberField("e", span.getStopTimeMillis()); - jgen.writeStringField("d", span.getDescription()); - jgen.writeStringField("r", span.getProcessId()); - jgen.writeArrayFieldStart("p"); + jgen.writeStringField(TRACE_ID_FIELD, String.format("%016x", span.getTraceId())); + jgen.writeStringField(SPAN_ID_FIELD, String.format("%016x", span.getSpanId())); + jgen.writeNumberField(BEGIN_FIELD, span.getStartTimeMillis()); + jgen.writeNumberField(END_FIELD, span.getStopTimeMillis()); + 
jgen.writeStringField(DESCRIPTION_FIELD, span.getDescription()); + jgen.writeStringField(PROCESS_ID_FIELD, span.getProcessId()); + jgen.writeArrayFieldStart(PARENT_IDS_FIELD); if (span.getParentId() != ROOT_SPAN_ID) { jgen.writeString(String.format("%016x", span.getParentId())); } jgen.writeEndArray(); Map traceInfoMap = span.getKVAnnotations(); if (!traceInfoMap.isEmpty()) { - jgen.writeObjectFieldStart("n"); + jgen.writeObjectFieldStart(KV_ANNOTATIONS_FIELD); for (Map.Entry e : traceInfoMap.entrySet()) { jgen.writeStringField(new String(e.getKey(), "UTF-8"), new String(e.getValue(), "UTF-8")); @@ -154,11 +166,11 @@ public interface Span { List timelineAnnotations = span.getTimelineAnnotations(); if (!timelineAnnotations.isEmpty()) { - jgen.writeArrayFieldStart("t"); + jgen.writeArrayFieldStart(TIMELINE_ANNOTATIONS_FIELD); for (TimelineAnnotation tl : timelineAnnotations) { jgen.writeStartObject(); - jgen.writeNumberField("t", tl.getTime()); - jgen.writeStringField("m", tl.getMessage()); + jgen.writeNumberField(TIMELINE_ANNOTATION_TIME_FIELD, tl.getTime()); + jgen.writeStringField(TIMELINE_ANNOTATION_MESSAGE_FIELD, tl.getMessage()); jgen.writeEndObject(); } jgen.writeEndArray(); diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java index 597b566..58d2884 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.htrace.Span; @@ -48,7 +49,9 @@ import java.util.Random; 
public class MilliSpan implements Span { private static Random rand = new Random(); - private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); + private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer(); + private static ObjectReader JSON_READER = OBJECT_MAPPER.reader(MilliSpan.class); private long begin; private long end; @@ -283,24 +286,38 @@ public class MilliSpan implements Span { return writer.toString(); } - private static long parseUnsignedHexLong(String s) { - return new BigInteger(s, 16).longValue(); + public static Span parseJson(String json) { + MilliSpan span; + try { + span = JSON_READER.readValue(json); + } catch (IOException e) { + // Re-throw JsonProcessingException as IllegalArgumentException to avoid + // leaking Jackson in our APIs. Other types of IOException should not be + // possible when reading from a string. + throw new IllegalArgumentException(e); + } + return span; } - public static class MilliSpanDeserializer + protected static class MilliSpanDeserializer extends JsonDeserializer { + + private static long parseUnsignedHexLong(String s) { + return new BigInteger(s, 16).longValue(); + } + @Override public MilliSpan deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { JsonNode root = jp.getCodec().readTree(jp); Builder builder = new Builder(); - builder.begin(root.get("b").asLong()). - end(root.get("e").asLong()). - description(root.get("d").asText()). - traceId(parseUnsignedHexLong(root.get("i").asText())). - spanId(parseUnsignedHexLong(root.get("s").asText())). - processId(root.get("r").asText()); - JsonNode parentsNode = root.get("p"); + builder.begin(root.get(BEGIN_FIELD).asLong()). + end(root.get(END_FIELD).asLong()). + description(root.get(DESCRIPTION_FIELD).asText()). + traceId(parseUnsignedHexLong(root.get(TRACE_ID_FIELD).asText())). + spanId(parseUnsignedHexLong(root.get(SPAN_ID_FIELD).asText())). 
+ processId(root.get(PROCESS_ID_FIELD).asText()); + JsonNode parentsNode = root.get(PARENT_IDS_FIELD); LinkedList parents = new LinkedList(); for (Iterator iter = parentsNode.elements(); iter.hasNext(); ) { @@ -308,7 +325,7 @@ public class MilliSpan implements Span { parents.add(parseUnsignedHexLong(parentIdNode.asText())); } builder.parents(parents); - JsonNode traceInfoNode = root.get("n"); + JsonNode traceInfoNode = root.get(KV_ANNOTATIONS_FIELD); if (traceInfoNode != null) { HashMap traceInfo = new HashMap(); for (Iterator iter = traceInfoNode.fieldNames(); @@ -319,15 +336,15 @@ public class MilliSpan implements Span { } builder.traceInfo(traceInfo); } - JsonNode timelineNode = root.get("t"); + JsonNode timelineNode = root.get(TIMELINE_ANNOTATIONS_FIELD); if (timelineNode != null) { LinkedList timeline = new LinkedList(); for (Iterator iter = timelineNode.elements(); iter.hasNext(); ) { JsonNode ann = iter.next(); - timeline.add(new TimelineAnnotation(ann.get("t").asLong(), - ann.get("m").asText())); + timeline.add(new TimelineAnnotation(ann.get(TIMELINE_ANNOTATION_TIME_FIELD).asLong(), + ann.get(TIMELINE_ANNOTATION_MESSAGE_FIELD).asText())); } builder.timeline(timeline); } diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java index 677ec61..83ddcd8 100644 --- a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java +++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java @@ -18,7 +18,7 @@ package org.apache.htrace.impl; import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.htrace.Span; import org.apache.htrace.TimelineAnnotation; @@ -33,20 +33,38 @@ import java.util.Map; import java.util.Random; public class TestMilliSpan { - private void compareSpans(Span expected, Span got) throws Exception { - 
assertEquals(expected.getStartTimeMillis(), got.getStartTimeMillis()); - assertEquals(expected.getStopTimeMillis(), got.getStopTimeMillis()); - assertEquals(expected.getDescription(), got.getDescription()); - assertEquals(expected.getTraceId(), got.getTraceId()); - assertEquals(expected.getSpanId(), got.getSpanId()); - assertEquals(expected.getProcessId(), got.getProcessId()); - assertEquals(expected.getParentId(), got.getParentId()); + public static boolean compareSpans(Span expected, Span got) throws Exception { + if (expected.getStartTimeMillis() != got.getStartTimeMillis()) { + return false; + } + if (expected.getStopTimeMillis() != got.getStopTimeMillis()) { + return false; + } + if (!expected.getDescription().equals(got.getDescription())) { + return false; + } + if (expected.getTraceId() != got.getTraceId()) { + return false; + } + if (expected.getSpanId() != got.getSpanId()) { + return false; + } + if (!expected.getProcessId().equals(got.getProcessId())) { + return false; + } + if (expected.getParentId() != got.getParentId()) { + return false; + } Map expectedT = expected.getKVAnnotations(); Map gotT = got.getKVAnnotations(); if (expectedT == null) { - assertEquals(null, gotT); + if (gotT != null) { + return false; + } } else { - assertEquals(expectedT.size(), gotT.size()); + if (expectedT.size() != gotT.size()) { + return false; + } Map expectedTMap = new HashMap(); for (byte[] key : expectedT.keySet()) { expectedTMap.put(new String(key, "UTF-8"), @@ -58,7 +76,9 @@ public class TestMilliSpan { new String(gotT.get(key), "UTF-8")); } for (String key : expectedTMap.keySet()) { - assertEquals(expectedTMap.get(key), gotTMap.get(key)); + if (!expectedTMap.get(key).equals(gotTMap.get(key))) { + return false; + } } } List expectedTimeline = @@ -66,16 +86,25 @@ public class TestMilliSpan { List gotTimeline = got.getTimelineAnnotations(); if (expectedTimeline == null) { - assertEquals(null, gotTimeline); + if (gotTimeline != null) { + return false; + } } else { - 
assertEquals(expectedTimeline.size(), gotTimeline.size()); + if (expectedTimeline.size() != gotTimeline.size()) { + return false; + } Iterator iter = gotTimeline.iterator(); for (TimelineAnnotation expectedAnn : expectedTimeline) { TimelineAnnotation gotAnn = iter.next(); - assertEquals(expectedAnn.getMessage(), gotAnn.getMessage()); - assertEquals(expectedAnn.getTime(), gotAnn.getTime()); + if (!expectedAnn.getMessage().equals(gotAnn.getMessage())) { + return false; + } + if (expectedAnn.getTime() != gotAnn.getTime()) { + return false; + } } } + return true; } @Test @@ -91,7 +120,7 @@ public class TestMilliSpan { String json = span.toJson(); ObjectMapper mapper = new ObjectMapper(); MilliSpan dspan = mapper.readValue(json, MilliSpan.class); - compareSpans(span, dspan); + assertTrue(compareSpans(span, dspan)); } @Test @@ -107,7 +136,7 @@ public class TestMilliSpan { String json = span.toJson(); ObjectMapper mapper = new ObjectMapper(); MilliSpan dspan = mapper.readValue(json, MilliSpan.class); - compareSpans(span, dspan); + assertTrue(compareSpans(span, dspan)); } @Test @@ -124,7 +153,7 @@ public class TestMilliSpan { String json = span.toJson(); ObjectMapper mapper = new ObjectMapper(); MilliSpan dspan = mapper.readValue(json, MilliSpan.class); - compareSpans(span, dspan); + assertTrue(compareSpans(span, dspan)); } @Test @@ -150,6 +179,6 @@ public class TestMilliSpan { String json = span.toJson(); ObjectMapper mapper = new ObjectMapper(); MilliSpan dspan = mapper.readValue(json, MilliSpan.class); - compareSpans(span, dspan); + assertTrue(compareSpans(span, dspan)); } } diff --git a/htrace-flume/README.md b/htrace-flume/README.md index 9181fc8..b0e358f 100644 --- a/htrace-flume/README.md +++ b/htrace-flume/README.md @@ -18,7 +18,8 @@ htrace-flume ============ -htrace-flume provides the span receiver which sends tracing spans to Flume collector. +htrace-flume implements a SpanReceiver that sends tracing spans to Flume. 
+Flume can aggregate traces from remote data centers and transfer to storage sink efficiently and reliably. Tutorial -------- @@ -26,14 +27,14 @@ Tutorial 1) build and deploy $ cd htrace/htrace-flume - $ mvn compile assembly:single - $ cp target/htrace-flume-*-jar-with-dependencies.jar $HADOOP_HOME/share/hadoop/hdfs/lib/ + $ mvn compile + $ cp target/htrace-flume-${project.version}.jar $HADOOP_HOME/share/hadoop/hdfs/lib/ 2) Edit hdfs-site.xml to include the following: hadoop.trace.spanreceiver.classes - org.htrace.impl.FlumeSpanReceiver + org.apache.htrace.impl.FlumeSpanReceiver hadoop.htrace.flume.hostname @@ -44,33 +45,43 @@ Tutorial 60000 -3) Setup flume collector - -Create flume-conf.properties file. Below is a sample that sets up an hdfs sink. - - agent.sources = avro-collection-source - agent.channels = memoryChannel - agent.sinks = loggerSink hdfs-sink - - # avro source - should match the configurations in hdfs-site.xml - agent.sources.avro-collection-source.type = avro - agent.sources.avro-collection-source.bind = 127.0.0.1 - agent.sources.avro-collection-source.port = 60000 - agent.sources.avro-collection-source.channels = memoryChannel - - #sample hdfs-sink, change to any sink that flume supports - agent.sinks.hdfs-sink.type = hdfs - agent.sinks.hdfs-sink.hdfs.path = hdfs://127.0.0.1:9000/flume - agent.sinks.hdfs-sink.channel = memoryChannel - agent.sinks.hdfs-sink.hdfs.fileType = DataStream - agent.sinks.hdfs-sink.hdfs.writeFormat = Text - agent.sinks.hdfs-sink.hdfs.rollSize = 0 - agent.sinks.hdfs-sink.hdfs.rollCount = 10000 - agent.sinks.hdfs-sink.hdfs.batchSize = 100 - - # memory channel - agent.channels.memoryChannel.capacity = 10000 - agent.channels.memoryChannel.transactionCapacity = 1000 - -Run flume agent using command "flume-ng agent -c ./conf/ -f ./conf/flume-conf.properties -n agent" +3) Setup flume agent + + * Below is a sample flume configuration that sets up an hdfs sink. 
+ + agent.sources = avro-collection-source + agent.channels = memoryChannel + agent.sinks = hdfs-sink + *\# avro source - should match FlumeSpanReceiver configurations in hdfs-site.xml* + agent.sources.avro-collection-source.type = avro + agent.sources.avro-collection-source.bind = 127.0.0.1 + agent.sources.avro-collection-source.port = 60000 + agent.sources.avro-collection-source.channels = memoryChannel + *\# sample hdfs-sink, change to any sink that flume supports* + agent.sinks.hdfs-sink.type = hdfs + agent.sinks.hdfs-sink.hdfs.path = hdfs://127.0.0.1:9000/flume + agent.sinks.hdfs-sink.channel = memoryChannel + agent.sinks.hdfs-sink.hdfs.fileType = DataStream + agent.sinks.hdfs-sink.hdfs.writeFormat = Text + agent.sinks.hdfs-sink.hdfs.rollSize = 0 + agent.sinks.hdfs-sink.hdfs.rollCount = 10000 + agent.sinks.hdfs-sink.hdfs.batchSize = 100 + *\# memory channel* + agent.channels.memoryChannel.capacity = 10000 + agent.channels.memoryChannel.transactionCapacity = 1000 + + * Start flume agent using command "flume-ng agent -c ./conf/ -f ./conf/flume-conf.properties -n agent" + +4) Setup HTrace sink + + * Use the following configurations to setup an HTrace sink. + + *\# htrace-sink* + agent.sinks.htrace-sink.type = org.apache.htrace.flume.TraceSink + agent.sinks.htrace-sink.channel = memoryChannel + agent.sinks.htrace-sink.span.receiver = LocalFileSpanReceiver + agent.sinks.htrace-sink.local-file-span-receiver.path = /tmp/span.txt + + * Copy htrace-core.jar and htrace-flume.jar to $FLUME_HOME/lib. + * Start flume agent using command "flume-ng agent -c ./conf/ -f ./conf/flume-conf.properties -n agent" diff --git a/htrace-flume/pom.xml b/htrace-flume/pom.xml index b9fe5a8..a94aed4 100644 --- a/htrace-flume/pom.xml +++ b/htrace-flume/pom.xml @@ -107,6 +107,11 @@ language governing permissions and limitations under the License. 
--> org.apache.flume + flume-ng-core + ${flume.version} + + + org.apache.flume flume-ng-sdk ${flume.version} tests diff --git a/htrace-flume/src/main/java/org/apache/htrace/flume/FlumeHTraceConfiguration.java b/htrace-flume/src/main/java/org/apache/htrace/flume/FlumeHTraceConfiguration.java new file mode 100644 index 0000000..8d288d3 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/flume/FlumeHTraceConfiguration.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.flume; + +import org.apache.flume.Context; +import org.apache.htrace.HTraceConfiguration; + +/** + * Meshes {@link HTraceConfiguration} to {@link Context} + */ +public class FlumeHTraceConfiguration extends HTraceConfiguration { + private final Context ctx; + + public FlumeHTraceConfiguration(Context ctx) { + this.ctx = ctx; + } + + @Override + public String get(String key) { + // flume context already removed key prefix "agent.sinks.mysink." 
+ return ctx.getString(key); + } + + @Override + public String get(String key, String defaultValue) { + return ctx.getString(key, defaultValue); + } + + @Override + public boolean getBoolean(String key, boolean defaultValue) { + return ctx.getBoolean(key, defaultValue); + } +} \ No newline at end of file diff --git a/htrace-flume/src/main/java/org/apache/htrace/flume/TraceSink.java b/htrace-flume/src/main/java/org/apache/htrace/flume/TraceSink.java new file mode 100644 index 0000000..e02c7ac --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/flume/TraceSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.htrace.flume; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.sink.AbstractSink; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.SpanReceiverBuilder; +import org.apache.htrace.impl.MilliSpan; + +public class TraceSink extends AbstractSink implements Configurable { + public static final String TRACESINK_BATCHSIZE_KEY = "batchsize"; + public static final int DEFAULT_TRACESINK_BATCHSIZE = 100; + + private static final Log LOG = LogFactory.getLog(TraceSink.class); + private SpanReceiver receiver; + private int batchSize; + + @Override + public void configure(Context ctx) { + FlumeHTraceConfiguration cfg = new FlumeHTraceConfiguration(ctx); + SpanReceiverBuilder builder = new SpanReceiverBuilder(cfg); + receiver = builder.build(); + if (receiver == null) { + throw new ConfigurationException("Invalid configuration"); + } + batchSize = ctx.getInteger(TRACESINK_BATCHSIZE_KEY, new Integer(DEFAULT_TRACESINK_BATCHSIZE)); + } + + public SpanReceiver getSpanReceiver() { + return receiver; + } + + @Override + public Status process() throws EventDeliveryException { + Status status = null; + + // Start transaction + Channel ch = getChannel(); + Transaction txn = ch.getTransaction(); + txn.begin(); + try { + status = Status.READY; + for (long i = 0; i < batchSize; i++) { + Event event = ch.take(); + if (event == null) { + status = Status.BACKOFF; + break; + } + try { + // forward event to span receiver + String body = new String(event.getBody(), "UTF-8"); + Span span = MilliSpan.parseJson(body); + receiver.receiveSpan(span); + } catch (IllegalArgumentException 
e) { + // drop malformed events + LOG.warn("Failed to parse event", e); + } + } + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + status = Status.BACKOFF; + // re-throw all Errors + if (t instanceof Error) { + throw (Error)t; + } + } finally { + txn.close(); + } + return status; + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java index 54b8a14..91c2fbc 100644 --- a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java +++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java @@ -171,10 +171,10 @@ public class FlumeSpanReceiver implements SpanReceiver { for (Span span : dequeuedSpans) { // Headers allow Flume to filter Map headers = new HashMap(); - headers.put("TraceId", Long.toString(span.getTraceId())); - headers.put("SpanId", Long.toString(span.getSpanId())); - headers.put("ProcessId", span.getProcessId()); - headers.put("Description", span.getDescription()); + headers.put(Span.TRACE_ID_FIELD, Long.toString(span.getTraceId())); + headers.put(Span.SPAN_ID_FIELD, Long.toString(span.getSpanId())); + headers.put(Span.PROCESS_ID_FIELD, span.getProcessId()); + headers.put(Span.DESCRIPTION_FIELD, span.getDescription()); String body = span.toJson(); diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/FlumeSpanReceiverTestUtility.java b/htrace-flume/src/test/java/org/apache/htrace/impl/FlumeSpanReceiverTestUtility.java new file mode 100644 index 0000000..f12952c --- /dev/null +++ b/htrace-flume/src/test/java/org/apache/htrace/impl/FlumeSpanReceiverTestUtility.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.Server; +import org.apache.flume.api.RpcTestUtils; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.flume.source.avro.AvroSourceProtocol; +import org.apache.flume.source.avro.Status; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceCreator; + +public class FlumeSpanReceiverTestUtility { + private SpanReceiver spanReceiver; + private Server flumeServer; + private TraceCreator traceCreator; + + public void startReceiver(Map extraConf, AvroSourceProtocol avroHandler) { + // Start Flume server + Assert.assertNull(flumeServer); + flumeServer = RpcTestUtils.startServer(avroHandler); + + // Create and configure span receiver + Map conf = new HashMap(); + conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1"); + conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())); + if (extraConf != null) { + conf.putAll(extraConf); + } + + spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf)); + // Create trace creator, it will 
register our receiver + traceCreator = new TraceCreator(spanReceiver); + } + + public List createSimpleTraces() { + List spans = new ArrayList(); + Span rootSpan = new MilliSpan("ROOT", 1, Span.ROOT_SPAN_ID, 100, "test"); + Span innerOne = rootSpan.child("Some good work"); + Span innerTwo = innerOne.child("Some more good work"); + innerTwo.stop(); + spans.add(innerTwo); + innerOne.stop(); + spans.add(innerOne); + rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes()); + rootSpan.addTimelineAnnotation("timeline"); + rootSpan.stop(); + spans.add(rootSpan); + + return spans; + } + + public void stopReceiver() throws IOException { + // Close span receiver + if (spanReceiver != null) { + Trace.removeReceiver(spanReceiver); + spanReceiver.close(); + spanReceiver = null; + } + + // Close Flume server + if (flumeServer != null) { + RpcTestUtils.stopServer(flumeServer); + flumeServer = null; + } + } + + public TraceCreator getTraceCreator() { + return traceCreator; + } + + /** + * A mock Flume agent that collects events + */ + public static class AvroHandler implements AvroSourceProtocol { + private ArrayList all_events = new ArrayList(); + + public List getAllEvents() { + return new ArrayList(all_events); + } + + @Override + public Status append(AvroFlumeEvent event) throws AvroRemoteException { + all_events.add(event); + return Status.OK; + } + + @Override + public Status appendBatch(List events) throws + AvroRemoteException { + all_events.addAll(events); + return Status.OK; + } + } +} diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java index a825690..4b1e3f3 100644 --- a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java +++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java @@ -19,74 +19,43 @@ package org.apache.htrace.impl; import java.io.IOException; import java.nio.charset.Charset; -import 
java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import junit.framework.Assert; -import org.apache.avro.AvroRemoteException; -import org.apache.avro.ipc.Server; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.RpcTestUtils; import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.AvroSourceProtocol; -import org.apache.flume.source.avro.Status; -import org.apache.htrace.HTraceConfiguration; import org.apache.htrace.Span; -import org.apache.htrace.SpanReceiver; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceCreator; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.htrace.impl.FlumeSpanReceiverTestUtility; +import org.apache.htrace.impl.FlumeSpanReceiverTestUtility.AvroHandler; import org.junit.Test; public class TestFlumeSpanReceiver { - private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class); - - private static final String ROOT_SPAN_DESC = "ROOT"; - - private SpanReceiver spanReceiver; - private Server flumeServer; - private TraceCreator traceCreator; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } + /* + * Setup a mock Flume agent using AvroHandler, and verify FlumeSpanReceiver + * can deliver traces. 
+ */ @Test public void testSimpleTraces() throws FlumeException, EventDeliveryException, IOException { - AvroHandler avroHandler = null; + FlumeSpanReceiverTestUtility util = new FlumeSpanReceiverTestUtility(); + // AvroHandler is a mock Flume agent that collects events and replies OK + AvroHandler avroHandler = new AvroHandler(); List spans = null; + // Send some simple traces via FlumeSpanReceiver to AvroHandler try { - avroHandler = new AvroHandler(); - startReceiver(null, avroHandler); - - spans = new ArrayList(); - Span rootSpan = new MilliSpan(ROOT_SPAN_DESC, 1, Span.ROOT_SPAN_ID, 100, "test"); - Span innerOne = rootSpan.child("Some good work"); - Span innerTwo = innerOne.child("Some more good work"); - innerTwo.stop(); - spans.add(innerTwo); - innerOne.stop(); - spans.add(innerOne); - rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes()); - rootSpan.addTimelineAnnotation("timeline"); - rootSpan.stop(); - spans.add(rootSpan); - + util.startReceiver(null, avroHandler); + spans = util.createSimpleTraces(); } finally { - stopReceiver(); + util.stopReceiver(); } + + // Verify the received spans match the ones we sent List events = avroHandler.getAllEvents(); Assert.assertEquals(spans.size(), events.size()); for (int i = 0; i < spans.size(); i ++) { @@ -95,82 +64,37 @@ public class TestFlumeSpanReceiver { } } + /* + * Test the receiver under multi-threading condition + */ @Test public void testConcurrency() throws FlumeException, EventDeliveryException, IOException { + FlumeSpanReceiverTestUtility util = new FlumeSpanReceiverTestUtility(); try { Map extraConf = new HashMap(); extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5"); - startReceiver(extraConf, new RpcTestUtils.OKAvroHandler()); - traceCreator.createThreadedTrace(); + // OKAvroHandler is a mock Flume agent that always replies OK + util.startReceiver(extraConf, new RpcTestUtils.OKAvroHandler()); + util.getTraceCreator().createThreadedTrace(); } finally { - stopReceiver(); + util.stopReceiver(); } 
} + /* + * Test the receiver is resilient when Flume is down + */ @Test public void testResilience() throws FlumeException, EventDeliveryException, IOException { + FlumeSpanReceiverTestUtility util = new FlumeSpanReceiverTestUtility(); try { - startReceiver(null, new RpcTestUtils.FailedAvroHandler()); - traceCreator.createThreadedTrace(); + // FailedAvroHandler is a mock Flume agent that always replies FAILED + util.startReceiver(null, new RpcTestUtils.FailedAvroHandler()); + util.getTraceCreator().createThreadedTrace(); } finally { - stopReceiver(); - } - } - - private void startReceiver(Map extraConf, AvroSourceProtocol avroHandler) { - // Start Flume server - Assert.assertNull(flumeServer); - flumeServer = RpcTestUtils.startServer(avroHandler); - - // Create and configure span receiver - Map conf = new HashMap(); - conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1"); - conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())); - if (extraConf != null) { - conf.putAll(extraConf); - } - - spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf)); - - // Create trace creator, it will register our receiver - traceCreator = new TraceCreator(spanReceiver); - } - - private void stopReceiver() throws IOException { - // Close span receiver - if (spanReceiver != null) { - Trace.removeReceiver(spanReceiver); - spanReceiver.close(); - spanReceiver = null; - } - - // Close Flume server - if (flumeServer != null) { - RpcTestUtils.stopServer(flumeServer); - flumeServer = null; - } - } - - private static class AvroHandler implements AvroSourceProtocol { - private ArrayList all_events = new ArrayList(); - - public List getAllEvents() { - return new ArrayList(all_events); - } - - @Override - public Status append(AvroFlumeEvent event) throws AvroRemoteException { - all_events.add(event); - return Status.OK; - } - - @Override - public Status appendBatch(List events) throws - AvroRemoteException { - all_events.addAll(events); - return 
Status.OK; + util.stopReceiver(); } } } diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestTraceSink.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestTraceSink.java new file mode 100644 index 0000000..d34a57d --- /dev/null +++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestTraceSink.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.avro.AvroRemoteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.flume.source.avro.AvroSourceProtocol; +import org.apache.flume.source.avro.Status; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiverBuilder; +import org.apache.htrace.flume.TraceSink; +import org.junit.Test; + +public class TestTraceSink { + private static final Log LOG = LogFactory.getLog(TestTraceSink.class); + private static FlumeSpanReceiverTestUtility receiverUtil = new FlumeSpanReceiverTestUtility(); + private TraceSink sink; + + /* + * Setup a Flume pipeline from SpanReceiver to TraceSink and verify + * that traces are delivered correctly. 
+ * FlumeSpanReceiver -> MockFlumeAgent -> TraceSink -> POJOSpanReceiver + */ + @Test + public void testSimpleTraces() throws Exception { + List sentSpans = null; + + POJOSpanReceiver receiver = null; + // deliver 3 simple traces through the pipeline + try { + receiver = startPipeline(1); + sentSpans = receiverUtil.createSimpleTraces(); + } catch (Exception e) { + LOG.error(e); + throw e; + } finally { + stopPipeline(); + } + + // Verify received traces match the ones we sent + Collection recvSpans = receiver.getSpans(); + Assert.assertEquals(sentSpans.size(), recvSpans.size()); + for (Span sentSpan : sentSpans) { + boolean found = false; + for (Span recvSpan : recvSpans) { + if (TestMilliSpan.compareSpans(sentSpan, recvSpan)) { + recvSpans.remove(recvSpan); + found = true; + break; + } + } + Assert.assertTrue(found); + } + } + + /* + * Test the receiver under multi-threaded conditions + */ + @Test + public void testConcurrency() throws Exception { + try { + startPipeline(5); + receiverUtil.getTraceCreator().createThreadedTrace(); + } catch (Exception e) { + LOG.error(e); + throw e; + } finally { + stopPipeline(); + } + } + + private POJOSpanReceiver startPipeline(int numThreads) throws IOException { + // configure TraceSink + Map sinkConf = new HashMap(); + sinkConf.put(SpanReceiverBuilder.SPAN_RECEIVER_CONF_KEY, "POJOSpanReceiver"); + + // start TraceSink and MemoryChannel + sink = new TraceSink(); + Context ctx = new Context(); + ctx.putAll(sinkConf); + Configurables.configure(sink, ctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, ctx); + sink.setChannel(channel); + sink.start(); + + // start SpanReceiver and MockFlumeAgent, connect to MemoryChannel + Map receiverConf = new HashMap(); + receiverConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, Integer.toString(numThreads)); + receiverUtil.startReceiver(receiverConf, new MockFlumeAgent(sink, channel)); + + return (POJOSpanReceiver)sink.getSpanReceiver(); + } + + private void 
stopPipeline() throws IOException { + // stop SpanReceiver and MockFlumeAgent + receiverUtil.stopReceiver(); + // stop TraceSink + if (sink != null) { + sink.stop(); + sink = null; + } + } + + /* + * MockFlumeAgent forwards traces from SpanReceiver to TraceSink + */ + private static class MockFlumeAgent implements AvroSourceProtocol { + private TraceSink sink; + private Channel channel; + MockFlumeAgent(TraceSink sink, Channel channel) { + this.sink = sink; + this.channel = channel; + } + + public static Event convertEvent(AvroFlumeEvent ae) { + Map aeh = ae.getHeaders(); + Map eh = new HashMap(); + for (CharSequence k : aeh.keySet()) { + eh.put(k.toString(), aeh.get(k).toString()); + } + return EventBuilder.withBody(ae.getBody().array(), eh); + } + + // write received event to channel + // append and appendBatch methods are synchronized because the Flume sink is single-threaded. + @Override + public synchronized Status append(AvroFlumeEvent ae) throws AvroRemoteException { + Transaction tx = channel.getTransaction(); + try { + tx.begin(); + Event e = convertEvent(ae); + channel.put(e); + tx.commit(); + } catch (Exception ex) { + LOG.error(ex); + tx.rollback(); + } finally { + tx.close(); + } + try { + sink.process(); + } catch (EventDeliveryException ex) { + // ignore + } + return Status.OK; + } + + // write a batch of events to channel + @Override + public synchronized Status appendBatch(List events) throws + AvroRemoteException { + Transaction tx = channel.getTransaction(); + try { + tx.begin(); + for (AvroFlumeEvent ae : events) { + Event e = convertEvent(ae); + channel.put(e); + } + tx.commit(); + } catch (Exception ex) { + LOG.error(ex); + tx.rollback(); + } finally { + tx.close(); + } + try { + sink.process(); + } catch (EventDeliveryException ex) { + // ignore + } + return Status.OK; + } + } +}