diff --git a/htrace-hbase/pom.xml b/htrace-hbase/pom.xml index fc8b188..ce7da56 100644 --- a/htrace-hbase/pom.xml +++ b/htrace-hbase/pom.xml @@ -31,7 +31,7 @@ language governing permissions and limitations under the License. --> UTF-8 - 0.99.2 + 1.1.2 9.2.13.v20150730 true @@ -148,6 +148,7 @@ language governing permissions and limitations under the License. --> commons-logging commons-logging + provided junit diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java index aa471ab..f2537f9 100644 --- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java +++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java @@ -35,10 +35,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.HBaseHTraceConfiguration; import org.apache.htrace.core.HTraceConfiguration; @@ -119,7 +119,6 @@ public class HBaseSpanReceiver extends SpanReceiver { }; private ExecutorService service; - private final HTraceConfiguration conf; private final Configuration hconf; private final byte[] table; private final byte[] cf; @@ -128,7 +127,6 @@ public class HBaseSpanReceiver extends SpanReceiver { public HBaseSpanReceiver(HTraceConfiguration conf) { this.queue = new ArrayBlockingQueue(1000); - this.conf = conf; this.hconf = HBaseConfiguration.create(); this.table = Bytes.toBytes(conf.get(TABLE_KEY, DEFAULT_TABLE)); this.cf = Bytes.toBytes(conf.get(COLUMNFAMILY_KEY, DEFAULT_COLUMNFAMILY)); @@ -156,7 +154,7 @@ public class HBaseSpanReceiver extends SpanReceiver { private class WriteSpanRunnable implements Runnable { private Connection hconnection; - private Table htable; + private BufferedMutator mutator; public WriteSpanRunnable() { } @@ -194,14 +192,15 @@ public class HBaseSpanReceiver extends SpanReceiver { startClient(); if (dequeuedSpans.isEmpty()) { try { - this.htable.flushCommits(); + this.mutator.flush(); } catch (IOException e) { - LOG.error("failed to flush writes to HBase."); + LOG.error("Failed to flush writes to HBase."); closeClient(); } continue; } + try { for (Span span : dequeuedSpans) { sbuilder.clear() @@ -229,18 +228,14 @@ public class HBaseSpanReceiver extends SpanReceiver { .build()); } Put put = new Put(Bytes.toBytes(span.getSpanId().getHigh())); - put.add(HBaseSpanReceiver.this.cf, - sbuilder.build().toByteArray(), - null); + put.addColumn(HBaseSpanReceiver.this.cf, sbuilder.build().toByteArray(), null); if (span.getParents().length == 0) { - put.add(HBaseSpanReceiver.this.icf, - INDEX_TIME_QUAL, - Bytes.toBytes(span.getStartTimeMillis())); - put.add(HBaseSpanReceiver.this.icf, - INDEX_SPAN_QUAL, - sbuilder.build().toByteArray()); + put.addColumn(HBaseSpanReceiver.this.icf, INDEX_TIME_QUAL, + Bytes.toBytes(span.getStartTimeMillis())); + put.addColumn(HBaseSpanReceiver.this.icf, INDEX_SPAN_QUAL, + sbuilder.build().toByteArray()); } - this.htable.put(put); + this.mutator.mutate(put); } // clear the list for the next time through. dequeuedSpans.clear(); @@ -276,9 +271,9 @@ public class HBaseSpanReceiver extends SpanReceiver { private void closeClient() { // close out the transport. try { - if (this.htable != null) { - this.htable.close(); - this.htable = null; + if (this.mutator != null) { + this.mutator.close(); + this.mutator = null; } if (this.hconnection != null) { this.hconnection.close(); @@ -293,10 +288,10 @@ public class HBaseSpanReceiver extends SpanReceiver { * Re-connect to HBase */ private void startClient() { - if (this.htable == null) { + if (this.mutator == null) { try { hconnection = ConnectionFactory.createConnection(hconf); - htable = hconnection.getTable(TableName.valueOf(table)); + mutator = hconnection.getBufferedMutator(TableName.valueOf(table)); } catch (IOException e) { LOG.warn("Failed to create HBase connection. " + e.getMessage()); }