diff --git a/htrace-hbase/pom.xml b/htrace-hbase/pom.xml
index fc8b188..ce7da56 100644
--- a/htrace-hbase/pom.xml
+++ b/htrace-hbase/pom.xml
@@ -31,7 +31,7 @@ language governing permissions and limitations under the License. -->
UTF-8
- 0.99.2
+ 1.1.2
9.2.13.v20150730
true
@@ -148,6 +148,7 @@ language governing permissions and limitations under the License. -->
commons-logging
commons-logging
+ provided
junit
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
index aa471ab..f2537f9 100644
--- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
+++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java
@@ -35,10 +35,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.HBaseHTraceConfiguration;
import org.apache.htrace.core.HTraceConfiguration;
@@ -119,7 +119,6 @@ public class HBaseSpanReceiver extends SpanReceiver {
};
private ExecutorService service;
- private final HTraceConfiguration conf;
private final Configuration hconf;
private final byte[] table;
private final byte[] cf;
@@ -128,7 +127,6 @@ public class HBaseSpanReceiver extends SpanReceiver {
public HBaseSpanReceiver(HTraceConfiguration conf) {
this.queue = new ArrayBlockingQueue(1000);
- this.conf = conf;
this.hconf = HBaseConfiguration.create();
this.table = Bytes.toBytes(conf.get(TABLE_KEY, DEFAULT_TABLE));
this.cf = Bytes.toBytes(conf.get(COLUMNFAMILY_KEY, DEFAULT_COLUMNFAMILY));
@@ -156,7 +154,7 @@ public class HBaseSpanReceiver extends SpanReceiver {
private class WriteSpanRunnable implements Runnable {
private Connection hconnection;
- private Table htable;
+ private BufferedMutator mutator;
public WriteSpanRunnable() {
}
@@ -194,14 +192,15 @@ public class HBaseSpanReceiver extends SpanReceiver {
startClient();
if (dequeuedSpans.isEmpty()) {
try {
- this.htable.flushCommits();
+ this.mutator.flush();
} catch (IOException e) {
- LOG.error("failed to flush writes to HBase.");
+ LOG.error("Failed to flush writes to HBase.");
closeClient();
}
continue;
}
+
try {
for (Span span : dequeuedSpans) {
sbuilder.clear()
@@ -229,18 +228,14 @@ public class HBaseSpanReceiver extends SpanReceiver {
.build());
}
Put put = new Put(Bytes.toBytes(span.getSpanId().getHigh()));
- put.add(HBaseSpanReceiver.this.cf,
- sbuilder.build().toByteArray(),
- null);
+ put.addColumn(HBaseSpanReceiver.this.cf, sbuilder.build().toByteArray(), null);
if (span.getParents().length == 0) {
- put.add(HBaseSpanReceiver.this.icf,
- INDEX_TIME_QUAL,
- Bytes.toBytes(span.getStartTimeMillis()));
- put.add(HBaseSpanReceiver.this.icf,
- INDEX_SPAN_QUAL,
- sbuilder.build().toByteArray());
+ put.addColumn(HBaseSpanReceiver.this.icf, INDEX_TIME_QUAL,
+ Bytes.toBytes(span.getStartTimeMillis()));
+ put.addColumn(HBaseSpanReceiver.this.icf, INDEX_SPAN_QUAL,
+ sbuilder.build().toByteArray());
}
- this.htable.put(put);
+ this.mutator.mutate(put);
}
// clear the list for the next time through.
dequeuedSpans.clear();
@@ -276,9 +271,9 @@ public class HBaseSpanReceiver extends SpanReceiver {
private void closeClient() {
// close out the transport.
try {
- if (this.htable != null) {
- this.htable.close();
- this.htable = null;
+ if (this.mutator != null) {
+ this.mutator.close();
+ this.mutator = null;
}
if (this.hconnection != null) {
this.hconnection.close();
@@ -293,10 +288,10 @@ public class HBaseSpanReceiver extends SpanReceiver {
* Re-connect to HBase
*/
private void startClient() {
- if (this.htable == null) {
+ if (this.mutator == null) {
try {
hconnection = ConnectionFactory.createConnection(hconf);
- htable = hconnection.getTable(TableName.valueOf(table));
+ mutator = hconnection.getBufferedMutator(TableName.valueOf(table));
} catch (IOException e) {
LOG.warn("Failed to create HBase connection. " + e.getMessage());
}