(1000);
this.protocolFactory = new TBinaryProtocol.Factory();
configure(conf);
}
+ private void logAndThrow(Throwable exception) {
+ LOG.error(ExceptionUtils.getStackTrace(exception));
+ throw new RuntimeException(exception);
+ }
+
+ protected Transport createTransport(HTraceConfiguration conf) {
+ ClassLoader classLoader = Builder.class.getClassLoader();
+ String className = conf.get(TRANSPORT_CLASS_KEY, DEFAULT_TRANSPORT_CLASS);
+ Transport transport = null;
+ try {
+ Class> cls = classLoader.loadClass(className);
+ transport = (Transport)cls.newInstance();
+ } catch (ClassNotFoundException
+ | InstantiationException
+ | IllegalAccessException e) {
+ logAndThrow(e);
+ }
+ return transport;
+ }
+
private void configure(HTraceConfiguration conf) {
this.conf = conf;
- this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME);
- this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT);
// initialize the endpoint. This endpoint is used while writing the Span.
initConverter();
@@ -186,26 +195,23 @@ public class ZipkinSpanReceiver extends SpanReceiver {
InetAddress tracedServiceHostname = null;
// Try and get the hostname. If it's not configured try and get the local hostname.
try {
+ //TODO (clehene) extract conf to constant
+ //TODO (clehene) has this been deprecated?
String host = conf.get("zipkin.traced-service-hostname",
InetAddress.getLocalHost().getHostAddress());
-
tracedServiceHostname = InetAddress.getByName(host);
} catch (UnknownHostException e) {
LOG.error("Couldn't get the localHost address", e);
}
short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
byte[] address = tracedServiceHostname != null
- ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+ ? tracedServiceHostname.getAddress() : InetAddress.getLoopbackAddress().getAddress();
int ipv4 = ByteBuffer.wrap(address).getInt();
this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
}
private class WriteSpanRunnable implements Runnable {
- /**
- * scribe client to push zipkin spans
- */
- private Scribe.Iface scribe = null;
private final ByteArrayOutputStream baos;
private final TProtocol streamProtocol;
@@ -215,16 +221,15 @@ public class ZipkinSpanReceiver extends SpanReceiver {
}
/**
- * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
- * collector as a thrift object. The scribe client which is used for rpc writes a list of
- * LogEntry objects, so the span objects are first transformed into LogEntry objects before
- * sending to the zipkin-collector.
+ *
+ * This runnable converts an HTrace span to a Zipkin span and sends it across the transport
+ * as a Thrift object.
*
* Here is a little ascii art which shows the above transformation:
*
- * +------------+ +------------+ +------------+ +-----------------+
- * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
- * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
+ * +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span | ===========> | Zipkin Collector|
+ * +------------+ +------------+ (transport) +-----------------+
*
*/
@Override
@@ -236,6 +241,7 @@ public class ZipkinSpanReceiver extends SpanReceiver {
while (running.get() || queue.size() > 0) {
Span firstSpan = null;
+ //TODO (clenene) the following code (try / catch) is duplicated in / from FlumeSpanReceiver
try {
// Block for up to a second. to try and get a span.
// We only block for a little bit in order to notice if the running value has changed
@@ -256,13 +262,17 @@ public class ZipkinSpanReceiver extends SpanReceiver {
if (dequeuedSpans.isEmpty()) continue;
- // If this is the first time through or there was an error re-connect
- if (scribe == null) {
- startClient();
+ if (!transport.isOpen()) {
+ try {
+ transport.open(conf);
+ } catch (Throwable e) {
+ logAndThrow(e);
+ }
}
+
// Create a new list every time through so that the list doesn't change underneath
// thrift as it's sending.
- List entries = new ArrayList(dequeuedSpans.size());
+ List entries = new ArrayList<>(dequeuedSpans.size());
try {
// Convert every de-queued span
for (Span htraceSpan : dequeuedSpans) {
@@ -273,21 +283,18 @@ public class ZipkinSpanReceiver extends SpanReceiver {
// Write the span to a BAOS
zipkinSpan.write(streamProtocol);
- // Do Base64 encoding and put the string into a log entry.
- LogEntry logEntry =
- new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
- entries.add(logEntry);
+ entries.add(baos.toByteArray());
}
// Send the entries
- scribe.Log(entries);
+ transport.send(entries);
+
// clear the list for the next time through.
dequeuedSpans.clear();
// reset the error counter.
errorCount = 0;
} catch (Exception e) {
- LOG.error("Error when writing to the zipkin collector: " +
- collectorHostname + ":" + collectorPort, e);
+ LOG.error("Error when writing to the zipkin transport: " + transport, e);
errorCount += 1;
// If there have been ten errors in a row start dropping things.
@@ -298,7 +305,6 @@ public class ZipkinSpanReceiver extends SpanReceiver {
LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
}
}
-
closeClient();
try {
// Since there was an error sleep just a little bit to try and allow the
@@ -307,42 +313,22 @@ public class ZipkinSpanReceiver extends SpanReceiver {
} catch (InterruptedException e1) {
// Ignored
}
- }
- }
+ } // catch
+ } // while
closeClient();
}
/**
* Close out the connection.
*/
- private void closeClient() {
- // close out the transport.
- if (scribe != null && scribe instanceof Scribe.Client) {
- ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
- scribe = null;
- }
- }
-
- /**
- * Re-connect to Zipkin.
- */
- private void startClient() {
- if (this.scribe == null) {
- this.scribe = newScribe();
+ private void closeClient(){
+ try {
+ transport.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close transport", e);
}
}
- }
- // Override for testing
- Scribe.Iface newScribe() {
- TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
- try {
- transport.open();
- } catch (TTransportException e) {
- e.printStackTrace();
- }
- TProtocol protocol = protocolFactory.getProtocol(transport);
- return new Scribe.Client(protocol);
}
/**
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
deleted file mode 100644
index f166d35..0000000
--- a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.htrace.zipkin;
-
-import com.twitter.zipkin.gen.zipkinCoreConstants;
-
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.MilliSpan;
-import org.apache.htrace.core.POJOSpanReceiver;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanId;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not.
- */
-public class TestHTraceSpanToZipkinSpan {
- private static final String ROOT_SPAN_DESC = "ROOT";
-
- @Test
- public void testHTraceToZipkin() throws IOException {
- Span rootSpan = new MilliSpan.Builder().
- description(ROOT_SPAN_DESC).
- parents(new SpanId[] { } ).
- spanId(new SpanId(100, 100)).
- tracerId("test").
- begin(System.currentTimeMillis()).
- build();
- Span innerOne = rootSpan.child("Some good work");
- Span innerTwo = innerOne.child("Some more good work");
- innerTwo.stop();
- innerOne.stop();
- rootSpan.addKVAnnotation("foo", "bar");
- rootSpan.addTimelineAnnotation("timeline");
- rootSpan.stop();
-
- for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) {
- com.twitter.zipkin.gen.Span zs =
- new HTraceToZipkinConverter(12345, (short) 12).convert(s);
- assertSpansAreEquivalent(s, zs);
- }
- }
-
- @Test
- public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException {
-
- String tracerId = "testHTraceAnnotationTimestamp";
- long startTime = System.currentTimeMillis() * 1000;
- Span ms = new MilliSpan.Builder().
- description(tracerId).parents(new SpanId[] { }).
- spanId(new SpanId(2L, 2L)).
- tracerId(tracerId).
- begin(System.currentTimeMillis()).
- build();
-
- Thread.sleep(500);
- long annoStartTime = System.currentTimeMillis() * 1000;
- Thread.sleep(500);
- ms.addTimelineAnnotation("anno");
- Thread.sleep(500);
- long annoEndTime = System.currentTimeMillis() * 1000;
- Thread.sleep(500);
- ms.stop();
- long endTime = System.currentTimeMillis() * 1000;
-
-
-
- com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
-
- // Check to make sure that all times are in the proper order.
- for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) {
- // CS and SR should be before the annotation
- // the annotation should be in between annotationStart and annotationEnd times
- // SS and CR should be after annotationEnd and before endtime.
- if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND)
- || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) {
- assertTrue(startTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= annoStartTime);
- } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV)
- || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) {
- assertTrue(annoEndTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= endTime);
- } else {
- assertTrue(annoStartTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= annoEndTime);
- assertTrue(annotation.getTimestamp() <= endTime);
- }
- }
- }
-
- @Test
- public void testHTraceDefaultPort() throws IOException {
- MilliSpan ms = new MilliSpan.Builder().description("test").
- parents(new SpanId[] { new SpanId(2L, 2L) }).
- spanId(new SpanId(2L, 3L)).
- tracerId("hmaster").
- begin(System.currentTimeMillis()).
- build();
- com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
- for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
- assertEquals((short)60000, annotation.getHost().getPort());
- }
-
- // make sure it's all lower cased
- ms = new MilliSpan.Builder().description("test").
- parents(new SpanId[] {new SpanId(2, 2)}).
- spanId(new SpanId(2, 3)).
- tracerId("HregIonServer").
- begin(System.currentTimeMillis()).
- build();
- zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
- for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
- assertEquals((short)60020, annotation.getHost().getPort());
- }
- }
-
- private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) {
- assertTrue("zipkin doesn't support multiple parents to a single span.",
- s.getParents().length <= 1);
- if (s.getParents().length == 1) {
- assertEquals(s.getParents()[0].getLow(), zs.getParent_id());
- }
- assertEquals(s.getSpanId().getLow(), zs.getId());
- Assert.assertNotNull(zs.getAnnotations());
- if (ROOT_SPAN_DESC.equals(zs.getName())) {
- assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation
- assertEquals(1, zs.getBinary_annotations().size());
- } else {
- assertEquals(4, zs.getAnnotations().size());
- }
- }
-}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
index 6595772..f077826 100644
--- a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.codec.binary.Base64;
+import org.apache.htrace.Transport;
import org.apache.htrace.core.AlwaysSampler;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.MilliSpan;
@@ -37,20 +38,21 @@ import org.apache.htrace.core.TracerPool;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
import org.junit.Test;
public class TestZipkinSpanReceiver {
- private Tracer newTracer(final Scribe.Iface scribe) {
+ private Tracer newTracer(final Transport transport) {
TracerPool pool = new TracerPool("newTracer");
pool.addReceiver(new ZipkinSpanReceiver(HTraceConfiguration.EMPTY) {
- @Override Scribe.Iface newScribe() {
- return scribe;
+ @Override
+ protected Transport createTransport(HTraceConfiguration conf) {
+ return transport;
}
});
- return new Tracer.Builder().
- name("ZipkinTracer").
+ return new Tracer.Builder("ZipkinTracer").
tracerPool(pool).
conf(HTraceConfiguration.fromKeyValuePairs(
"sampler.classes", AlwaysSampler.class.getName()
@@ -60,7 +62,7 @@ public class TestZipkinSpanReceiver {
@Test
public void testSimpleTraces() throws IOException, InterruptedException {
- FakeZipkinScribe scribe = new FakeZipkinScribe();
+ FakeZipkinTransport scribe = new FakeZipkinTransport();
Tracer tracer = newTracer(scribe);
Span rootSpan = new MilliSpan.Builder().
description("root").
@@ -84,31 +86,14 @@ public class TestZipkinSpanReceiver {
@Test
public void testConcurrency() throws IOException {
- Scribe.Iface alwaysOk = new Scribe.Iface() {
- @Override
- public ResultCode Log(List messages) throws TException {
- return ResultCode.OK;
- }
- };
- Tracer tracer = newTracer(alwaysOk);
- TraceCreator traceCreator = new TraceCreator(tracer);
- traceCreator.createThreadedTrace();
- }
-
- @Test
- public void testResilience() throws IOException {
- Scribe.Iface alwaysTryLater = new Scribe.Iface() {
- @Override
- public ResultCode Log(List messages) throws TException {
- return ResultCode.TRY_LATER;
- }
- };
- Tracer tracer = newTracer(alwaysTryLater);
+ Tracer tracer = newTracer(new FakeZipkinTransport(){
+ @Override public void send(List spans) throws IOException { /*do nothing*/ }
+ });
TraceCreator traceCreator = new TraceCreator(tracer);
traceCreator.createThreadedTrace();
}
- private static class FakeZipkinScribe implements Scribe.Iface {
+ private static class FakeZipkinTransport implements Transport {
private final BlockingQueue receivedSpans =
new ArrayBlockingQueue(1);
@@ -117,19 +102,35 @@ public class TestZipkinSpanReceiver {
return receivedSpans.take();
}
+
@Override
- public ResultCode Log(List messages) throws TException {
- for (LogEntry message : messages) {
- Assert.assertEquals("zipkin", message.category);
- byte[] bytes = Base64.decodeBase64(message.message);
-
- TMemoryBuffer transport = new TMemoryBuffer(bytes.length);
- transport.write(bytes);
- com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
- zSpan.read(new TBinaryProtocol(transport));
- receivedSpans.add(zSpan);
+ public void open(HTraceConfiguration conf) throws IOException {
+
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public void send(List spans) throws IOException {
+ for (byte[] message : spans) {
+ TMemoryBuffer transport = new TMemoryBuffer(message.length);
+ try {
+ transport.write(message);
+ com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span();
+ zSpan.read(new TBinaryProtocol(transport));
+ receivedSpans.add(zSpan);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
}
- return ResultCode.OK;
+ }
+
+ @Override
+ public void close() throws IOException {
+
}
}
}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
new file mode 100644
index 0000000..932cc2e
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.zipkin;
+
+
+import com.twitter.zipkin.gen.Span;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.apache.htrace.core.TracerPool;
+import org.apache.htrace.impl.KafkaTransport;
+import org.apache.htrace.impl.ZipkinSpanReceiver;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.TestZKUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.Buffer;
+
+public class ITZipkinReceiver {
+
+ @Test
+ public void testKafkaTransport() throws Exception {
+
+ String topic = "zipkin";
+
+ // Kafka setup
+ EmbeddedZookeeper zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
+ ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
+ Properties props = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false);
+ KafkaConfig config = new KafkaConfig(props);
+ KafkaServer kafkaServer = TestUtils.createServer(config, new MockTime());
+
+ Buffer servers = JavaConversions.asScalaBuffer(Collections.singletonList(kafkaServer));
+ TestUtils.createTopic(zkClient, topic, 1, 1, servers, new Properties());
+ zkClient.close();
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 5000);
+
+ // HTrace
+ HTraceConfiguration hTraceConfiguration = HTraceConfiguration.fromKeyValuePairs(
+ "sampler.classes", "AlwaysSampler",
+ "span.receiver.classes", ZipkinSpanReceiver.class.getName(),
+ "zipkin.kafka.metadata.broker.list", config.advertisedHostName() + ":" + config.advertisedPort(),
+ "zipkin.kafka.topic", topic,
+ ZipkinSpanReceiver.TRANSPORT_CLASS_KEY, KafkaTransport.class.getName()
+ );
+
+ final Tracer tracer = new Tracer.Builder("test-tracer")
+ .tracerPool(new TracerPool("test-tracer-pool"))
+ .conf(hTraceConfiguration)
+ .build();
+
+ String scopeName = "test-kafka-transport-scope";
+ TraceScope traceScope = tracer.newScope(scopeName);
+ traceScope.close();
+ tracer.close();
+
+ // Kafka consumer
+ Properties consumerProps = new Properties();
+ consumerProps.put("zookeeper.connect", props.getProperty("zookeeper.connect"));
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testing.group");
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
+ ConsumerConnector connector =
+ kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProps));
+ Map topicCountMap = new HashMap<>();
+ topicCountMap.put(topic, 1);
+ Map>> streams = connector.createMessageStreams(topicCountMap);
+ ConsumerIterator it = streams.get(topic).get(0).iterator();
+
+ // Test
+ Assert.assertTrue("We should have one message in Kafka", it.hasNext());
+ Span span = new Span();
+ new TDeserializer(new TBinaryProtocol.Factory()).deserialize(span, it.next().message());
+ Assert.assertEquals("The span name should match our scope description", span.getName(), scopeName);
+
+
+ kafkaServer.shutdown();
+
+ }
+
+}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
new file mode 100644
index 0000000..f166d35
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.zipkin;
+
+import com.twitter.zipkin.gen.zipkinCoreConstants;
+
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.POJOSpanReceiver;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not.
+ */
+public class TestHTraceSpanToZipkinSpan {
+ private static final String ROOT_SPAN_DESC = "ROOT";
+
+ @Test
+ public void testHTraceToZipkin() throws IOException {
+ Span rootSpan = new MilliSpan.Builder().
+ description(ROOT_SPAN_DESC).
+ parents(new SpanId[] { } ).
+ spanId(new SpanId(100, 100)).
+ tracerId("test").
+ begin(System.currentTimeMillis()).
+ build();
+ Span innerOne = rootSpan.child("Some good work");
+ Span innerTwo = innerOne.child("Some more good work");
+ innerTwo.stop();
+ innerOne.stop();
+ rootSpan.addKVAnnotation("foo", "bar");
+ rootSpan.addTimelineAnnotation("timeline");
+ rootSpan.stop();
+
+ for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) {
+ com.twitter.zipkin.gen.Span zs =
+ new HTraceToZipkinConverter(12345, (short) 12).convert(s);
+ assertSpansAreEquivalent(s, zs);
+ }
+ }
+
+ @Test
+ public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException {
+
+ String tracerId = "testHTraceAnnotationTimestamp";
+ long startTime = System.currentTimeMillis() * 1000;
+ Span ms = new MilliSpan.Builder().
+ description(tracerId).parents(new SpanId[] { }).
+ spanId(new SpanId(2L, 2L)).
+ tracerId(tracerId).
+ begin(System.currentTimeMillis()).
+ build();
+
+ Thread.sleep(500);
+ long annoStartTime = System.currentTimeMillis() * 1000;
+ Thread.sleep(500);
+ ms.addTimelineAnnotation("anno");
+ Thread.sleep(500);
+ long annoEndTime = System.currentTimeMillis() * 1000;
+ Thread.sleep(500);
+ ms.stop();
+ long endTime = System.currentTimeMillis() * 1000;
+
+
+
+ com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+
+ // Check to make sure that all times are in the proper order.
+ for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) {
+ // CS and SR should be before the annotation
+ // the annotation should be in between annotationStart and annotationEnd times
+ // SS and CR should be after annotationEnd and before endtime.
+ if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND)
+ || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) {
+ assertTrue(startTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= annoStartTime);
+ } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV)
+ || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) {
+ assertTrue(annoEndTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= endTime);
+ } else {
+ assertTrue(annoStartTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= annoEndTime);
+ assertTrue(annotation.getTimestamp() <= endTime);
+ }
+ }
+ }
+
+ @Test
+ public void testHTraceDefaultPort() throws IOException {
+ MilliSpan ms = new MilliSpan.Builder().description("test").
+ parents(new SpanId[] { new SpanId(2L, 2L) }).
+ spanId(new SpanId(2L, 3L)).
+ tracerId("hmaster").
+ begin(System.currentTimeMillis()).
+ build();
+ com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+ for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
+ assertEquals((short)60000, annotation.getHost().getPort());
+ }
+
+ // make sure it's all lower cased
+ ms = new MilliSpan.Builder().description("test").
+ parents(new SpanId[] {new SpanId(2, 2)}).
+ spanId(new SpanId(2, 3)).
+ tracerId("HregIonServer").
+ begin(System.currentTimeMillis()).
+ build();
+ zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+ for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
+ assertEquals((short)60020, annotation.getHost().getPort());
+ }
+ }
+
+ private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) {
+ assertTrue("zipkin doesn't support multiple parents to a single span.",
+ s.getParents().length <= 1);
+ if (s.getParents().length == 1) {
+ assertEquals(s.getParents()[0].getLow(), zs.getParent_id());
+ }
+ assertEquals(s.getSpanId().getLow(), zs.getId());
+ Assert.assertNotNull(zs.getAnnotations());
+ if (ROOT_SPAN_DESC.equals(zs.getName())) {
+ assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation
+ assertEquals(1, zs.getBinary_annotations().size());
+ } else {
+ assertEquals(4, zs.getAnnotations().size());
+ }
+ }
+}
diff --git a/htrace-zipkin/src/test/resources/log4j.properties b/htrace-zipkin/src/test/resources/log4j.properties
new file mode 100644
index 0000000..564a77a
--- /dev/null
+++ b/htrace-zipkin/src/test/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# By default, everything goes to console and file
+log4j.rootLogger=WARN, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.appender.A1.ImmediateFlush=true
+
+
+log4j.logger.kafka.utils=WARN, A1
+log4j.logger.kafka.consumer=WARN, A1
+log4j.logger.kafka.producer=WARN, A1
+
+log4j.logger.org.apache.htrace=INFO, A1
\ No newline at end of file