diff --git a/htrace-flume/pom.xml b/htrace-flume/pom.xml new file mode 100644 index 0000000..3f8d70c --- /dev/null +++ b/htrace-flume/pom.xml @@ -0,0 +1,117 @@ + + + + 4.0.0 + + htrace-flume + jar + + + htrace + org.apache.htrace + 3.1.0-SNAPSHOT + + + htrace-flume + https://github.com/cloudera/htrace + + + UTF-8 + 1.6.0-SNAPSHOT + + + + + + org.apache.maven.plugins + maven-source-plugin + + + maven-javadoc-plugin + + + maven-compiler-plugin + + + org.apache.maven.plugins + maven-gpg-plugin + + + org.apache.rat + apache-rat-plugin + + + + maven-deploy-plugin + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + + + + + + org.apache.htrace + htrace-core + ${project.version} + provided + + + org.apache.htrace + htrace-core + ${project.version} + tests + test + + + + commons-logging + commons-logging + provided + + + com.google.guava + guava + provided + + + junit + junit + test + + + + org.apache.flume + flume-ng-sdk + ${flume.version} + + + org.apache.flume + flume-ng-sdk + ${flume.version} + tests + test + + + org.apache.avro + avro + 1.7.4 + + + diff --git a/htrace-flume/src/main/avro/span.avdl b/htrace-flume/src/main/avro/span.avdl new file mode 100644 index 0000000..0eb143f --- /dev/null +++ b/htrace-flume/src/main/avro/span.avdl @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@namespace ("org.apache.htrace.avro") +protocol htrace_avro { + record TimelineAnnotation { + long time; + union {null, string} message; + } + + record KVAnnotation { + union {null, bytes} key; + union {null, bytes} value; + } + + record Span { + long trace_id; + long parent_id; + long span_id; + long start; + long stop; + union {null, string} process_id; + union {null, string} description; + union {null, array} timeline_annotations; + union {null, array} kv_annotations; + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java b/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java new file mode 100644 index 0000000..64e7581 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/avro/KVAnnotation.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.htrace.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class KVAnnotation extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"KVAnnotation\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.nio.ByteBuffer key; + @Deprecated public java.nio.ByteBuffer value; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public KVAnnotation() {} + + /** + * All-args constructor. + */ + public KVAnnotation(java.nio.ByteBuffer key, java.nio.ByteBuffer value) { + this.key = key; + this.value = value; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return key; + case 1: return value; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: key = (java.nio.ByteBuffer)value$; break; + case 1: value = (java.nio.ByteBuffer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'key' field. + */ + public java.nio.ByteBuffer getKey() { + return key; + } + + /** + * Sets the value of the 'key' field. + * @param value the value to set. + */ + public void setKey(java.nio.ByteBuffer value) { + this.key = value; + } + + /** + * Gets the value of the 'value' field. + */ + public java.nio.ByteBuffer getValue() { + return value; + } + + /** + * Sets the value of the 'value' field. + * @param value the value to set. + */ + public void setValue(java.nio.ByteBuffer value) { + this.value = value; + } + + /** Creates a new KVAnnotation RecordBuilder */ + public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder() { + return new org.apache.htrace.avro.KVAnnotation.Builder(); + } + + /** Creates a new KVAnnotation RecordBuilder by copying an existing Builder */ + public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder(org.apache.htrace.avro.KVAnnotation.Builder other) { + return new org.apache.htrace.avro.KVAnnotation.Builder(other); + } + + /** Creates a new KVAnnotation RecordBuilder by copying an existing KVAnnotation instance */ + public static org.apache.htrace.avro.KVAnnotation.Builder newBuilder(org.apache.htrace.avro.KVAnnotation other) { + return new org.apache.htrace.avro.KVAnnotation.Builder(other); + } + + /** + * RecordBuilder for KVAnnotation instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.nio.ByteBuffer key; + private java.nio.ByteBuffer value; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.htrace.avro.KVAnnotation.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.htrace.avro.KVAnnotation.Builder other) { + super(other); + if (isValidValue(fields()[0], other.key)) { + this.key = data().deepCopy(fields()[0].schema(), other.key); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.value)) { + this.value = data().deepCopy(fields()[1].schema(), other.value); + fieldSetFlags()[1] = true; + } + } + + /** Creates a Builder by copying an existing KVAnnotation instance */ + private Builder(org.apache.htrace.avro.KVAnnotation other) { + super(org.apache.htrace.avro.KVAnnotation.SCHEMA$); + if (isValidValue(fields()[0], other.key)) { + this.key = data().deepCopy(fields()[0].schema(), other.key); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.value)) { + this.value = data().deepCopy(fields()[1].schema(), other.value); + fieldSetFlags()[1] = true; + } + } + + /** Gets the value of the 'key' field */ + public java.nio.ByteBuffer getKey() { + return key; + } + + /** Sets the value of the 'key' field */ + public org.apache.htrace.avro.KVAnnotation.Builder setKey(java.nio.ByteBuffer value) { + validate(fields()[0], value); + this.key = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'key' field has been set */ + public boolean hasKey() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'key' field */ + public org.apache.htrace.avro.KVAnnotation.Builder clearKey() { + key = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'value' field */ + public java.nio.ByteBuffer getValue() { + return value; + } + + /** Sets the value of the 'value' field */ + public org.apache.htrace.avro.KVAnnotation.Builder setValue(java.nio.ByteBuffer value) { + validate(fields()[1], value); + this.value = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'value' field has been set */ + public boolean hasValue() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'value' field */ + public org.apache.htrace.avro.KVAnnotation.Builder clearValue() { + value = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + public KVAnnotation build() { + try { + KVAnnotation record = new KVAnnotation(); + record.key = fieldSetFlags()[0] ? this.key : (java.nio.ByteBuffer) defaultValue(fields()[0]); + record.value = fieldSetFlags()[1] ? this.value : (java.nio.ByteBuffer) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java b/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java new file mode 100644 index 0000000..0688b28 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/avro/Span.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.htrace.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class Span extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Span\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"trace_id\",\"type\":\"long\"},{\"name\":\"parent_id\",\"type\":\"long\"},{\"name\":\"span_id\",\"type\":\"long\"},{\"name\":\"start\",\"type\":\"long\"},{\"name\":\"stop\",\"type\":\"long\"},{\"name\":\"process_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"description\",\"type\":[\"null\",\"string\"]},{\"name\":\"timeline_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]}}]},{\"name\":\"kv_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"KVAnnotation\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]}}]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public long trace_id; + @Deprecated public long parent_id; + @Deprecated public long span_id; + @Deprecated public long start; + @Deprecated public long stop; + @Deprecated public java.lang.CharSequence process_id; + @Deprecated public java.lang.CharSequence description; + @Deprecated public java.util.List timeline_annotations; + @Deprecated public java.util.List kv_annotations; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public Span() {} + + /** + * All-args constructor. + */ + public Span(java.lang.Long trace_id, java.lang.Long parent_id, java.lang.Long span_id, java.lang.Long start, java.lang.Long stop, java.lang.CharSequence process_id, java.lang.CharSequence description, java.util.List timeline_annotations, java.util.List kv_annotations) { + this.trace_id = trace_id; + this.parent_id = parent_id; + this.span_id = span_id; + this.start = start; + this.stop = stop; + this.process_id = process_id; + this.description = description; + this.timeline_annotations = timeline_annotations; + this.kv_annotations = kv_annotations; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return trace_id; + case 1: return parent_id; + case 2: return span_id; + case 3: return start; + case 4: return stop; + case 5: return process_id; + case 6: return description; + case 7: return timeline_annotations; + case 8: return kv_annotations; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: trace_id = (java.lang.Long)value$; break; + case 1: parent_id = (java.lang.Long)value$; break; + case 2: span_id = (java.lang.Long)value$; break; + case 3: start = (java.lang.Long)value$; break; + case 4: stop = (java.lang.Long)value$; break; + case 5: process_id = (java.lang.CharSequence)value$; break; + case 6: description = (java.lang.CharSequence)value$; break; + case 7: timeline_annotations = (java.util.List)value$; break; + case 8: kv_annotations = (java.util.List)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'trace_id' field. + */ + public java.lang.Long getTraceId() { + return trace_id; + } + + /** + * Sets the value of the 'trace_id' field. + * @param value the value to set. + */ + public void setTraceId(java.lang.Long value) { + this.trace_id = value; + } + + /** + * Gets the value of the 'parent_id' field. + */ + public java.lang.Long getParentId() { + return parent_id; + } + + /** + * Sets the value of the 'parent_id' field. + * @param value the value to set. + */ + public void setParentId(java.lang.Long value) { + this.parent_id = value; + } + + /** + * Gets the value of the 'span_id' field. + */ + public java.lang.Long getSpanId() { + return span_id; + } + + /** + * Sets the value of the 'span_id' field. + * @param value the value to set. + */ + public void setSpanId(java.lang.Long value) { + this.span_id = value; + } + + /** + * Gets the value of the 'start' field. + */ + public java.lang.Long getStart() { + return start; + } + + /** + * Sets the value of the 'start' field. + * @param value the value to set. + */ + public void setStart(java.lang.Long value) { + this.start = value; + } + + /** + * Gets the value of the 'stop' field. + */ + public java.lang.Long getStop() { + return stop; + } + + /** + * Sets the value of the 'stop' field. + * @param value the value to set. + */ + public void setStop(java.lang.Long value) { + this.stop = value; + } + + /** + * Gets the value of the 'process_id' field. + */ + public java.lang.CharSequence getProcessId() { + return process_id; + } + + /** + * Sets the value of the 'process_id' field. + * @param value the value to set. + */ + public void setProcessId(java.lang.CharSequence value) { + this.process_id = value; + } + + /** + * Gets the value of the 'description' field. + */ + public java.lang.CharSequence getDescription() { + return description; + } + + /** + * Sets the value of the 'description' field. + * @param value the value to set. + */ + public void setDescription(java.lang.CharSequence value) { + this.description = value; + } + + /** + * Gets the value of the 'timeline_annotations' field. + */ + public java.util.List getTimelineAnnotations() { + return timeline_annotations; + } + + /** + * Sets the value of the 'timeline_annotations' field. + * @param value the value to set. + */ + public void setTimelineAnnotations(java.util.List value) { + this.timeline_annotations = value; + } + + /** + * Gets the value of the 'kv_annotations' field. + */ + public java.util.List getKvAnnotations() { + return kv_annotations; + } + + /** + * Sets the value of the 'kv_annotations' field. + * @param value the value to set. + */ + public void setKvAnnotations(java.util.List value) { + this.kv_annotations = value; + } + + /** Creates a new Span RecordBuilder */ + public static org.apache.htrace.avro.Span.Builder newBuilder() { + return new org.apache.htrace.avro.Span.Builder(); + } + + /** Creates a new Span RecordBuilder by copying an existing Builder */ + public static org.apache.htrace.avro.Span.Builder newBuilder(org.apache.htrace.avro.Span.Builder other) { + return new org.apache.htrace.avro.Span.Builder(other); + } + + /** Creates a new Span RecordBuilder by copying an existing Span instance */ + public static org.apache.htrace.avro.Span.Builder newBuilder(org.apache.htrace.avro.Span other) { + return new org.apache.htrace.avro.Span.Builder(other); + } + + /** + * RecordBuilder for Span instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long trace_id; + private long parent_id; + private long span_id; + private long start; + private long stop; + private java.lang.CharSequence process_id; + private java.lang.CharSequence description; + private java.util.List timeline_annotations; + private java.util.List kv_annotations; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.htrace.avro.Span.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.htrace.avro.Span.Builder other) { + super(other); + if (isValidValue(fields()[0], other.trace_id)) { + this.trace_id = data().deepCopy(fields()[0].schema(), other.trace_id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.parent_id)) { + this.parent_id = data().deepCopy(fields()[1].schema(), other.parent_id); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.span_id)) { + this.span_id = data().deepCopy(fields()[2].schema(), other.span_id); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.start)) { + this.start = data().deepCopy(fields()[3].schema(), other.start); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.stop)) { + this.stop = data().deepCopy(fields()[4].schema(), other.stop); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.process_id)) { + this.process_id = data().deepCopy(fields()[5].schema(), other.process_id); + fieldSetFlags()[5] = true; + } + if (isValidValue(fields()[6], other.description)) { + this.description = data().deepCopy(fields()[6].schema(), other.description); + fieldSetFlags()[6] = true; + } + if (isValidValue(fields()[7], other.timeline_annotations)) { + this.timeline_annotations = data().deepCopy(fields()[7].schema(), other.timeline_annotations); + fieldSetFlags()[7] = true; + } + if (isValidValue(fields()[8], other.kv_annotations)) { + this.kv_annotations = data().deepCopy(fields()[8].schema(), other.kv_annotations); + fieldSetFlags()[8] = true; + } + } + + /** Creates a Builder by copying an existing Span instance */ + private Builder(org.apache.htrace.avro.Span other) { + super(org.apache.htrace.avro.Span.SCHEMA$); + if (isValidValue(fields()[0], other.trace_id)) { + this.trace_id = data().deepCopy(fields()[0].schema(), other.trace_id); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.parent_id)) { + this.parent_id = data().deepCopy(fields()[1].schema(), other.parent_id); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.span_id)) { + this.span_id = data().deepCopy(fields()[2].schema(), other.span_id); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.start)) { + this.start = data().deepCopy(fields()[3].schema(), other.start); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.stop)) { + this.stop = data().deepCopy(fields()[4].schema(), other.stop); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.process_id)) { + this.process_id = data().deepCopy(fields()[5].schema(), other.process_id); + fieldSetFlags()[5] = true; + } + if (isValidValue(fields()[6], other.description)) { + this.description = data().deepCopy(fields()[6].schema(), other.description); + fieldSetFlags()[6] = true; + } + if (isValidValue(fields()[7], other.timeline_annotations)) { + this.timeline_annotations = data().deepCopy(fields()[7].schema(), other.timeline_annotations); + fieldSetFlags()[7] = true; + } + if (isValidValue(fields()[8], other.kv_annotations)) { + this.kv_annotations = data().deepCopy(fields()[8].schema(), other.kv_annotations); + fieldSetFlags()[8] = true; + } + } + + /** Gets the value of the 'trace_id' field */ + public java.lang.Long getTraceId() { + return trace_id; + } + + /** Sets the value of the 'trace_id' field */ + public org.apache.htrace.avro.Span.Builder setTraceId(long value) { + validate(fields()[0], value); + this.trace_id = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'trace_id' field has been set */ + public boolean hasTraceId() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'trace_id' field */ + public org.apache.htrace.avro.Span.Builder clearTraceId() { + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'parent_id' field */ + public java.lang.Long getParentId() { + return parent_id; + } + + /** Sets the value of the 'parent_id' field */ + public org.apache.htrace.avro.Span.Builder setParentId(long value) { + validate(fields()[1], value); + this.parent_id = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'parent_id' field has been set */ + public boolean hasParentId() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'parent_id' field */ + public org.apache.htrace.avro.Span.Builder clearParentId() { + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'span_id' field */ + public java.lang.Long getSpanId() { + return span_id; + } + + /** Sets the value of the 'span_id' field */ + public org.apache.htrace.avro.Span.Builder setSpanId(long value) { + validate(fields()[2], value); + this.span_id = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'span_id' field has been set */ + public boolean hasSpanId() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'span_id' field */ + public org.apache.htrace.avro.Span.Builder clearSpanId() { + fieldSetFlags()[2] = false; + return this; + } + + /** Gets the value of the 'start' field */ + public java.lang.Long getStart() { + return start; + } + + /** Sets the value of the 'start' field */ + public org.apache.htrace.avro.Span.Builder setStart(long value) { + validate(fields()[3], value); + this.start = value; + fieldSetFlags()[3] = true; + return this; + } + + /** Checks whether the 'start' field has been set */ + public boolean hasStart() { + return fieldSetFlags()[3]; + } + + /** Clears the value of the 'start' field */ + public org.apache.htrace.avro.Span.Builder clearStart() { + fieldSetFlags()[3] = false; + return this; + } + + /** Gets the value of the 'stop' field */ + public java.lang.Long getStop() { + return stop; + } + + /** Sets the value of the 'stop' field */ + public org.apache.htrace.avro.Span.Builder setStop(long value) { + validate(fields()[4], value); + this.stop = value; + fieldSetFlags()[4] = true; + return this; + } + + /** Checks whether the 'stop' field has been set */ + public boolean hasStop() { + return fieldSetFlags()[4]; + } + + /** Clears the value of the 'stop' field */ + public org.apache.htrace.avro.Span.Builder clearStop() { + fieldSetFlags()[4] = false; + return this; + } + + /** Gets the value of the 'process_id' field */ + public java.lang.CharSequence getProcessId() { + return process_id; + } + + /** Sets the value of the 'process_id' field */ + public org.apache.htrace.avro.Span.Builder setProcessId(java.lang.CharSequence value) { + validate(fields()[5], value); + this.process_id = value; + fieldSetFlags()[5] = true; + return this; + } + + /** Checks whether the 'process_id' field has been set */ + public boolean hasProcessId() { + return fieldSetFlags()[5]; + } + + /** Clears the value of the 'process_id' field */ + public org.apache.htrace.avro.Span.Builder clearProcessId() { + process_id = null; + fieldSetFlags()[5] = false; + return this; + } + + /** Gets the value of the 'description' field */ + public java.lang.CharSequence getDescription() { + return description; + } + + /** Sets the value of the 'description' field */ + public org.apache.htrace.avro.Span.Builder setDescription(java.lang.CharSequence value) { + validate(fields()[6], value); + this.description = value; + fieldSetFlags()[6] = true; + return this; + } + + /** Checks whether the 'description' field has been set */ + public boolean hasDescription() { + return fieldSetFlags()[6]; + } + + /** Clears the value of the 'description' field */ + public org.apache.htrace.avro.Span.Builder clearDescription() { + description = null; + fieldSetFlags()[6] = false; + return this; + } + + /** Gets the value of the 'timeline_annotations' field */ + public java.util.List getTimelineAnnotations() { + return timeline_annotations; + } + + /** Sets the value of the 'timeline_annotations' field */ + public org.apache.htrace.avro.Span.Builder setTimelineAnnotations(java.util.List value) { + validate(fields()[7], value); + this.timeline_annotations = value; + fieldSetFlags()[7] = true; + return this; + } + + /** Checks whether the 'timeline_annotations' field has been set */ + public boolean hasTimelineAnnotations() { + return fieldSetFlags()[7]; + } + + /** Clears the value of the 'timeline_annotations' field */ + public org.apache.htrace.avro.Span.Builder clearTimelineAnnotations() { + timeline_annotations = null; + fieldSetFlags()[7] = false; + return this; + } + + /** Gets the value of the 'kv_annotations' field */ + public java.util.List getKvAnnotations() { + return kv_annotations; + } + + /** Sets the value of the 'kv_annotations' field */ + public org.apache.htrace.avro.Span.Builder setKvAnnotations(java.util.List value) { + validate(fields()[8], value); + this.kv_annotations = value; + fieldSetFlags()[8] = true; + return this; + } + + /** Checks whether the 'kv_annotations' field has been set */ + public boolean hasKvAnnotations() { + return fieldSetFlags()[8]; + } + + /** Clears the value of the 'kv_annotations' field */ + public org.apache.htrace.avro.Span.Builder clearKvAnnotations() { + kv_annotations = null; + fieldSetFlags()[8] = false; + return this; + } + + @Override + public Span build() { + try { + Span record = new Span(); + record.trace_id = fieldSetFlags()[0] ? this.trace_id : (java.lang.Long) defaultValue(fields()[0]); + record.parent_id = fieldSetFlags()[1] ? this.parent_id : (java.lang.Long) defaultValue(fields()[1]); + record.span_id = fieldSetFlags()[2] ? this.span_id : (java.lang.Long) defaultValue(fields()[2]); + record.start = fieldSetFlags()[3] ? this.start : (java.lang.Long) defaultValue(fields()[3]); + record.stop = fieldSetFlags()[4] ? this.stop : (java.lang.Long) defaultValue(fields()[4]); + record.process_id = fieldSetFlags()[5] ? this.process_id : (java.lang.CharSequence) defaultValue(fields()[5]); + record.description = fieldSetFlags()[6] ? this.description : (java.lang.CharSequence) defaultValue(fields()[6]); + record.timeline_annotations = fieldSetFlags()[7] ? this.timeline_annotations : (java.util.List) defaultValue(fields()[7]); + record.kv_annotations = fieldSetFlags()[8] ? this.kv_annotations : (java.util.List) defaultValue(fields()[8]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java b/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java new file mode 100644 index 0000000..bafacbf --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/avro/TimelineAnnotation.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.htrace.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class TimelineAnnotation extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"namespace\":\"org.apache.htrace.avro\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public long time; + @Deprecated public java.lang.CharSequence message; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public TimelineAnnotation() {} + + /** + * All-args constructor. + */ + public TimelineAnnotation(java.lang.Long time, java.lang.CharSequence message) { + this.time = time; + this.message = message; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return time; + case 1: return message; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: time = (java.lang.Long)value$; break; + case 1: message = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'time' field. + */ + public java.lang.Long getTime() { + return time; + } + + /** + * Sets the value of the 'time' field. + * @param value the value to set. + */ + public void setTime(java.lang.Long value) { + this.time = value; + } + + /** + * Gets the value of the 'message' field. + */ + public java.lang.CharSequence getMessage() { + return message; + } + + /** + * Sets the value of the 'message' field. + * @param value the value to set. + */ + public void setMessage(java.lang.CharSequence value) { + this.message = value; + } + + /** Creates a new TimelineAnnotation RecordBuilder */ + public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder() { + return new org.apache.htrace.avro.TimelineAnnotation.Builder(); + } + + /** Creates a new TimelineAnnotation RecordBuilder by copying an existing Builder */ + public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder(org.apache.htrace.avro.TimelineAnnotation.Builder other) { + return new org.apache.htrace.avro.TimelineAnnotation.Builder(other); + } + + /** Creates a new TimelineAnnotation RecordBuilder by copying an existing TimelineAnnotation instance */ + public static org.apache.htrace.avro.TimelineAnnotation.Builder newBuilder(org.apache.htrace.avro.TimelineAnnotation other) { + return new org.apache.htrace.avro.TimelineAnnotation.Builder(other); + } + + /** + * RecordBuilder for TimelineAnnotation instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long time; + private java.lang.CharSequence message; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.htrace.avro.TimelineAnnotation.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.htrace.avro.TimelineAnnotation.Builder other) { + super(other); + if (isValidValue(fields()[0], other.time)) { + this.time = data().deepCopy(fields()[0].schema(), other.time); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.message)) { + this.message = data().deepCopy(fields()[1].schema(), other.message); + fieldSetFlags()[1] = true; + } + } + + /** Creates a Builder by copying an existing TimelineAnnotation instance */ + private Builder(org.apache.htrace.avro.TimelineAnnotation other) { + super(org.apache.htrace.avro.TimelineAnnotation.SCHEMA$); + if (isValidValue(fields()[0], other.time)) { + this.time = data().deepCopy(fields()[0].schema(), other.time); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.message)) { + this.message = data().deepCopy(fields()[1].schema(), other.message); + fieldSetFlags()[1] = true; + } + } + + /** Gets the value of the 'time' field */ + public java.lang.Long getTime() { + return time; + } + + /** Sets the value of the 'time' field */ + public org.apache.htrace.avro.TimelineAnnotation.Builder setTime(long value) { + validate(fields()[0], value); + this.time = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'time' field has been set */ + public boolean hasTime() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'time' field */ + public org.apache.htrace.avro.TimelineAnnotation.Builder clearTime() { + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'message' field */ + public java.lang.CharSequence getMessage() { + return message; + } + + /** Sets the value of the 'message' field */ + public org.apache.htrace.avro.TimelineAnnotation.Builder setMessage(java.lang.CharSequence value) { + validate(fields()[1], value); + this.message = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'message' field has been set */ + public boolean hasMessage() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'message' field */ + public org.apache.htrace.avro.TimelineAnnotation.Builder clearMessage() { + message = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + public TimelineAnnotation build() { + try { + TimelineAnnotation record = new TimelineAnnotation(); + record.time = fieldSetFlags()[0] ? this.time : (java.lang.Long) defaultValue(fields()[0]); + record.message = fieldSetFlags()[1] ? this.message : (java.lang.CharSequence) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java b/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java new file mode 100644 index 0000000..37fcb94 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/avro/htrace_avro.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.htrace.avro; + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public interface htrace_avro { + public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"htrace_avro\",\"namespace\":\"org.apache.htrace.avro\",\"types\":[{\"type\":\"record\",\"name\":\"TimelineAnnotation\",\"fields\":[{\"name\":\"time\",\"type\":\"long\"},{\"name\":\"message\",\"type\":[\"null\",\"string\"]}]},{\"type\":\"record\",\"name\":\"KVAnnotation\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"value\",\"type\":[\"null\",\"bytes\"]}]},{\"type\":\"record\",\"name\":\"Span\",\"fields\":[{\"name\":\"trace_id\",\"type\":\"long\"},{\"name\":\"parent_id\",\"type\":\"long\"},{\"name\":\"span_id\",\"type\":\"long\"},{\"name\":\"start\",\"type\":\"long\"},{\"name\":\"stop\",\"type\":\"long\"},{\"name\":\"process_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"description\",\"type\":[\"null\",\"string\"]},{\"name\":\"timeline_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"TimelineAnnotation\"}]},{\"name\":\"kv_annotations\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"KVAnnotation\"}]}]}],\"messages\":{}}"); + + @SuppressWarnings("all") + public interface Callback extends htrace_avro { + public static final org.apache.avro.Protocol PROTOCOL = org.apache.htrace.avro.htrace_avro.PROTOCOL; + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java b/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java new file mode 100644 index 0000000..05b491d --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/impl/AvroConverter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.htrace.avro.Span; +import org.apache.htrace.avro.TimelineAnnotation; +import org.apache.htrace.avro.KVAnnotation; + +public class AvroConverter { + + // Convert an HTrace span to an Avro span + private static Span convertToAvroSpan(org.apache.htrace.Span span) { + Span avroSpan = new Span(); + avroSpan.setTraceId(span.getTraceId()); + avroSpan.setParentId(span.getParentId()); + avroSpan.setSpanId(span.getSpanId()); + avroSpan.setStart(span.getStartTimeMillis()); + avroSpan.setStop(span.getStopTimeMillis()); + avroSpan.setProcessId(span.getProcessId()); + avroSpan.setDescription(span.getDescription()); + + List tla = span.getTimelineAnnotations(); + if (tla.size() > 0) { + ArrayList avroTla = new ArrayList(tla.size()); + for (org.apache.htrace.TimelineAnnotation annotation : tla) { + TimelineAnnotation avroAnnotation = new TimelineAnnotation(); + avroAnnotation.setTime(annotation.getTime()); + avroAnnotation.setMessage(annotation.getMessage()); + avroTla.add(avroAnnotation); + } + avroSpan.setTimelineAnnotations(avroTla); + } + + Map kva = span.getKVAnnotations(); + if (kva.size() > 0) { + ArrayList avroKva = new ArrayList(kva.size()); + Iterator> it = kva.entrySet().iterator(); + while (it.hasNext()) { + Entry annotation = (Entry)it.next(); + KVAnnotation avroAnnotation = new KVAnnotation(); + avroAnnotation.setKey(ByteBuffer.wrap(annotation.getKey())); + avroAnnotation.setValue(ByteBuffer.wrap(annotation.getValue())); + avroKva.add(avroAnnotation); + } + avroSpan.setKvAnnotations(avroKva); + } + return avroSpan; + } + + // Serialize + public static byte[] serialize(org.apache.htrace.Span span, FlumeSpanReceiver.SerializationFormat format) throws IOException { + + Span avroSpan = convertToAvroSpan(span); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = null; + switch (format) { + case Json: + encoder = EncoderFactory.get().jsonEncoder(avroSpan.getSchema(), out); + break; + case Avro: + default: + encoder = EncoderFactory.get().binaryEncoder(out, null); + break; + } + DatumWriter writer = new SpecificDatumWriter(Span.getClassSchema()); + writer.write(avroSpan, encoder); + encoder.flush(); + // since we use memory stream, no need to close in finally + out.close(); + + return out.toByteArray(); + } + + // Convert an Avro span to an HTrace span + private static org.apache.htrace.Span convertToHTraceSpan(Span avroSpan) { + org.apache.htrace.Span span = new org.apache.htrace.impl.MilliSpan( + avroSpan.getDescription().toString(), + avroSpan.getTraceId(), + avroSpan.getParentId(), + avroSpan.getSpanId(), + avroSpan.getProcessId().toString() + ); + + // org.apache.htrace.Span API does not allow setting Timeline Annotations directly + /* + List avroTla = avroSpan.getTimelineAnnotations(); + if (avroTla != null && avroTla.size() > 0) { + ArrayList tla = new ArrayList(avroTla.size()); + for (TimelineAnnotation avroAnnotation : avroTla) { + org.apache.htrace.TimelineAnnotation annotation = new org.apache.htrace.TimelineAnnotation( + avroAnnotation.getTime(), + avroAnnotation.getMessage().toString() + ); + tla.add(annotation); + } + span.setTimelineAnnotations(tla); + } + */ + + List avroKva = avroSpan.getKvAnnotations(); + if (avroKva != null) { + for (KVAnnotation avroAnnotation : avroKva) { + span.addKVAnnotation(avroAnnotation.getKey().array(), avroAnnotation.getValue().array()); + } + } + return span; + } + + // Deserialize + public static org.apache.htrace.Span deserialize(byte[] bytes, FlumeSpanReceiver.SerializationFormat format) throws IOException { + Decoder decoder = null; + switch (format) { + case Json: + decoder = DecoderFactory.get().jsonDecoder(Span.getClassSchema(), new ByteArrayInputStream(bytes)); + break; + case Avro: + default: + decoder = DecoderFactory.get().binaryDecoder(bytes, null); + break; + } + SpecificDatumReader reader = new SpecificDatumReader(Span.class); + Span avroSpan = reader.read(null, decoder); + org.apache.htrace.Span span = convertToHTraceSpan(avroSpan); + return span; + } +} diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java new file mode 100644 index 0000000..96dc848 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class FlumeSpanReceiver implements SpanReceiver { + private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class); + + public static final String NUM_THREADS_KEY = "htrace.flume.num-threads"; + public static final int DEFAULT_NUM_THREADS = 1; + public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname"; + public static final String DEFAULT_FLUME_HOSTNAME = "localhost"; + public static final String FLUME_PORT_KEY = "htrace.flume.port"; + public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize"; + public static final int DEFAULT_FLUME_BATCHSIZE = 100; + public enum SerializationFormat { + Avro, + Json + }; + public static final String FLUME_FORMAT_KEY = "htrace.flume.format"; + public static final String DEFAULT_FLUME_FORMAT = SerializationFormat.Avro.name(); + + /** + * How long this receiver will try and wait for all threads to shutdown. + */ + private static final int SHUTDOWN_TIMEOUT = 30; + + /** + * How many errors in a row before we start dropping traces on the floor. + */ + private static final int MAX_ERRORS = 10; + + /** + * The queue that will get all HTrace spans that are to be sent. + */ + private final BlockingQueue queue; + + /** + * Boolean used to signal that the threads should end. + */ + private final AtomicBoolean running = new AtomicBoolean(true); + + /** + * The thread factory used to create new ExecutorService. + *

+ * This will be the same factory for the lifetime of this object so that + * no thread names will ever be duplicated. + */ + private final ThreadFactory tf; + + //////////////////// + /// Variables that will change on each call to configure() + /////////////////// + private ExecutorService service; + private int maxSpanBatchSize; + private String flumeHostName; + private int flumePort; + private SerializationFormat format; + + public FlumeSpanReceiver(HTraceConfiguration conf) { + this.queue = new ArrayBlockingQueue(1000); + this.tf = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("flumeSpanReceiver-%d") + .build(); + configure(conf); + } + + private void configure (HTraceConfiguration conf) { + + // Read configuration + int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS); + this.flumeHostName = conf.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME); + this.flumePort = conf.getInt(FLUME_PORT_KEY, 0); + if (this.flumePort == 0) { + throw new IllegalArgumentException(FLUME_PORT_KEY + " is required in configuration."); + } + this.maxSpanBatchSize = conf.getInt(FLUME_BATCHSIZE_KEY, DEFAULT_FLUME_BATCHSIZE); + String format_string = conf.get(FLUME_FORMAT_KEY, DEFAULT_FLUME_FORMAT); + if (format_string.compareToIgnoreCase(SerializationFormat.Json.name()) == 0) { + this.format = SerializationFormat.Json; + } else { + if (format_string.compareToIgnoreCase(SerializationFormat.Avro.name()) != 0) { + LOG.warn("Unknown serialization format:" + format_string + ", default to Avro."); + } + this.format = SerializationFormat.Avro; + } + + // Initialize executors + // If there are already threads running tear them down. + if (this.service != null) { + this.service.shutdownNow(); + this.service = null; + } + this.service = Executors.newFixedThreadPool(numThreads, tf); + for (int i = 0; i < numThreads; i++) { + this.service.submit(new WriteSpanRunnable()); + } + } + + private class WriteSpanRunnable implements Runnable { + private RpcClient flumeClient = null; + + /** + * This runnable sends a HTrace span to the Flume. + */ + @Override + public void run() { + List dequeuedSpans = new ArrayList(maxSpanBatchSize); + long errorCount = 0; + + while (running.get() || queue.size() > 0) { + Span firstSpan = null; + try { + // Block for up to a second. to try and get a span. + // We only block for a little bit in order to notice + // if the running value has changed + firstSpan = queue.poll(1, TimeUnit.SECONDS); + + // If the poll was successful then it's possible that there + // will be other spans to get. Try and get them. + if (firstSpan != null) { + // Add the first one that we got + dequeuedSpans.add(firstSpan); + // Try and get up to 100 queues + queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1); + } + } catch (InterruptedException ie) { + // Ignored. + } + + startClient(); + if (dequeuedSpans.isEmpty()) { + continue; + } + + try { + List events = new ArrayList(dequeuedSpans.size()); + for (Span span : dequeuedSpans) { + + // Headers allow Flume to filter + Map headers = new HashMap(); + headers.put("TraceId", Long.toString(span.getTraceId())); + headers.put("SpanId", Long.toString(span.getSpanId())); + headers.put("ProcessId", span.getProcessId()); + headers.put("Description", span.getDescription()); + + byte[] body = AvroConverter.serialize(span, format); + + Event evt = EventBuilder.withBody(body, headers); + events.add(evt); + } + flumeClient.appendBatch(events); + + // clear the list for the next time through. + dequeuedSpans.clear(); + // reset the error counter. + errorCount = 0; + } catch (Exception e) { + errorCount += 1; + // If there have been ten errors in a row start dropping things. + if (errorCount < MAX_ERRORS) { + try { + queue.addAll(dequeuedSpans); + } catch (IllegalStateException ex) { + LOG.error("Drop " + dequeuedSpans.size() + + " span(s) because writing to HBase failed."); + } + } + closeClient(); + try { + // Since there was an error sleep just a little bit to try and allow the + // HBase some time to recover. + Thread.sleep(500); + } catch (InterruptedException e1) { + // Ignored + } + } + } + closeClient(); + } + + /** + * Close Flume RPC client + */ + private void closeClient() { + if (flumeClient != null) { + try { + flumeClient.close(); + } catch (FlumeException ex) { + LOG.warn("Error while trying to close Flume Rpc Client.", ex); + } finally { + flumeClient = null; + } + } + } + + /** + * Create / reconnect Flume RPC client + */ + private void startClient() { + // If current client is inactive, close it + if (flumeClient != null && !flumeClient.isActive()) { + flumeClient.close(); + flumeClient = null; + } + // Create client if needed + if (flumeClient == null) { + try { + flumeClient = RpcClientFactory.getDefaultInstance(flumeHostName, flumePort, maxSpanBatchSize); + } catch (FlumeException e) { + LOG.warn("Failed to create Flume RPC Client. " + e.getMessage()); + } + } + } + } + + /** + * Close the receiver. + *

+ * This tries to shutdown thread pool. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + running.set(false); + service.shutdown(); + try { + if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { + LOG.error("Was not able to process all remaining spans upon closing in: " + + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + + ". Left Spans could be dropped."); + } + } catch (InterruptedException e1) { + LOG.warn("Thread interrupted when terminating executor.", e1); + } + } + + @Override + public void receiveSpan(Span span) { + if (running.get()) { + try { + this.queue.add(span); + } catch (IllegalStateException e) { + LOG.error("Error trying to append span (" + + span.getDescription() + + ") to the queue. Blocking Queue was full."); + } + } + } +} diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java new file mode 100644 index 0000000..5175be5 --- /dev/null +++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.Server; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcTestUtils; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.flume.source.avro.AvroSourceProtocol; +import org.apache.flume.source.avro.Status; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceCreator; +import org.apache.htrace.impl.FlumeSpanReceiver.SerializationFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFlumeSpanReceiver { + private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class); + + private static final String ROOT_SPAN_DESC = "ROOT"; + + private SpanReceiver spanReceiver; + private Server flumeServer; + private TraceCreator traceCreator; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + public ArrayList addSimpleTraces() throws FlumeException, + EventDeliveryException, IOException { + ArrayList spans = new ArrayList(); + Span rootSpan = new MilliSpan(ROOT_SPAN_DESC, 1, Span.ROOT_SPAN_ID, 100, "test"); + Span innerOne = rootSpan.child("Some good work"); + Span innerTwo = innerOne.child("Some more good work"); + innerTwo.stop(); + spans.add(innerTwo); + innerOne.stop(); + spans.add(innerOne); + rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes()); + rootSpan.addTimelineAnnotation("timeline"); + rootSpan.stop(); + spans.add(rootSpan); + return spans; + } + + @Test + public void testAvroConverter() throws FlumeException, + EventDeliveryException, IOException { + AvroHandler avroHandler = null; + ArrayList spans = null; + try { + avroHandler = new AvroHandler(); + startReceiver(SerializationFormat.Avro, null, avroHandler); + spans = addSimpleTraces(); + } finally { + stopReceiver(); + } + List events = avroHandler.getAllEvents(); + verifySpans(spans, events, SerializationFormat.Avro); + } + + @Test + public void testJsonConverter() throws FlumeException, + EventDeliveryException, IOException { + AvroHandler avroHandler = null; + List spans = null; + try { + avroHandler = new AvroHandler(); + startReceiver(SerializationFormat.Json, null, avroHandler); + spans = addSimpleTraces(); + } finally { + stopReceiver(); + } + List events = avroHandler.getAllEvents(); + verifySpans(spans, events, SerializationFormat.Json); + } + + @Test + public void testConcurrency() throws FlumeException, + EventDeliveryException, IOException { + try { + Map extraConf = new HashMap(); + extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5"); + startReceiver(SerializationFormat.Avro, extraConf, new RpcTestUtils.OKAvroHandler()); + traceCreator.createThreadedTrace(); + } finally { + stopReceiver(); + } + } + + @Test + public void testResilience() throws FlumeException, + EventDeliveryException, IOException { + try { + startReceiver(SerializationFormat.Avro, null, new RpcTestUtils.FailedAvroHandler()); + traceCreator.createThreadedTrace(); + } finally { + stopReceiver(); + } + } + + private void startReceiver(SerializationFormat format, + Map extraConf, AvroSourceProtocol avroHandler) { + // Start Flume server + Assert.assertNull(flumeServer); + flumeServer = RpcTestUtils.startServer(avroHandler); + + // Create and configure span receiver + Map conf = new HashMap(); + conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1"); + conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())); + conf.put(FlumeSpanReceiver.FLUME_FORMAT_KEY, format.name()); + if (extraConf != null) { + conf.putAll(extraConf); + } + + spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf)); + + // Create trace creator, it will register our receiver + traceCreator = new TraceCreator(spanReceiver); + } + + private void stopReceiver() throws IOException { + // Close span receiver + if (spanReceiver != null) { + Trace.removeReceiver(spanReceiver); + spanReceiver.close(); + spanReceiver = null; + } + + // Close Flume server + if (flumeServer != null) { + RpcTestUtils.stopServer(flumeServer); + flumeServer = null; + } + } + + private static class AvroHandler implements AvroSourceProtocol { + private ArrayList all_events = new ArrayList(); + + public List getAllEvents() { + return new ArrayList(all_events); + } + + @Override + public Status append(AvroFlumeEvent event) throws AvroRemoteException { + all_events.add(event); + return Status.OK; + } + + @Override + public Status appendBatch(List events) throws + AvroRemoteException { + all_events.addAll(events); + return Status.OK; + } + } + + private void verifySpanEqual(Span s1, Span s2) { + Assert.assertEquals(s1.getTraceId(), s2.getTraceId()); + Assert.assertEquals(s1.getParentId(), s2.getParentId()); + Assert.assertEquals(s1.getSpanId(), s2.getSpanId()); + Assert.assertEquals(s1.getProcessId(), s2.getProcessId()); + Assert.assertEquals(s1.getDescription(), s2.getDescription()); + Assert.assertEquals(s1.getKVAnnotations().size(), s2.getKVAnnotations().size()); + + // Cannot verify these fields because they cannot be deserialized. See AvroConverter.deserialize. + // Assert.assertEquals(s1.getStartTimeMillis(), s2.getStartTimeMillis()); + // Assert.assertEquals(s1.getStopTimeMillis(), s2.getStopTimeMillis()); + // Assert.assertEquals(s1.getTimelineAnnotations().size(), s2.getTimelineAnnotations().size()); + } + + private void verifySpans(List spans, List events, SerializationFormat format) throws IOException { + Assert.assertEquals(spans.size(), events.size()); + for (int i = 0; i < spans.size(); i ++) { + Span s1 = AvroConverter.deserialize(events.get(i).getBody().array(), format); + verifySpanEqual(spans.get(i), s1); + } + } +} diff --git a/pom.xml b/pom.xml index b14c6b8..ee3135e 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ language governing permissions and limitations under the License. --> htrace-core htrace-zipkin htrace-hbase + htrace-flume