diff --git a/htrace-flume/pom.xml b/htrace-flume/pom.xml
new file mode 100644
index 0000000..3f8d70c
--- /dev/null
+++ b/htrace-flume/pom.xml
@@ -0,0 +1,117 @@
+
+
+
+ 4.0.0
+
+ htrace-flume
+ jar
+
+
+ htrace
+ org.apache.htrace
+ 3.1.0-SNAPSHOT
+
+
+ htrace-flume
+ https://github.com/cloudera/htrace
+
+
+ UTF-8
+ 1.6.0-SNAPSHOT
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+ maven-javadoc-plugin
+
+
+ maven-compiler-plugin
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+ maven-deploy-plugin
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+
+
+
+
+
+ org.apache.htrace
+ htrace-core
+ ${project.version}
+ provided
+
+
+ org.apache.htrace
+ htrace-core
+ ${project.version}
+ tests
+ test
+
+
+
+ commons-logging
+ commons-logging
+ provided
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ junit
+ junit
+ test
+
+
+
+ org.apache.flume
+ flume-ng-sdk
+ ${flume.version}
+
+
+ org.apache.flume
+ flume-ng-sdk
+ ${flume.version}
+ tests
+ test
+
+
+ org.apache.avro
+ avro
+ 1.7.4
+
+
+
diff --git a/htrace-flume/src/main/avro/span.avdl b/htrace-flume/src/main/avro/span.avdl
new file mode 100644
index 0000000..0eb143f
--- /dev/null
+++ b/htrace-flume/src/main/avro/span.avdl
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@namespace ("org.apache.htrace.avro")
+protocol htrace_avro {
+ record TimelineAnnotation {
+ long time;
+ union {null, string} message;
+ }
+
+ record KVAnnotation {
+ union {null, bytes} key;
+ union {null, bytes} value;
+ }
+
+ record Span {
+ long trace_id;
+ long parent_id;
+ long span_id;
+ long start;
+ long stop;
+ union {null, string} process_id;
+ union {null, string} description;
+ union {null, array} timeline_annotations;
+ union {null, array} kv_annotations;
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java b/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java
new file mode 100644
index 0000000..64e7581
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.htrace.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class KVAnnotation extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"KVAnnotation\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public java.nio.ByteBuffer key;
+ @Deprecated public java.nio.ByteBuffer value;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use newBuilder().
+ */
+ public KVAnnotation() {}
+
+ /**
+ * All-args constructor.
+ */
+ public KVAnnotation(java.nio.ByteBuffer key, java.nio.ByteBuffer value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return key;
+ case 1: return value;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: key = (java.nio.ByteBuffer)value$; break;
+ case 1: value = (java.nio.ByteBuffer)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'key' field.
+ */
+ public java.nio.ByteBuffer getKey() {
+ return key;
+ }
+
+ /**
+ * Sets the value of the 'key' field.
+ * @param value the value to set.
+ */
+ public void setKey(java.nio.ByteBuffer value) {
+ this.key = value;
+ }
+
+ /**
+ * Gets the value of the 'value' field.
+ */
+ public java.nio.ByteBuffer getValue() {
+ return value;
+ }
+
+ /**
+ * Sets the value of the 'value' field.
+ * @param value the value to set.
+ */
+ public void setValue(java.nio.ByteBuffer value) {
+ this.value = value;
+ }
+
+ /** Creates a new KVAnnotation RecordBuilder */
+ public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder() {
+ return new org.apache.htrace.avro.KVAnnotation.Builder();
+ }
+
+ /** Creates a new KVAnnotation RecordBuilder by copying an existing Builder */
+ public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder(org.apache.htrace.avro.KVAnnotation.Builder other) {
+ return new org.apache.htrace.avro.KVAnnotation.Builder(other);
+ }
+
+ /** Creates a new KVAnnotation RecordBuilder by copying an existing KVAnnotation instance */
+ public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder(org.apache.htrace.avro.KVAnnotation other) {
+ return new org.apache.htrace.avro.KVAnnotation.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for KVAnnotation instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
+ implements org.apache.avro.data.RecordBuilder {
+
+ private java.nio.ByteBuffer key;
+ private java.nio.ByteBuffer value;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.htrace.avro.KVAnnotation.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.htrace.avro.KVAnnotation.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.key)) {
+ this.key = data().deepCopy(fields()[0].schema(), other.key);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.value)) {
+ this.value = data().deepCopy(fields()[1].schema(), other.value);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing KVAnnotation instance */
+ private Builder(org.apache.htrace.avro.KVAnnotation other) {
+ super(org.apache.htrace.avro.KVAnnotation.SCHEMA$);
+ if (isValidValue(fields()[0], other.key)) {
+ this.key = data().deepCopy(fields()[0].schema(), other.key);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.value)) {
+ this.value = data().deepCopy(fields()[1].schema(), other.value);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /** Gets the value of the 'key' field */
+ public java.nio.ByteBuffer getKey() {
+ return key;
+ }
+
+ /** Sets the value of the 'key' field */
+ public org.apache.htrace.avro.KVAnnotation.Builder setKey(java.nio.ByteBuffer value) {
+ validate(fields()[0], value);
+ this.key = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'key' field has been set */
+ public boolean hasKey() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'key' field */
+ public org.apache.htrace.avro.KVAnnotation.Builder clearKey() {
+ key = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'value' field */
+ public java.nio.ByteBuffer getValue() {
+ return value;
+ }
+
+ /** Sets the value of the 'value' field */
+ public org.apache.htrace.avro.KVAnnotation.Builder setValue(java.nio.ByteBuffer value) {
+ validate(fields()[1], value);
+ this.value = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'value' field has been set */
+ public boolean hasValue() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'value' field */
+ public org.apache.htrace.avro.KVAnnotation.Builder clearValue() {
+ value = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ @Override
+ public KVAnnotation build() {
+ try {
+ KVAnnotation record = new KVAnnotation();
+ record.key = fieldSetFlags()[0] ? this.key : (java.nio.ByteBuffer) defaultValue(fields()[0]);
+ record.value = fieldSetFlags()[1] ? this.value : (java.nio.ByteBuffer) defaultValue(fields()[1]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java b/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java
new file mode 100644
index 0000000..0688b28
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.htrace.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class Span extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Span\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"trace_id\",\"type\":\"long\"},{\"name\":\"parent_id\",\"type\":\"long\"},{\"name\":\"span_id\",\"type\":\"long\"},{\"name\":\"start\",\"type\":\"long\"},{\"name\":\"stop\",\"type\":\"long\"},{\"name\":\"process_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"description\",\"type\":[\"null\",\"string\"]},{\"name\":\"timeline_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]}}]},{\"name\":\"kv_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"KVAnnotation\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]}}]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public long trace_id;
+ @Deprecated public long parent_id;
+ @Deprecated public long span_id;
+ @Deprecated public long start;
+ @Deprecated public long stop;
+ @Deprecated public java.lang.CharSequence process_id;
+ @Deprecated public java.lang.CharSequence description;
+ @Deprecated public java.util.List timeline_annotations;
+ @Deprecated public java.util.List kv_annotations;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use newBuilder().
+ */
+ public Span() {}
+
+ /**
+ * All-args constructor.
+ */
+ public Span(java.lang.Long trace_id, java.lang.Long parent_id, java.lang.Long span_id, java.lang.Long start, java.lang.Long stop, java.lang.CharSequence process_id, java.lang.CharSequence description, java.util.List timeline_annotations, java.util.List kv_annotations) {
+ this.trace_id = trace_id;
+ this.parent_id = parent_id;
+ this.span_id = span_id;
+ this.start = start;
+ this.stop = stop;
+ this.process_id = process_id;
+ this.description = description;
+ this.timeline_annotations = timeline_annotations;
+ this.kv_annotations = kv_annotations;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return trace_id;
+ case 1: return parent_id;
+ case 2: return span_id;
+ case 3: return start;
+ case 4: return stop;
+ case 5: return process_id;
+ case 6: return description;
+ case 7: return timeline_annotations;
+ case 8: return kv_annotations;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: trace_id = (java.lang.Long)value$; break;
+ case 1: parent_id = (java.lang.Long)value$; break;
+ case 2: span_id = (java.lang.Long)value$; break;
+ case 3: start = (java.lang.Long)value$; break;
+ case 4: stop = (java.lang.Long)value$; break;
+ case 5: process_id = (java.lang.CharSequence)value$; break;
+ case 6: description = (java.lang.CharSequence)value$; break;
+ case 7: timeline_annotations = (java.util.List)value$; break;
+ case 8: kv_annotations = (java.util.List)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'trace_id' field.
+ */
+ public java.lang.Long getTraceId() {
+ return trace_id;
+ }
+
+ /**
+ * Sets the value of the 'trace_id' field.
+ * @param value the value to set.
+ */
+ public void setTraceId(java.lang.Long value) {
+ this.trace_id = value;
+ }
+
+ /**
+ * Gets the value of the 'parent_id' field.
+ */
+ public java.lang.Long getParentId() {
+ return parent_id;
+ }
+
+ /**
+ * Sets the value of the 'parent_id' field.
+ * @param value the value to set.
+ */
+ public void setParentId(java.lang.Long value) {
+ this.parent_id = value;
+ }
+
+ /**
+ * Gets the value of the 'span_id' field.
+ */
+ public java.lang.Long getSpanId() {
+ return span_id;
+ }
+
+ /**
+ * Sets the value of the 'span_id' field.
+ * @param value the value to set.
+ */
+ public void setSpanId(java.lang.Long value) {
+ this.span_id = value;
+ }
+
+ /**
+ * Gets the value of the 'start' field.
+ */
+ public java.lang.Long getStart() {
+ return start;
+ }
+
+ /**
+ * Sets the value of the 'start' field.
+ * @param value the value to set.
+ */
+ public void setStart(java.lang.Long value) {
+ this.start = value;
+ }
+
+ /**
+ * Gets the value of the 'stop' field.
+ */
+ public java.lang.Long getStop() {
+ return stop;
+ }
+
+ /**
+ * Sets the value of the 'stop' field.
+ * @param value the value to set.
+ */
+ public void setStop(java.lang.Long value) {
+ this.stop = value;
+ }
+
+ /**
+ * Gets the value of the 'process_id' field.
+ */
+ public java.lang.CharSequence getProcessId() {
+ return process_id;
+ }
+
+ /**
+ * Sets the value of the 'process_id' field.
+ * @param value the value to set.
+ */
+ public void setProcessId(java.lang.CharSequence value) {
+ this.process_id = value;
+ }
+
+ /**
+ * Gets the value of the 'description' field.
+ */
+ public java.lang.CharSequence getDescription() {
+ return description;
+ }
+
+ /**
+ * Sets the value of the 'description' field.
+ * @param value the value to set.
+ */
+ public void setDescription(java.lang.CharSequence value) {
+ this.description = value;
+ }
+
+ /**
+ * Gets the value of the 'timeline_annotations' field.
+ */
+ public java.util.List getTimelineAnnotations() {
+ return timeline_annotations;
+ }
+
+ /**
+ * Sets the value of the 'timeline_annotations' field.
+ * @param value the value to set.
+ */
+ public void setTimelineAnnotations(java.util.List value) {
+ this.timeline_annotations = value;
+ }
+
+ /**
+ * Gets the value of the 'kv_annotations' field.
+ */
+ public java.util.List getKvAnnotations() {
+ return kv_annotations;
+ }
+
+ /**
+ * Sets the value of the 'kv_annotations' field.
+ * @param value the value to set.
+ */
+ public void setKvAnnotations(java.util.List value) {
+ this.kv_annotations = value;
+ }
+
+ /** Creates a new Span RecordBuilder */
+ public static org.apache.htrace.avro.Span.Builder newBuilder() {
+ return new org.apache.htrace.avro.Span.Builder();
+ }
+
+ /** Creates a new Span RecordBuilder by copying an existing Builder */
+ public static org.apache.htrace.avro.Span.Builder newBuilder(org.apache.htrace.avro.Span.Builder other) {
+ return new org.apache.htrace.avro.Span.Builder(other);
+ }
+
+ /** Creates a new Span RecordBuilder by copying an existing Span instance */
+ public static org.apache.htrace.avro.Span.Builder newBuilder(org.apache.htrace.avro.Span other) {
+ return new org.apache.htrace.avro.Span.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for Span instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
+ implements org.apache.avro.data.RecordBuilder {
+
+ private long trace_id;
+ private long parent_id;
+ private long span_id;
+ private long start;
+ private long stop;
+ private java.lang.CharSequence process_id;
+ private java.lang.CharSequence description;
+ private java.util.List timeline_annotations;
+ private java.util.List kv_annotations;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.htrace.avro.Span.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.htrace.avro.Span.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.trace_id)) {
+ this.trace_id = data().deepCopy(fields()[0].schema(), other.trace_id);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.parent_id)) {
+ this.parent_id = data().deepCopy(fields()[1].schema(), other.parent_id);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.span_id)) {
+ this.span_id = data().deepCopy(fields()[2].schema(), other.span_id);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.start)) {
+ this.start = data().deepCopy(fields()[3].schema(), other.start);
+ fieldSetFlags()[3] = true;
+ }
+ if (isValidValue(fields()[4], other.stop)) {
+ this.stop = data().deepCopy(fields()[4].schema(), other.stop);
+ fieldSetFlags()[4] = true;
+ }
+ if (isValidValue(fields()[5], other.process_id)) {
+ this.process_id = data().deepCopy(fields()[5].schema(), other.process_id);
+ fieldSetFlags()[5] = true;
+ }
+ if (isValidValue(fields()[6], other.description)) {
+ this.description = data().deepCopy(fields()[6].schema(), other.description);
+ fieldSetFlags()[6] = true;
+ }
+ if (isValidValue(fields()[7], other.timeline_annotations)) {
+ this.timeline_annotations = data().deepCopy(fields()[7].schema(), other.timeline_annotations);
+ fieldSetFlags()[7] = true;
+ }
+ if (isValidValue(fields()[8], other.kv_annotations)) {
+ this.kv_annotations = data().deepCopy(fields()[8].schema(), other.kv_annotations);
+ fieldSetFlags()[8] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing Span instance */
+ private Builder(org.apache.htrace.avro.Span other) {
+ super(org.apache.htrace.avro.Span.SCHEMA$);
+ if (isValidValue(fields()[0], other.trace_id)) {
+ this.trace_id = data().deepCopy(fields()[0].schema(), other.trace_id);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.parent_id)) {
+ this.parent_id = data().deepCopy(fields()[1].schema(), other.parent_id);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.span_id)) {
+ this.span_id = data().deepCopy(fields()[2].schema(), other.span_id);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.start)) {
+ this.start = data().deepCopy(fields()[3].schema(), other.start);
+ fieldSetFlags()[3] = true;
+ }
+ if (isValidValue(fields()[4], other.stop)) {
+ this.stop = data().deepCopy(fields()[4].schema(), other.stop);
+ fieldSetFlags()[4] = true;
+ }
+ if (isValidValue(fields()[5], other.process_id)) {
+ this.process_id = data().deepCopy(fields()[5].schema(), other.process_id);
+ fieldSetFlags()[5] = true;
+ }
+ if (isValidValue(fields()[6], other.description)) {
+ this.description = data().deepCopy(fields()[6].schema(), other.description);
+ fieldSetFlags()[6] = true;
+ }
+ if (isValidValue(fields()[7], other.timeline_annotations)) {
+ this.timeline_annotations = data().deepCopy(fields()[7].schema(), other.timeline_annotations);
+ fieldSetFlags()[7] = true;
+ }
+ if (isValidValue(fields()[8], other.kv_annotations)) {
+ this.kv_annotations = data().deepCopy(fields()[8].schema(), other.kv_annotations);
+ fieldSetFlags()[8] = true;
+ }
+ }
+
+ /** Gets the value of the 'trace_id' field */
+ public java.lang.Long getTraceId() {
+ return trace_id;
+ }
+
+ /** Sets the value of the 'trace_id' field */
+ public org.apache.htrace.avro.Span.Builder setTraceId(long value) {
+ validate(fields()[0], value);
+ this.trace_id = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'trace_id' field has been set */
+ public boolean hasTraceId() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'trace_id' field */
+ public org.apache.htrace.avro.Span.Builder clearTraceId() {
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'parent_id' field */
+ public java.lang.Long getParentId() {
+ return parent_id;
+ }
+
+ /** Sets the value of the 'parent_id' field */
+ public org.apache.htrace.avro.Span.Builder setParentId(long value) {
+ validate(fields()[1], value);
+ this.parent_id = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'parent_id' field has been set */
+ public boolean hasParentId() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'parent_id' field */
+ public org.apache.htrace.avro.Span.Builder clearParentId() {
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'span_id' field */
+ public java.lang.Long getSpanId() {
+ return span_id;
+ }
+
+ /** Sets the value of the 'span_id' field */
+ public org.apache.htrace.avro.Span.Builder setSpanId(long value) {
+ validate(fields()[2], value);
+ this.span_id = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'span_id' field has been set */
+ public boolean hasSpanId() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'span_id' field */
+ public org.apache.htrace.avro.Span.Builder clearSpanId() {
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'start' field */
+ public java.lang.Long getStart() {
+ return start;
+ }
+
+ /** Sets the value of the 'start' field */
+ public org.apache.htrace.avro.Span.Builder setStart(long value) {
+ validate(fields()[3], value);
+ this.start = value;
+ fieldSetFlags()[3] = true;
+ return this;
+ }
+
+ /** Checks whether the 'start' field has been set */
+ public boolean hasStart() {
+ return fieldSetFlags()[3];
+ }
+
+ /** Clears the value of the 'start' field */
+ public org.apache.htrace.avro.Span.Builder clearStart() {
+ fieldSetFlags()[3] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'stop' field */
+ public java.lang.Long getStop() {
+ return stop;
+ }
+
+ /** Sets the value of the 'stop' field */
+ public org.apache.htrace.avro.Span.Builder setStop(long value) {
+ validate(fields()[4], value);
+ this.stop = value;
+ fieldSetFlags()[4] = true;
+ return this;
+ }
+
+ /** Checks whether the 'stop' field has been set */
+ public boolean hasStop() {
+ return fieldSetFlags()[4];
+ }
+
+ /** Clears the value of the 'stop' field */
+ public org.apache.htrace.avro.Span.Builder clearStop() {
+ fieldSetFlags()[4] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'process_id' field */
+ public java.lang.CharSequence getProcessId() {
+ return process_id;
+ }
+
+ /** Sets the value of the 'process_id' field */
+ public org.apache.htrace.avro.Span.Builder setProcessId(java.lang.CharSequence value) {
+ validate(fields()[5], value);
+ this.process_id = value;
+ fieldSetFlags()[5] = true;
+ return this;
+ }
+
+ /** Checks whether the 'process_id' field has been set */
+ public boolean hasProcessId() {
+ return fieldSetFlags()[5];
+ }
+
+ /** Clears the value of the 'process_id' field */
+ public org.apache.htrace.avro.Span.Builder clearProcessId() {
+ process_id = null;
+ fieldSetFlags()[5] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'description' field */
+ public java.lang.CharSequence getDescription() {
+ return description;
+ }
+
+ /** Sets the value of the 'description' field */
+ public org.apache.htrace.avro.Span.Builder setDescription(java.lang.CharSequence value) {
+ validate(fields()[6], value);
+ this.description = value;
+ fieldSetFlags()[6] = true;
+ return this;
+ }
+
+ /** Checks whether the 'description' field has been set */
+ public boolean hasDescription() {
+ return fieldSetFlags()[6];
+ }
+
+ /** Clears the value of the 'description' field */
+ public org.apache.htrace.avro.Span.Builder clearDescription() {
+ description = null;
+ fieldSetFlags()[6] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'timeline_annotations' field */
+ public java.util.List getTimelineAnnotations() {
+ return timeline_annotations;
+ }
+
+ /** Sets the value of the 'timeline_annotations' field */
+ public org.apache.htrace.avro.Span.Builder setTimelineAnnotations(java.util.List value) {
+ validate(fields()[7], value);
+ this.timeline_annotations = value;
+ fieldSetFlags()[7] = true;
+ return this;
+ }
+
+ /** Checks whether the 'timeline_annotations' field has been set */
+ public boolean hasTimelineAnnotations() {
+ return fieldSetFlags()[7];
+ }
+
+ /** Clears the value of the 'timeline_annotations' field */
+ public org.apache.htrace.avro.Span.Builder clearTimelineAnnotations() {
+ timeline_annotations = null;
+ fieldSetFlags()[7] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'kv_annotations' field */
+ public java.util.List getKvAnnotations() {
+ return kv_annotations;
+ }
+
+ /** Sets the value of the 'kv_annotations' field */
+ public org.apache.htrace.avro.Span.Builder setKvAnnotations(java.util.List value) {
+ validate(fields()[8], value);
+ this.kv_annotations = value;
+ fieldSetFlags()[8] = true;
+ return this;
+ }
+
+ /** Checks whether the 'kv_annotations' field has been set */
+ public boolean hasKvAnnotations() {
+ return fieldSetFlags()[8];
+ }
+
+ /** Clears the value of the 'kv_annotations' field */
+ public org.apache.htrace.avro.Span.Builder clearKvAnnotations() {
+ kv_annotations = null;
+ fieldSetFlags()[8] = false;
+ return this;
+ }
+
+ @Override
+ public Span build() {
+ try {
+ Span record = new Span();
+ record.trace_id = fieldSetFlags()[0] ? this.trace_id : (java.lang.Long) defaultValue(fields()[0]);
+ record.parent_id = fieldSetFlags()[1] ? this.parent_id : (java.lang.Long) defaultValue(fields()[1]);
+ record.span_id = fieldSetFlags()[2] ? this.span_id : (java.lang.Long) defaultValue(fields()[2]);
+ record.start = fieldSetFlags()[3] ? this.start : (java.lang.Long) defaultValue(fields()[3]);
+ record.stop = fieldSetFlags()[4] ? this.stop : (java.lang.Long) defaultValue(fields()[4]);
+ record.process_id = fieldSetFlags()[5] ? this.process_id : (java.lang.CharSequence) defaultValue(fields()[5]);
+ record.description = fieldSetFlags()[6] ? this.description : (java.lang.CharSequence) defaultValue(fields()[6]);
+ record.timeline_annotations = fieldSetFlags()[7] ? this.timeline_annotations : (java.util.List) defaultValue(fields()[7]);
+ record.kv_annotations = fieldSetFlags()[8] ? this.kv_annotations : (java.util.List) defaultValue(fields()[8]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java b/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java
new file mode 100644
index 0000000..bafacbf
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.htrace.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class TimelineAnnotation extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public long time;
+ @Deprecated public java.lang.CharSequence message;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use newBuilder().
+ */
+ public TimelineAnnotation() {}
+
+ /**
+ * All-args constructor.
+ */
+ public TimelineAnnotation(java.lang.Long time, java.lang.CharSequence message) {
+ this.time = time;
+ this.message = message;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return time;
+ case 1: return message;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: time = (java.lang.Long)value$; break;
+ case 1: message = (java.lang.CharSequence)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'time' field.
+ */
+ public java.lang.Long getTime() {
+ return time;
+ }
+
+ /**
+ * Sets the value of the 'time' field.
+ * @param value the value to set.
+ */
+ public void setTime(java.lang.Long value) {
+ this.time = value;
+ }
+
+ /**
+ * Gets the value of the 'message' field.
+ */
+ public java.lang.CharSequence getMessage() {
+ return message;
+ }
+
+ /**
+ * Sets the value of the 'message' field.
+ * @param value the value to set.
+ */
+ public void setMessage(java.lang.CharSequence value) {
+ this.message = value;
+ }
+
+ /** Creates a new TimelineAnnotation RecordBuilder */
+ public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder() {
+ return new org.apache.htrace.avro.TimelineAnnotation.Builder();
+ }
+
+ /** Creates a new TimelineAnnotation RecordBuilder by copying an existing Builder */
+ public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder(org.apache.htrace.avro.TimelineAnnotation.Builder other) {
+ return new org.apache.htrace.avro.TimelineAnnotation.Builder(other);
+ }
+
+ /** Creates a new TimelineAnnotation RecordBuilder by copying an existing TimelineAnnotation instance */
+ public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder(org.apache.htrace.avro.TimelineAnnotation other) {
+ return new org.apache.htrace.avro.TimelineAnnotation.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for TimelineAnnotation instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase
+ implements org.apache.avro.data.RecordBuilder {
+
+ private long time;
+ private java.lang.CharSequence message;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.htrace.avro.TimelineAnnotation.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.htrace.avro.TimelineAnnotation.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.time)) {
+ this.time = data().deepCopy(fields()[0].schema(), other.time);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.message)) {
+ this.message = data().deepCopy(fields()[1].schema(), other.message);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing TimelineAnnotation instance */
+ private Builder(org.apache.htrace.avro.TimelineAnnotation other) {
+ super(org.apache.htrace.avro.TimelineAnnotation.SCHEMA$);
+ if (isValidValue(fields()[0], other.time)) {
+ this.time = data().deepCopy(fields()[0].schema(), other.time);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.message)) {
+ this.message = data().deepCopy(fields()[1].schema(), other.message);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /** Gets the value of the 'time' field */
+ public java.lang.Long getTime() {
+ return time;
+ }
+
+ /** Sets the value of the 'time' field */
+ public org.apache.htrace.avro.TimelineAnnotation.Builder setTime(long value) {
+ validate(fields()[0], value);
+ this.time = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'time' field has been set */
+ public boolean hasTime() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'time' field */
+ public org.apache.htrace.avro.TimelineAnnotation.Builder clearTime() {
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'message' field */
+ public java.lang.CharSequence getMessage() {
+ return message;
+ }
+
+ /** Sets the value of the 'message' field */
+ public org.apache.htrace.avro.TimelineAnnotation.Builder setMessage(java.lang.CharSequence value) {
+ validate(fields()[1], value);
+ this.message = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'message' field has been set */
+ public boolean hasMessage() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'message' field */
+ public org.apache.htrace.avro.TimelineAnnotation.Builder clearMessage() {
+ message = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ @Override
+ public TimelineAnnotation build() {
+ try {
+ TimelineAnnotation record = new TimelineAnnotation();
+ record.time = fieldSetFlags()[0] ? this.time : (java.lang.Long) defaultValue(fields()[0]);
+ record.message = fieldSetFlags()[1] ? this.message : (java.lang.CharSequence) defaultValue(fields()[1]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java b/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java
new file mode 100644
index 0000000..37fcb94
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.htrace.avro;
+
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public interface htrace_avro {
+ public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"htrace_avro\",\"namespace\":\"org.apache.htrace.avro\",\"types\":[{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]},{\"type\":\"record\",\"name\":\"KVAnnotation\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]},{\"type\":\"record\",\"name\":\"Span\",\"fields\":[{\"name\":\"trace_id\",\"type\":\"long\"},{\"name\":\"parent_id\",\"type\":\"long\"},{\"name\":\"span_id\",\"type\":\"long\"},{\"name\":\"start\",\"type\":\"long\"},{\"name\":\"stop\",\"type\":\"long\"},{\"name\":\"process_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"description\",\"type\":[\"null\",\"string\"]},{\"name\":\"timeline_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"TimelineAnnotation\"}]},{\"name\":\"kv_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"KVAnnotation\"}]}]}],\"messages\":{}}");
+
+ @SuppressWarnings("all")
+ public interface Callback extends htrace_avro {
+ public static final org.apache.avro.Protocol PROTOCOL = org.apache.htrace.avro.htrace_avro.PROTOCOL;
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java b/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java
new file mode 100644
index 0000000..05b491d
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.htrace.avro.Span;
+import org.apache.htrace.avro.TimelineAnnotation;
+import org.apache.htrace.avro.KVAnnotation;
+
+public class AvroConverter {
+
+ // Convert an HTrace span to an Avro span
+ private static Span convertToAvroSpan(org.apache.htrace.Span span) {
+ Span avroSpan = new Span();
+ avroSpan.setTraceId(span.getTraceId());
+ avroSpan.setParentId(span.getParentId());
+ avroSpan.setSpanId(span.getSpanId());
+ avroSpan.setStart(span.getStartTimeMillis());
+ avroSpan.setStop(span.getStopTimeMillis());
+ avroSpan.setProcessId(span.getProcessId());
+ avroSpan.setDescription(span.getDescription());
+
+ List tla = span.getTimelineAnnotations();
+ if (tla.size() > 0) {
+ ArrayList avroTla = new ArrayList(tla.size());
+ for (org.apache.htrace.TimelineAnnotation annotation : tla) {
+ TimelineAnnotation avroAnnotation = new TimelineAnnotation();
+ avroAnnotation.setTime(annotation.getTime());
+ avroAnnotation.setMessage(annotation.getMessage());
+ avroTla.add(avroAnnotation);
+ }
+ avroSpan.setTimelineAnnotations(avroTla);
+ }
+
+ Map kva = span.getKVAnnotations();
+ if (kva.size() > 0) {
+ ArrayList avroKva = new ArrayList(kva.size());
+ Iterator> it = kva.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry annotation = (Entry)it.next();
+ KVAnnotation avroAnnotation = new KVAnnotation();
+ avroAnnotation.setKey(ByteBuffer.wrap(annotation.getKey()));
+ avroAnnotation.setValue(ByteBuffer.wrap(annotation.getValue()));
+ avroKva.add(avroAnnotation);
+ }
+ avroSpan.setKvAnnotations(avroKva);
+ }
+ return avroSpan;
+ }
+
+ // Serialize
+ public static byte[] serialize(org.apache.htrace.Span span, FlumeSpanReceiver.SerializationFormat format) throws IOException {
+
+ Span avroSpan = convertToAvroSpan(span);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder encoder = null;
+ switch (format) {
+ case Json:
+ encoder = EncoderFactory.get().jsonEncoder(avroSpan.getSchema(), out);
+ break;
+ case Avro:
+ default:
+ encoder = EncoderFactory.get().binaryEncoder(out, null);
+ break;
+ }
+ DatumWriter writer = new SpecificDatumWriter(Span.getClassSchema());
+ writer.write(avroSpan, encoder);
+ encoder.flush();
+ // since we use memory stream, no need to close in finally
+ out.close();
+
+ return out.toByteArray();
+ }
+
+ // Convert an Avro span to an HTrace span
+ private static org.apache.htrace.Span convertToHTraceSpan(Span avroSpan) {
+ org.apache.htrace.Span span = new org.apache.htrace.impl.MilliSpan(
+ avroSpan.getDescription().toString(),
+ avroSpan.getTraceId(),
+ avroSpan.getParentId(),
+ avroSpan.getSpanId(),
+ avroSpan.getProcessId().toString()
+ );
+
+ // org.apache.htrace.Span API does not allow setting Timeline Annotations directly
+ /*
+ List avroTla = avroSpan.getTimelineAnnotations();
+ if (avroTla != null && avroTla.size() > 0) {
+ ArrayList tla = new ArrayList(avroTla.size());
+ for (TimelineAnnotation avroAnnotation : avroTla) {
+ org.apache.htrace.TimelineAnnotation annotation = new org.apache.htrace.TimelineAnnotation(
+ avroAnnotation.getTime(),
+ avroAnnotation.getMessage().toString()
+ );
+ tla.add(annotation);
+ }
+ span.setTimelineAnnotations(tla);
+ }
+ */
+
+ List avroKva = avroSpan.getKvAnnotations();
+ if (avroKva != null) {
+ for (KVAnnotation avroAnnotation : avroKva) {
+ span.addKVAnnotation(avroAnnotation.getKey().array(), avroAnnotation.getValue().array());
+ }
+ }
+ return span;
+ }
+
+ // Deserialize
+ public static org.apache.htrace.Span deserialize(byte[] bytes, FlumeSpanReceiver.SerializationFormat format) throws IOException {
+ Decoder decoder = null;
+ switch (format) {
+ case Json:
+ decoder = DecoderFactory.get().jsonDecoder(Span.getClassSchema(), new ByteArrayInputStream(bytes));
+ break;
+ case Avro:
+ default:
+ decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+ break;
+ }
+ SpecificDatumReader reader = new SpecificDatumReader(Span.class);
+ Span avroSpan = reader.read(null, decoder);
+ org.apache.htrace.Span span = convertToHTraceSpan(avroSpan);
+ return span;
+ }
+}
diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
new file mode 100644
index 0000000..96dc848
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class FlumeSpanReceiver implements SpanReceiver {
+ private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class);
+
+ public static final String NUM_THREADS_KEY = "htrace.flume.num-threads";
+ public static final int DEFAULT_NUM_THREADS = 1;
+ public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname";
+ public static final String DEFAULT_FLUME_HOSTNAME = "localhost";
+ public static final String FLUME_PORT_KEY = "htrace.flume.port";
+ public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize";
+ public static final int DEFAULT_FLUME_BATCHSIZE = 100;
+ public enum SerializationFormat {
+ Avro,
+ Json
+ };
+ public static final String FLUME_FORMAT_KEY = "htrace.flume.format";
+ public static final String DEFAULT_FLUME_FORMAT = SerializationFormat.Avro.name();
+
+ /**
+ * How long this receiver will try and wait for all threads to shutdown.
+ */
+ private static final int SHUTDOWN_TIMEOUT = 30;
+
+ /**
+ * How many errors in a row before we start dropping traces on the floor.
+ */
+ private static final int MAX_ERRORS = 10;
+
+ /**
+ * The queue that will get all HTrace spans that are to be sent.
+ */
+ private final BlockingQueue queue;
+
+ /**
+ * Boolean used to signal that the threads should end.
+ */
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ /**
+ * The thread factory used to create new ExecutorService.
+ *
+ * This will be the same factory for the lifetime of this object so that
+ * no thread names will ever be duplicated.
+ */
+ private final ThreadFactory tf;
+
+ ////////////////////
+ /// Variables that will change on each call to configure()
+ ///////////////////
+ private ExecutorService service;
+ private int maxSpanBatchSize;
+ private String flumeHostName;
+ private int flumePort;
+ private SerializationFormat format;
+
+ public FlumeSpanReceiver(HTraceConfiguration conf) {
+ this.queue = new ArrayBlockingQueue(1000);
+ this.tf = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("flumeSpanReceiver-%d")
+ .build();
+ configure(conf);
+ }
+
+ private void configure (HTraceConfiguration conf) {
+
+ // Read configuration
+ int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+ this.flumeHostName = conf.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME);
+ this.flumePort = conf.getInt(FLUME_PORT_KEY, 0);
+ if (this.flumePort == 0) {
+ throw new IllegalArgumentException(FLUME_PORT_KEY + " is required in configuration.");
+ }
+ this.maxSpanBatchSize = conf.getInt(FLUME_BATCHSIZE_KEY, DEFAULT_FLUME_BATCHSIZE);
+ String format_string = conf.get(FLUME_FORMAT_KEY, DEFAULT_FLUME_FORMAT);
+ if (format_string.compareToIgnoreCase(SerializationFormat.Json.name()) == 0) {
+ this.format = SerializationFormat.Json;
+ } else {
+ if (format_string.compareToIgnoreCase(SerializationFormat.Avro.name()) != 0) {
+ LOG.warn("Unknown serialization format:" + format_string + ", default to Avro.");
+ }
+ this.format = SerializationFormat.Avro;
+ }
+
+ // Initialize executors
+ // If there are already threads running tear them down.
+ if (this.service != null) {
+ this.service.shutdownNow();
+ this.service = null;
+ }
+ this.service = Executors.newFixedThreadPool(numThreads, tf);
+ for (int i = 0; i < numThreads; i++) {
+ this.service.submit(new WriteSpanRunnable());
+ }
+ }
+
+ private class WriteSpanRunnable implements Runnable {
+ private RpcClient flumeClient = null;
+
+ /**
+ * This runnable sends a HTrace span to the Flume.
+ */
+ @Override
+ public void run() {
+ List dequeuedSpans = new ArrayList(maxSpanBatchSize);
+ long errorCount = 0;
+
+ while (running.get() || queue.size() > 0) {
+ Span firstSpan = null;
+ try {
+ // Block for up to a second. to try and get a span.
+ // We only block for a little bit in order to notice
+ // if the running value has changed
+ firstSpan = queue.poll(1, TimeUnit.SECONDS);
+
+ // If the poll was successful then it's possible that there
+ // will be other spans to get. Try and get them.
+ if (firstSpan != null) {
+ // Add the first one that we got
+ dequeuedSpans.add(firstSpan);
+ // Try and get up to 100 queues
+ queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
+ }
+ } catch (InterruptedException ie) {
+ // Ignored.
+ }
+
+ startClient();
+ if (dequeuedSpans.isEmpty()) {
+ continue;
+ }
+
+ try {
+ List events = new ArrayList(dequeuedSpans.size());
+ for (Span span : dequeuedSpans) {
+
+ // Headers allow Flume to filter
+ Map headers = new HashMap();
+ headers.put("TraceId", Long.toString(span.getTraceId()));
+ headers.put("SpanId", Long.toString(span.getSpanId()));
+ headers.put("ProcessId", span.getProcessId());
+ headers.put("Description", span.getDescription());
+
+ byte[] body = AvroConverter.serialize(span, format);
+
+ Event evt = EventBuilder.withBody(body, headers);
+ events.add(evt);
+ }
+ flumeClient.appendBatch(events);
+
+ // clear the list for the next time through.
+ dequeuedSpans.clear();
+ // reset the error counter.
+ errorCount = 0;
+ } catch (Exception e) {
+ errorCount += 1;
+ // If there have been ten errors in a row start dropping things.
+ if (errorCount < MAX_ERRORS) {
+ try {
+ queue.addAll(dequeuedSpans);
+ } catch (IllegalStateException ex) {
+ LOG.error("Drop " + dequeuedSpans.size() +
+ " span(s) because writing to HBase failed.");
+ }
+ }
+ closeClient();
+ try {
+ // Since there was an error sleep just a little bit to try and allow the
+ // HBase some time to recover.
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // Ignored
+ }
+ }
+ }
+ closeClient();
+ }
+
+ /**
+ * Close Flume RPC client
+ */
+ private void closeClient() {
+ if (flumeClient != null) {
+ try {
+ flumeClient.close();
+ } catch (FlumeException ex) {
+ LOG.warn("Error while trying to close Flume Rpc Client.", ex);
+ } finally {
+ flumeClient = null;
+ }
+ }
+ }
+
+ /**
+ * Create / reconnect Flume RPC client
+ */
+ private void startClient() {
+ // If current client is inactive, close it
+ if (flumeClient != null && !flumeClient.isActive()) {
+ flumeClient.close();
+ flumeClient = null;
+ }
+ // Create client if needed
+ if (flumeClient == null) {
+ try {
+ flumeClient = RpcClientFactory.getDefaultInstance(flumeHostName, flumePort, maxSpanBatchSize);
+ } catch (FlumeException e) {
+ LOG.warn("Failed to create Flume RPC Client. " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the receiver.
+ *
+ * This tries to shutdown thread pool.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ running.set(false);
+ service.shutdown();
+ try {
+ if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.error("Was not able to process all remaining spans upon closing in: " +
+ SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
+ ". Left Spans could be dropped.");
+ }
+ } catch (InterruptedException e1) {
+ LOG.warn("Thread interrupted when terminating executor.", e1);
+ }
+ }
+
+ @Override
+ public void receiveSpan(Span span) {
+ if (running.get()) {
+ try {
+ this.queue.add(span);
+ } catch (IllegalStateException e) {
+ LOG.error("Error trying to append span (" +
+ span.getDescription() +
+ ") to the queue. Blocking Queue was full.");
+ }
+ }
+ }
+}
diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
new file mode 100644
index 0000000..5175be5
--- /dev/null
+++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcTestUtils;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceCreator;
+import org.apache.htrace.impl.FlumeSpanReceiver.SerializationFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFlumeSpanReceiver {
+ private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class);
+
+ private static final String ROOT_SPAN_DESC = "ROOT";
+
+ private SpanReceiver spanReceiver;
+ private Server flumeServer;
+ private TraceCreator traceCreator;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ public ArrayList addSimpleTraces() throws FlumeException,
+ EventDeliveryException, IOException {
+ ArrayList spans = new ArrayList();
+ Span rootSpan = new MilliSpan(ROOT_SPAN_DESC, 1, Span.ROOT_SPAN_ID, 100, "test");
+ Span innerOne = rootSpan.child("Some good work");
+ Span innerTwo = innerOne.child("Some more good work");
+ innerTwo.stop();
+ spans.add(innerTwo);
+ innerOne.stop();
+ spans.add(innerOne);
+ rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
+ rootSpan.addTimelineAnnotation("timeline");
+ rootSpan.stop();
+ spans.add(rootSpan);
+ return spans;
+ }
+
+ @Test
+ public void testAvroConverter() throws FlumeException,
+ EventDeliveryException, IOException {
+ AvroHandler avroHandler = null;
+ ArrayList spans = null;
+ try {
+ avroHandler = new AvroHandler();
+ startReceiver(SerializationFormat.Avro, null, avroHandler);
+ spans = addSimpleTraces();
+ } finally {
+ stopReceiver();
+ }
+ List events = avroHandler.getAllEvents();
+ verifySpans(spans, events, SerializationFormat.Avro);
+ }
+
+ @Test
+ public void testJsonConverter() throws FlumeException,
+ EventDeliveryException, IOException {
+ AvroHandler avroHandler = null;
+ List spans = null;
+ try {
+ avroHandler = new AvroHandler();
+ startReceiver(SerializationFormat.Json, null, avroHandler);
+ spans = addSimpleTraces();
+ } finally {
+ stopReceiver();
+ }
+ List events = avroHandler.getAllEvents();
+ verifySpans(spans, events, SerializationFormat.Json);
+ }
+
+ @Test
+ public void testConcurrency() throws FlumeException,
+ EventDeliveryException, IOException {
+ try {
+ Map extraConf = new HashMap();
+ extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5");
+ startReceiver(SerializationFormat.Avro, extraConf, new RpcTestUtils.OKAvroHandler());
+ traceCreator.createThreadedTrace();
+ } finally {
+ stopReceiver();
+ }
+ }
+
+ @Test
+ public void testResilience() throws FlumeException,
+ EventDeliveryException, IOException {
+ try {
+ startReceiver(SerializationFormat.Avro, null, new RpcTestUtils.FailedAvroHandler());
+ traceCreator.createThreadedTrace();
+ } finally {
+ stopReceiver();
+ }
+ }
+
+ private void startReceiver(SerializationFormat format,
+ Map extraConf, AvroSourceProtocol avroHandler) {
+ // Start Flume server
+ Assert.assertNull(flumeServer);
+ flumeServer = RpcTestUtils.startServer(avroHandler);
+
+ // Create and configure span receiver
+ Map conf = new HashMap();
+ conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1");
+ conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort()));
+ conf.put(FlumeSpanReceiver.FLUME_FORMAT_KEY, format.name());
+ if (extraConf != null) {
+ conf.putAll(extraConf);
+ }
+
+ spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
+
+ // Create trace creator, it will register our receiver
+ traceCreator = new TraceCreator(spanReceiver);
+ }
+
+ private void stopReceiver() throws IOException {
+ // Close span receiver
+ if (spanReceiver != null) {
+ Trace.removeReceiver(spanReceiver);
+ spanReceiver.close();
+ spanReceiver = null;
+ }
+
+ // Close Flume server
+ if (flumeServer != null) {
+ RpcTestUtils.stopServer(flumeServer);
+ flumeServer = null;
+ }
+ }
+
+ private static class AvroHandler implements AvroSourceProtocol {
+ private ArrayList all_events = new ArrayList();
+
+ public List getAllEvents() {
+ return new ArrayList(all_events);
+ }
+
+ @Override
+ public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+ all_events.add(event);
+ return Status.OK;
+ }
+
+ @Override
+ public Status appendBatch(List events) throws
+ AvroRemoteException {
+ all_events.addAll(events);
+ return Status.OK;
+ }
+ }
+
+ private void verifySpanEqual(Span s1, Span s2) {
+ Assert.assertEquals(s1.getTraceId(), s2.getTraceId());
+ Assert.assertEquals(s1.getParentId(), s2.getParentId());
+ Assert.assertEquals(s1.getSpanId(), s2.getSpanId());
+ Assert.assertEquals(s1.getProcessId(), s2.getProcessId());
+ Assert.assertEquals(s1.getDescription(), s2.getDescription());
+ Assert.assertEquals(s1.getKVAnnotations().size(), s2.getKVAnnotations().size());
+
+ // Cannot verify these fields because they cannot be deserialized. See AvroConverter.deserialize.
+ // Assert.assertEquals(s1.getStartTimeMillis(), s2.getStartTimeMillis());
+ // Assert.assertEquals(s1.getStopTimeMillis(), s2.getStopTimeMillis());
+ // Assert.assertEquals(s1.getTimelineAnnotations().size(), s2.getTimelineAnnotations().size());
+ }
+
+ private void verifySpans(List spans, List events, SerializationFormat format) throws IOException {
+ Assert.assertEquals(spans.size(), events.size());
+ for (int i = 0; i < spans.size(); i ++) {
+ Span s1 = AvroConverter.deserialize(events.get(i).getBody().array(), format);
+ verifySpanEqual(spans.get(i), s1);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index b14c6b8..ee3135e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,6 +24,7 @@ language governing permissions and limitations under the License. -->
htrace-core
htrace-zipkin
htrace-hbase
+ htrace-flume