diff --git a/htrace-hbase/pom.xml b/htrace-hbase/pom.xml
index 773212e..3af11b2 100644
--- a/htrace-hbase/pom.xml
+++ b/htrace-hbase/pom.xml
@@ -31,10 +31,8 @@ language governing permissions and limitations under the License. -->
UTF-8
- 0.99.2
-
- 2.4.0
-
+ 1.0.0
+ 2.5.1
2.5.0
@@ -154,6 +152,17 @@ language governing permissions and limitations under the License. -->
${hbase.version}
test
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.apache.htrace
+ htrace-core
+
+
+
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
index 2faf4bb..6ac1cbb 100644
--- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
+++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
@@ -160,6 +160,9 @@ public class HBaseSpanReceiver implements SpanReceiver {
private class WriteSpanRunnable implements Runnable {
private Connection hconnection;
private Table htable;
+ private SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
+ private SpanProtos.TimelineAnnotation.Builder tlbuilder =
+ SpanProtos.TimelineAnnotation.newBuilder();
public WriteSpanRunnable() {
}
@@ -169,10 +172,8 @@ public class HBaseSpanReceiver implements SpanReceiver {
*/
@Override
public void run() {
- SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
- SpanProtos.TimelineAnnotation.Builder tlbuilder =
- SpanProtos.TimelineAnnotation.newBuilder();
List dequeuedSpans = new ArrayList(maxSpanBatchSize);
+ List puts = new ArrayList(maxSpanBatchSize);
long errorCount = 0;
while (running.get() || queue.size() > 0) {
@@ -191,62 +192,15 @@ public class HBaseSpanReceiver implements SpanReceiver {
// Try and get up to 100 queues
queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
}
+ for (Span span : dequeuedSpans) {
+ puts.add(createPut(span));
+ }
} catch (InterruptedException ie) {
// Ignored.
}
startClient();
- if (dequeuedSpans.isEmpty()) {
- try {
- this.htable.flushCommits();
- } catch (IOException e) {
- LOG.error("failed to flush writes to HBase.");
- closeClient();
- }
- continue;
- }
-
try {
- for (Span span : dequeuedSpans) {
- sbuilder.clear()
- .setTraceId(span.getTraceId())
- .setStart(span.getStartTimeMillis())
- .setStop(span.getStopTimeMillis())
- .setSpanId(span.getSpanId())
- .setProcessId(span.getProcessId())
- .setDescription(span.getDescription());
-
- if (span.getParents().length == 0) {
- sbuilder.setParentId(0);
- } else if (span.getParents().length > 0) {
- sbuilder.setParentId(span.getParents()[0]);
- if (span.getParents().length > 1) {
- LOG.error("error: HBaseSpanReceiver does not support spans " +
- "with multiple parents. Ignoring multiple parents for " +
- span);
- }
- }
- for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
- sbuilder.addTimeline(tlbuilder.clear()
- .setTime(ta.getTime())
- .setMessage(ta.getMessage())
- .build());
- }
- Put put = new Put(Bytes.toBytes(span.getTraceId()));
- put.add(HBaseSpanReceiver.this.cf,
- sbuilder.build().toByteArray(),
- null);
- if (span.getParents().length == 0) {
- put.add(HBaseSpanReceiver.this.icf,
- INDEX_TIME_QUAL,
- Bytes.toBytes(span.getStartTimeMillis()));
- put.add(HBaseSpanReceiver.this.icf,
- INDEX_SPAN_QUAL,
- sbuilder.build().toByteArray());
- }
- this.htable.put(put);
- }
- // clear the list for the next time through.
- dequeuedSpans.clear();
+ this.htable.put(puts);
// reset the error counter.
errorCount = 0;
} catch (Exception e) {
@@ -268,6 +222,10 @@ public class HBaseSpanReceiver implements SpanReceiver {
} catch (InterruptedException e1) {
// Ignored
}
+ } finally {
+ // clear the list for the next time through.
+ dequeuedSpans.clear();
+ puts.clear();
}
}
closeClient();
@@ -305,6 +263,45 @@ public class HBaseSpanReceiver implements SpanReceiver {
}
}
}
+
+ private Put createPut(Span span) {
+ sbuilder.clear()
+ .setTraceId(span.getTraceId())
+ .setStart(span.getStartTimeMillis())
+ .setStop(span.getStopTimeMillis())
+ .setSpanId(span.getSpanId())
+ .setProcessId(span.getProcessId())
+ .setDescription(span.getDescription());
+ if (span.getParents().length == 0) {
+ sbuilder.setParentId(0);
+ } else if (span.getParents().length > 0) {
+ sbuilder.setParentId(span.getParents()[0]);
+ if (span.getParents().length > 1) {
+ LOG.error("error: HBaseSpanReceiver does not support spans " +
+ "with multiple parents. Ignoring multiple parents for " +
+ span);
+ }
+ }
+ for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+ sbuilder.addTimeline(tlbuilder.clear()
+ .setTime(ta.getTime())
+ .setMessage(ta.getMessage())
+ .build());
+ }
+ Put put = new Put(Bytes.toBytes(span.getTraceId()));
+ put.add(HBaseSpanReceiver.this.cf,
+ sbuilder.build().toByteArray(),
+ null);
+ if (span.getParents().length == 0) {
+ put.add(HBaseSpanReceiver.this.icf,
+ INDEX_TIME_QUAL,
+ Bytes.toBytes(span.getStartTimeMillis()));
+ put.add(HBaseSpanReceiver.this.icf,
+ INDEX_SPAN_QUAL,
+ sbuilder.build().toByteArray());
+ }
+ return put;
+ }
}
/**