diff --git a/htrace-hbase/pom.xml b/htrace-hbase/pom.xml index 773212e..3af11b2 100644 --- a/htrace-hbase/pom.xml +++ b/htrace-hbase/pom.xml @@ -31,10 +31,8 @@ language governing permissions and limitations under the License. --> UTF-8 - 0.99.2 - - 2.4.0 - + 1.0.0 + 2.5.1 2.5.0 @@ -154,6 +152,17 @@ language governing permissions and limitations under the License. --> ${hbase.version} test + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.htrace + htrace-core + + + diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java index 2faf4bb..6ac1cbb 100644 --- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java +++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java @@ -160,6 +160,9 @@ public class HBaseSpanReceiver implements SpanReceiver { private class WriteSpanRunnable implements Runnable { private Connection hconnection; private Table htable; + private SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder(); + private SpanProtos.TimelineAnnotation.Builder tlbuilder = + SpanProtos.TimelineAnnotation.newBuilder(); public WriteSpanRunnable() { } @@ -169,10 +172,8 @@ public class HBaseSpanReceiver implements SpanReceiver { */ @Override public void run() { - SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder(); - SpanProtos.TimelineAnnotation.Builder tlbuilder = - SpanProtos.TimelineAnnotation.newBuilder(); List dequeuedSpans = new ArrayList(maxSpanBatchSize); + List puts = new ArrayList(maxSpanBatchSize); long errorCount = 0; while (running.get() || queue.size() > 0) { @@ -191,62 +192,15 @@ public class HBaseSpanReceiver implements SpanReceiver { // Try and get up to 100 queues queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1); } + for (Span span : dequeuedSpans) { + puts.add(createPut(span)); + } } catch (InterruptedException ie) { // Ignored. } startClient(); - if (dequeuedSpans.isEmpty()) { - try { - this.htable.flushCommits(); - } catch (IOException e) { - LOG.error("failed to flush writes to HBase."); - closeClient(); - } - continue; - } - try { - for (Span span : dequeuedSpans) { - sbuilder.clear() - .setTraceId(span.getTraceId()) - .setStart(span.getStartTimeMillis()) - .setStop(span.getStopTimeMillis()) - .setSpanId(span.getSpanId()) - .setProcessId(span.getProcessId()) - .setDescription(span.getDescription()); - - if (span.getParents().length == 0) { - sbuilder.setParentId(0); - } else if (span.getParents().length > 0) { - sbuilder.setParentId(span.getParents()[0]); - if (span.getParents().length > 1) { - LOG.error("error: HBaseSpanReceiver does not support spans " + - "with multiple parents. Ignoring multiple parents for " + - span); - } - } - for (TimelineAnnotation ta : span.getTimelineAnnotations()) { - sbuilder.addTimeline(tlbuilder.clear() - .setTime(ta.getTime()) - .setMessage(ta.getMessage()) - .build()); - } - Put put = new Put(Bytes.toBytes(span.getTraceId())); - put.add(HBaseSpanReceiver.this.cf, - sbuilder.build().toByteArray(), - null); - if (span.getParents().length == 0) { - put.add(HBaseSpanReceiver.this.icf, - INDEX_TIME_QUAL, - Bytes.toBytes(span.getStartTimeMillis())); - put.add(HBaseSpanReceiver.this.icf, - INDEX_SPAN_QUAL, - sbuilder.build().toByteArray()); - } - this.htable.put(put); - } - // clear the list for the next time through. - dequeuedSpans.clear(); + this.htable.put(puts); // reset the error counter. errorCount = 0; } catch (Exception e) { @@ -268,6 +222,10 @@ public class HBaseSpanReceiver implements SpanReceiver { } catch (InterruptedException e1) { // Ignored } + } finally { + // clear the list for the next time through. + dequeuedSpans.clear(); + puts.clear(); } } closeClient(); @@ -305,6 +263,45 @@ public class HBaseSpanReceiver implements SpanReceiver { } } } + + private Put createPut(Span span) { + sbuilder.clear() + .setTraceId(span.getTraceId()) + .setStart(span.getStartTimeMillis()) + .setStop(span.getStopTimeMillis()) + .setSpanId(span.getSpanId()) + .setProcessId(span.getProcessId()) + .setDescription(span.getDescription()); + if (span.getParents().length == 0) { + sbuilder.setParentId(0); + } else if (span.getParents().length > 0) { + sbuilder.setParentId(span.getParents()[0]); + if (span.getParents().length > 1) { + LOG.error("error: HBaseSpanReceiver does not support spans " + + "with multiple parents. Ignoring multiple parents for " + + span); + } + } + for (TimelineAnnotation ta : span.getTimelineAnnotations()) { + sbuilder.addTimeline(tlbuilder.clear() + .setTime(ta.getTime()) + .setMessage(ta.getMessage()) + .build()); + } + Put put = new Put(Bytes.toBytes(span.getTraceId())); + put.add(HBaseSpanReceiver.this.cf, + sbuilder.build().toByteArray(), + null); + if (span.getParents().length == 0) { + put.add(HBaseSpanReceiver.this.icf, + INDEX_TIME_QUAL, + Bytes.toBytes(span.getStartTimeMillis())); + put.add(HBaseSpanReceiver.this.icf, + INDEX_SPAN_QUAL, + sbuilder.build().toByteArray()); + } + return put; + } } /**