(1000);
this.protocolFactory = new TBinaryProtocol.Factory();
configure(conf);
}
+ private void logAndThrow(Throwable exception) {
+ LOG.error(ExceptionUtils.getStackTrace(exception));
+ throw new RuntimeException(exception);
+ }
+
+ private Transport createTransport(HTraceConfiguration conf) {
+ ClassLoader classLoader = TracerBuilder.class.getClassLoader();
+ String className = conf.get(TRANSPORT_CLASS_KEY, DEFAULT_TRANSPORT_CLASS);
+ Transport transport = null;
+ try {
+ Class> cls = classLoader.loadClass(className);
+ Constructor> ctor = cls.getConstructor();
+ transport = (Transport)ctor.newInstance();
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | InvocationTargetException
+ | IllegalAccessException e) {
+ logAndThrow(e);
+ }
+ return transport;
+ }
+
private void configure(HTraceConfiguration conf) {
this.conf = conf;
- this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME);
- this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT);
// initialize the endpoint. This endpoint is used while writing the Span.
initConverter();
@@ -186,6 +199,8 @@ public class ZipkinSpanReceiver extends SpanReceiver {
InetAddress tracedServiceHostname = null;
// Try and get the hostname. If it's not configured try and get the local hostname.
try {
+ //TODO (clehene) extract conf to constant
+ //TODO (clehene) has this been deprecated?
String host = conf.get("zipkin.traced-service-hostname",
InetAddress.getLocalHost().getHostAddress());
@@ -195,17 +210,13 @@ public class ZipkinSpanReceiver extends SpanReceiver {
}
short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
byte[] address = tracedServiceHostname != null
- ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+ ? tracedServiceHostname.getAddress() : "localhost".getBytes();
int ipv4 = ByteBuffer.wrap(address).getInt();
this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
}
private class WriteSpanRunnable implements Runnable {
- /**
- * scribe client to push zipkin spans
- */
- private Scribe.Iface scribe = null;
private final ByteArrayOutputStream baos;
private final TProtocol streamProtocol;
@@ -215,16 +226,15 @@ public class ZipkinSpanReceiver extends SpanReceiver {
}
/**
- * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
- * collector as a thrift object. The scribe client which is used for rpc writes a list of
- * LogEntry objects, so the span objects are first transformed into LogEntry objects before
- * sending to the zipkin-collector.
+ *
+ * This runnable converts an HTrace span to a Zipkin span and sends it across the transport
+ * as a Thrift object.
*
* Here is a little ascii art which shows the above transformation:
*
- * +------------+ +------------+ +------------+ +-----------------+
- * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
- * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
+ * +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span | ===========> | Zipkin Collector|
+ * +------------+ +------------+ (transport) +-----------------+
*
*/
@Override
@@ -236,6 +246,7 @@ public class ZipkinSpanReceiver extends SpanReceiver {
while (running.get() || queue.size() > 0) {
Span firstSpan = null;
+ //TODO (clenene) the following code (try / catch) is duplicated in / from FlumeSpanReceiver
try {
// Block for up to a second. to try and get a span.
// We only block for a little bit in order to notice if the running value has changed
@@ -256,13 +267,17 @@ public class ZipkinSpanReceiver extends SpanReceiver {
if (dequeuedSpans.isEmpty()) continue;
- // If this is the first time through or there was an error re-connect
- if (scribe == null) {
- startClient();
+ if (!transport.isOpen()) {
+ try {
+ transport.open(conf);
+ } catch (Throwable e) {
+ logAndThrow(e);
+ }
}
+
// Create a new list every time through so that the list doesn't change underneath
// thrift as it's sending.
- List entries = new ArrayList(dequeuedSpans.size());
+ List entries = new ArrayList(dequeuedSpans.size());
try {
// Convert every de-queued span
for (Span htraceSpan : dequeuedSpans) {
@@ -273,21 +288,18 @@ public class ZipkinSpanReceiver extends SpanReceiver {
// Write the span to a BAOS
zipkinSpan.write(streamProtocol);
- // Do Base64 encoding and put the string into a log entry.
- LogEntry logEntry =
- new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
- entries.add(logEntry);
+ entries.add(baos.toByteArray());
}
// Send the entries
- scribe.Log(entries);
+ transport.send(entries);
+
// clear the list for the next time through.
dequeuedSpans.clear();
// reset the error counter.
errorCount = 0;
} catch (Exception e) {
- LOG.error("Error when writing to the zipkin collector: " +
- collectorHostname + ":" + collectorPort, e);
+ LOG.error("Error when writing to the zipkin transport: " + transport, e);
errorCount += 1;
// If there have been ten errors in a row start dropping things.
@@ -298,7 +310,6 @@ public class ZipkinSpanReceiver extends SpanReceiver {
LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
}
}
-
closeClient();
try {
// Since there was an error sleep just a little bit to try and allow the
@@ -307,42 +318,22 @@ public class ZipkinSpanReceiver extends SpanReceiver {
} catch (InterruptedException e1) {
// Ignored
}
- }
- }
+ } // catch
+ } // while
closeClient();
}
/**
* Close out the connection.
*/
- private void closeClient() {
- // close out the transport.
- if (scribe != null && scribe instanceof Scribe.Client) {
- ((Scribe.Client) scribe).getInputProtocol().getTransport().close();
- scribe = null;
- }
- }
-
- /**
- * Re-connect to Zipkin.
- */
- private void startClient() {
- if (this.scribe == null) {
- this.scribe = newScribe();
+ private void closeClient(){
+ try {
+ transport.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close transport", e);
}
}
- }
- // Override for testing
- Scribe.Iface newScribe() {
- TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
- try {
- transport.open();
- } catch (TTransportException e) {
- e.printStackTrace();
- }
- TProtocol protocol = protocolFactory.getProtocol(transport);
- return new Scribe.Client(protocol);
}
/**
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
deleted file mode 100644
index f166d35..0000000
--- a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.htrace.zipkin;
-
-import com.twitter.zipkin.gen.zipkinCoreConstants;
-
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.MilliSpan;
-import org.apache.htrace.core.POJOSpanReceiver;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanId;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not.
- */
-public class TestHTraceSpanToZipkinSpan {
- private static final String ROOT_SPAN_DESC = "ROOT";
-
- @Test
- public void testHTraceToZipkin() throws IOException {
- Span rootSpan = new MilliSpan.Builder().
- description(ROOT_SPAN_DESC).
- parents(new SpanId[] { } ).
- spanId(new SpanId(100, 100)).
- tracerId("test").
- begin(System.currentTimeMillis()).
- build();
- Span innerOne = rootSpan.child("Some good work");
- Span innerTwo = innerOne.child("Some more good work");
- innerTwo.stop();
- innerOne.stop();
- rootSpan.addKVAnnotation("foo", "bar");
- rootSpan.addTimelineAnnotation("timeline");
- rootSpan.stop();
-
- for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) {
- com.twitter.zipkin.gen.Span zs =
- new HTraceToZipkinConverter(12345, (short) 12).convert(s);
- assertSpansAreEquivalent(s, zs);
- }
- }
-
- @Test
- public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException {
-
- String tracerId = "testHTraceAnnotationTimestamp";
- long startTime = System.currentTimeMillis() * 1000;
- Span ms = new MilliSpan.Builder().
- description(tracerId).parents(new SpanId[] { }).
- spanId(new SpanId(2L, 2L)).
- tracerId(tracerId).
- begin(System.currentTimeMillis()).
- build();
-
- Thread.sleep(500);
- long annoStartTime = System.currentTimeMillis() * 1000;
- Thread.sleep(500);
- ms.addTimelineAnnotation("anno");
- Thread.sleep(500);
- long annoEndTime = System.currentTimeMillis() * 1000;
- Thread.sleep(500);
- ms.stop();
- long endTime = System.currentTimeMillis() * 1000;
-
-
-
- com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
-
- // Check to make sure that all times are in the proper order.
- for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) {
- // CS and SR should be before the annotation
- // the annotation should be in between annotationStart and annotationEnd times
- // SS and CR should be after annotationEnd and before endtime.
- if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND)
- || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) {
- assertTrue(startTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= annoStartTime);
- } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV)
- || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) {
- assertTrue(annoEndTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= endTime);
- } else {
- assertTrue(annoStartTime <= annotation.getTimestamp());
- assertTrue(annotation.getTimestamp() <= annoEndTime);
- assertTrue(annotation.getTimestamp() <= endTime);
- }
- }
- }
-
- @Test
- public void testHTraceDefaultPort() throws IOException {
- MilliSpan ms = new MilliSpan.Builder().description("test").
- parents(new SpanId[] { new SpanId(2L, 2L) }).
- spanId(new SpanId(2L, 3L)).
- tracerId("hmaster").
- begin(System.currentTimeMillis()).
- build();
- com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
- for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
- assertEquals((short)60000, annotation.getHost().getPort());
- }
-
- // make sure it's all lower cased
- ms = new MilliSpan.Builder().description("test").
- parents(new SpanId[] {new SpanId(2, 2)}).
- spanId(new SpanId(2, 3)).
- tracerId("HregIonServer").
- begin(System.currentTimeMillis()).
- build();
- zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
- for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
- assertEquals((short)60020, annotation.getHost().getPort());
- }
- }
-
- private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) {
- assertTrue("zipkin doesn't support multiple parents to a single span.",
- s.getParents().length <= 1);
- if (s.getParents().length == 1) {
- assertEquals(s.getParents()[0].getLow(), zs.getParent_id());
- }
- assertEquals(s.getSpanId().getLow(), zs.getId());
- Assert.assertNotNull(zs.getAnnotations());
- if (ROOT_SPAN_DESC.equals(zs.getName())) {
- assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation
- assertEquals(1, zs.getBinary_annotations().size());
- } else {
- assertEquals(4, zs.getAnnotations().size());
- }
- }
-}
diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
new file mode 100644
index 0000000..f166d35
--- /dev/null
+++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.zipkin;
+
+import com.twitter.zipkin.gen.zipkinCoreConstants;
+
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.POJOSpanReceiver;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not.
+ */
+public class TestHTraceSpanToZipkinSpan {
+ private static final String ROOT_SPAN_DESC = "ROOT";
+
+ @Test
+ public void testHTraceToZipkin() throws IOException {
+ Span rootSpan = new MilliSpan.Builder().
+ description(ROOT_SPAN_DESC).
+ parents(new SpanId[] { } ).
+ spanId(new SpanId(100, 100)).
+ tracerId("test").
+ begin(System.currentTimeMillis()).
+ build();
+ Span innerOne = rootSpan.child("Some good work");
+ Span innerTwo = innerOne.child("Some more good work");
+ innerTwo.stop();
+ innerOne.stop();
+ rootSpan.addKVAnnotation("foo", "bar");
+ rootSpan.addTimelineAnnotation("timeline");
+ rootSpan.stop();
+
+ for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) {
+ com.twitter.zipkin.gen.Span zs =
+ new HTraceToZipkinConverter(12345, (short) 12).convert(s);
+ assertSpansAreEquivalent(s, zs);
+ }
+ }
+
+ @Test
+ public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException {
+
+ String tracerId = "testHTraceAnnotationTimestamp";
+ long startTime = System.currentTimeMillis() * 1000;
+ Span ms = new MilliSpan.Builder().
+ description(tracerId).parents(new SpanId[] { }).
+ spanId(new SpanId(2L, 2L)).
+ tracerId(tracerId).
+ begin(System.currentTimeMillis()).
+ build();
+
+ Thread.sleep(500);
+ long annoStartTime = System.currentTimeMillis() * 1000;
+ Thread.sleep(500);
+ ms.addTimelineAnnotation("anno");
+ Thread.sleep(500);
+ long annoEndTime = System.currentTimeMillis() * 1000;
+ Thread.sleep(500);
+ ms.stop();
+ long endTime = System.currentTimeMillis() * 1000;
+
+
+
+ com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+
+ // Check to make sure that all times are in the proper order.
+ for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) {
+ // CS and SR should be before the annotation
+ // the annotation should be in between annotationStart and annotationEnd times
+ // SS and CR should be after annotationEnd and before endtime.
+ if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND)
+ || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) {
+ assertTrue(startTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= annoStartTime);
+ } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV)
+ || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) {
+ assertTrue(annoEndTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= endTime);
+ } else {
+ assertTrue(annoStartTime <= annotation.getTimestamp());
+ assertTrue(annotation.getTimestamp() <= annoEndTime);
+ assertTrue(annotation.getTimestamp() <= endTime);
+ }
+ }
+ }
+
+ @Test
+ public void testHTraceDefaultPort() throws IOException {
+ MilliSpan ms = new MilliSpan.Builder().description("test").
+ parents(new SpanId[] { new SpanId(2L, 2L) }).
+ spanId(new SpanId(2L, 3L)).
+ tracerId("hmaster").
+ begin(System.currentTimeMillis()).
+ build();
+ com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+ for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
+ assertEquals((short)60000, annotation.getHost().getPort());
+ }
+
+ // make sure it's all lower cased
+ ms = new MilliSpan.Builder().description("test").
+ parents(new SpanId[] {new SpanId(2, 2)}).
+ spanId(new SpanId(2, 3)).
+ tracerId("HregIonServer").
+ begin(System.currentTimeMillis()).
+ build();
+ zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms);
+ for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) {
+ assertEquals((short)60020, annotation.getHost().getPort());
+ }
+ }
+
+ private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) {
+ assertTrue("zipkin doesn't support multiple parents to a single span.",
+ s.getParents().length <= 1);
+ if (s.getParents().length == 1) {
+ assertEquals(s.getParents()[0].getLow(), zs.getParent_id());
+ }
+ assertEquals(s.getSpanId().getLow(), zs.getId());
+ Assert.assertNotNull(zs.getAnnotations());
+ if (ROOT_SPAN_DESC.equals(zs.getName())) {
+ assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation
+ assertEquals(1, zs.getBinary_annotations().size());
+ } else {
+ assertEquals(4, zs.getAnnotations().size());
+ }
+ }
+}