From 4605341ab2918c8e89ef5ecec2377b834549bc3e Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Tue, 30 Jul 2013 11:16:51 -0700
Subject: [PATCH] Update HTrace to 2.0 and add more hooks

---
 hbase-client/pom.xml                               |   2 +-
 .../apache/hadoop/hbase/client/AsyncProcess.java   |   7 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java     |   2 +-
 .../hbase/zookeeper/RecoverableZooKeeper.java      | 436 +++++++++++---------
 .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java  |   1 +
 hbase-it/pom.xml                                   |   6 +-
 .../hadoop/hbase/mttr/IntegrationTestMTTR.java     |  12 +-
 .../trace/IntegrationTestSendTraceRequests.java    | 272 ++++++++++++
 hbase-server/pom.xml                               |   6 +-
 .../apache/hadoop/hbase/executor/EventHandler.java |   9 +-
 .../hadoop/hbase/io/hfile/HFileReaderV2.java       |   9 +-
 .../apache/hadoop/hbase/ipc/RequestContext.java    |  12 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  20 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |   3 +-
 .../hbase/master/handler/DisableTableHandler.java  |   2 +-
 .../hbase/master/handler/EnableTableHandler.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   2 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   8 +
 .../hadoop/hbase/regionserver/MemStoreFlusher.java |   7 +
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  61 +--
 .../hbase/trace/HBaseHTraceConfiguration.java      |  30 ++
 .../hbase/trace/HBaseLocalFileSpanReceiver.java    |  89 ----
 .../hadoop/hbase/trace/SpanReceiverHost.java       |  56 ++-
 .../apache/hadoop/hbase/trace/TestHTraceHooks.java |  18 +-
 pom.xml                                            |   9 +-
 25 files changed, 727 insertions(+), 354 deletions(-)
 create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseLocalFileSpanReceiver.java

diff --git hbase-client/pom.xml hbase-client/pom.xml
index 3ddf2ae..ca23654 100644
--- hbase-client/pom.xml
+++ hbase-client/pom.xml
@@ -120,7 +120,7 @@
     </dependency>
     <dependency>
       <groupId>org.cloudera.htrace</groupId>
-      <artifactId>htrace</artifactId>
+      <artifactId>htrace-core</artifactId>
     </dependency>
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index d70a65d..bc53f81 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -29,9 +29,12 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.protobuf.generated.Tracing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -407,7 +410,7 @@ class AsyncProcess {
 
     incTaskCounters(regionName);
 
-    Runnable runnable = new Runnable() {
+    Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
       @Override
       public void run() {
         MultiResponse res;
@@ -427,7 +430,7 @@ class AsyncProcess {
           decTaskCounters(regionName);
         }
       }
-    };
+    });
 
     try {
       this.pool.submit(runnable);
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 9a64780..0410583 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -993,7 +993,7 @@ public class RpcClient {
       RequestHeader.Builder builder = RequestHeader.newBuilder();
       builder.setCallId(call.id);
       if (Trace.isTracing()) {
-        Span s = Trace.currentTrace();
+        Span s = Trace.currentSpan();
         builder.setTraceInfo(RPCTInfo.newBuilder().
             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
       }
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 8ddac6e..37890d1 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -38,6 +38,8 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -146,36 +148,42 @@ public class RecoverableZooKeeper {
    */
   public void delete(String path, int version)
   throws InterruptedException, KeeperException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    boolean isRetry = false; // False for first attempt, true for all retries.
-    while (true) {
-      try {
-        zk.delete(path, version);
-        return;
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case NONODE:
-            if (isRetry) {
-              LOG.info("Node " + path + " already deleted. Assuming a " +
-                  "previous attempt succeeded.");
-              return;
-            }
-            LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.delete");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      boolean isRetry = false; // False for first attempt, true for all retries.
+      while (true) {
+        try {
+          zk.delete(path, version);
+          return;
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case NONODE:
+              if (isRetry) {
+                LOG.info("Node " + path + " already deleted. Assuming a " +
+                    "previous attempt succeeded.");
+                return;
+              }
+              LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
+              throw e;
 
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "delete");
-            break;
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "delete");
+              break;
 
-          default:
-            throw e;
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
+        isRetry = true;
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
-      isRetry = true;
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -185,24 +193,30 @@ public class RecoverableZooKeeper {
    */
   public Stat exists(String path, Watcher watcher)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        return zk.exists(path, watcher);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "exists");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.exists");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return zk.exists(path, watcher);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -212,24 +226,30 @@ public class RecoverableZooKeeper {
    */
   public Stat exists(String path, boolean watch)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        return zk.exists(path, watch);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "exists");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.exists");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return zk.exists(path, watch);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -249,24 +269,30 @@ public class RecoverableZooKeeper {
    */
   public List<String> getChildren(String path, Watcher watcher)
     throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        return zk.getChildren(path, watcher);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "getChildren");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return zk.getChildren(path, watcher);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -276,24 +302,30 @@ public class RecoverableZooKeeper {
    */
   public List<String> getChildren(String path, boolean watch)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        return zk.getChildren(path, watch);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "getChildren");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return zk.getChildren(path, watch);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -303,25 +335,31 @@ public class RecoverableZooKeeper {
    */
   public byte[] getData(String path, Watcher watcher, Stat stat)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        byte[] revData = zk.getData(path, watcher, stat);
-        return this.removeMetaData(revData);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "getData");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.getData");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          byte[] revData = zk.getData(path, watcher, stat);
+          return this.removeMetaData(revData);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -331,25 +369,31 @@ public class RecoverableZooKeeper {
    */
   public byte[] getData(String path, boolean watch, Stat stat)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try {
-        byte[] revData = zk.getData(path, watch, stat);
-        return this.removeMetaData(revData);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "getData");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.getData");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          byte[] revData = zk.getData(path, watch, stat);
+          return this.removeMetaData(revData);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+
+            default:
+              throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -361,42 +405,48 @@ public class RecoverableZooKeeper {
    */
   public Stat setData(String path, byte[] data, int version)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    byte[] newData = appendMetaData(data);
-    boolean isRetry = false;
-    while (true) {
-      try {
-        return zk.setData(path, newData, version);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "setData");
-            break;
-          case BADVERSION:
-            if (isRetry) {
-              // try to verify whether the previous setData success or not
-              try{
-                Stat stat = new Stat();
-                byte[] revData = zk.getData(path, false, stat);
-                if(Bytes.compareTo(revData, newData) == 0) {
-                  // the bad version is caused by previous successful setData
-                  return stat;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.setData");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      byte[] newData = appendMetaData(data);
+      boolean isRetry = false;
+      while (true) {
+        try {
+          return zk.setData(path, newData, version);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "setData");
+              break;
+            case BADVERSION:
+              if (isRetry) {
+                // try to verify whether the previous setData success or not
+                try{
+                  Stat stat = new Stat();
+                  byte[] revData = zk.getData(path, false, stat);
+                  if(Bytes.compareTo(revData, newData) == 0) {
+                    // the bad version is caused by previous successful setData
+                    return stat;
+                  }
+                } catch(KeeperException keeperException){
+                  // the ZK is not reliable at this moment. just throwing exception
+                  throw keeperException;
+                }
               }
-            } catch(KeeperException keeperException){
-              // the ZK is not reliable at this moment. just throwing exception
-              throw keeperException;
-            }
-          }
-          // throw other exceptions and verified bad version exceptions
-          default:
-            throw e;
+              // throw other exceptions and verified bad version exceptions
+              default:
+                throw e;
+          }
         }
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
+        isRetry = true;
       }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
-      isRetry = true;
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -418,19 +468,25 @@ public class RecoverableZooKeeper {
   public String create(String path, byte[] data, List<ACL> acl,
       CreateMode createMode)
   throws KeeperException, InterruptedException {
-    byte[] newData = appendMetaData(data);
-    switch (createMode) {
-      case EPHEMERAL:
-      case PERSISTENT:
-        return createNonSequential(path, newData, acl, createMode);
-
-      case EPHEMERAL_SEQUENTIAL:
-      case PERSISTENT_SEQUENTIAL:
-        return createSequential(path, newData, acl, createMode);
-
-      default:
-        throw new IllegalArgumentException("Unrecognized CreateMode: " +
-            createMode);
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.create");
+      byte[] newData = appendMetaData(data);
+      switch (createMode) {
+        case EPHEMERAL:
+        case PERSISTENT:
+          return createNonSequential(path, newData, acl, createMode);
+
+        case EPHEMERAL_SEQUENTIAL:
+        case PERSISTENT_SEQUENTIAL:
+          return createSequential(path, newData, acl, createMode);
+
+        default:
+          throw new IllegalArgumentException("Unrecognized CreateMode: " +
+              createMode);
+      }
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
@@ -545,25 +601,31 @@ public class RecoverableZooKeeper {
    */
   public List<OpResult> multi(Iterable<Op> ops)
   throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    Iterable<Op> multiOps = prepareZKMulti(ops);
-    while (true) {
-      try {
-        return zk.multi(multiOps);
-      } catch (KeeperException e) {
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-          case SESSIONEXPIRED:
-          case OPERATIONTIMEOUT:
-            retryOrThrow(retryCounter, e, "multi");
-            break;
-
-          default:
-            throw e;
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.multi");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      Iterable<Op> multiOps = prepareZKMulti(ops);
+      while (true) {
+        try {
+          return zk.multi(multiOps);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "multi");
+              break;
+
+            default:
+              throw e;
+          }
         }
-      }
-      retryCounter.sleepUntilNextRetry();
-      retryCounter.useRetry();
+        retryCounter.sleepUntilNextRetry();
+        retryCounter.useRetry();
+      }
+    } finally {
+      if (traceScope != null) traceScope.close();
     }
   }
 
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index de3eedd..4a46ffc 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1548,6 +1548,7 @@ public class ZKUtil {
       // run sequentially
       processSequentially(zkw, ops);
     }
+
   }
 
   private static void processSequentially(ZooKeeperWatcher zkw, List<ZKUtilOp> ops)
diff --git hbase-it/pom.xml hbase-it/pom.xml
index 8a535d5..52a97c1 100644
--- hbase-it/pom.xml
+++ hbase-it/pom.xml
@@ -210,7 +210,11 @@
     </dependency>
     <dependency>
       <groupId>org.cloudera.htrace</groupId>
-      <artifactId>htrace</artifactId>
+      <artifactId>htrace-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.cloudera.htrace</groupId>
+      <artifactId>htrace-zipkin</artifactId>
     </dependency>
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
index a4feca5..da4d36d 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.cloudera.htrace.impl.AlwaysSampler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -361,9 +363,9 @@ public class IntegrationTestMTTR {
       // Keep trying until the rs is back up and we've gotten a put through
       while (numAfterDone < 10) {
         long start = System.nanoTime();
-        Span span = null;
+        TraceScope scope = null;
         try {
-          span = Trace.startSpan(getSpanName(), Sampler.ALWAYS);
+          scope = Trace.startSpan(getSpanName(), AlwaysSampler.INSTANCE);
           boolean actionResult = doAction();
           if (actionResult && future.isDone()) {
             numAfterDone ++;
@@ -371,11 +373,11 @@ public class IntegrationTestMTTR {
         } catch (Exception e) {
           numAfterDone = 0;
         } finally {
-          if (span != null) {
-            span.stop();
+          if (scope != null) {
+            scope.close();
           }
         }
-        result.addResult(System.nanoTime() - start, span);
+        result.addResult(System.nanoTime() - start, scope.getSpan());
       }
       return result;
     }
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
new file mode 100644
index 0000000..f02bfa8
--- /dev/null
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
@@ -0,0 +1,272 @@
+package org.apache.hadoop.hbase.trace;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@Category(IntegrationTests.class)
+public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
+
+  public static final String TABLE_ARG = "t";
+  public static final String CF_ARG = "f";
+
+  public static final String TABLE_NAME_DEFAULT = "SendTracesTable";
+  public static final String COLUMN_FAMILY_DEFAULT = "D";
+  private String tableName = TABLE_NAME_DEFAULT;
+  private String familyName = COLUMN_FAMILY_DEFAULT;
+  private IntegrationTestingUtility util;
+  private Random random = new Random();
+  private HBaseAdmin admin;
+  private SpanReceiverHost receiverHost;
+
+  public static void main(String[] args) throws Exception {
+    Configuration configuration = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(configuration);
+    IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests();
+    ToolRunner.run(configuration, tool, args);
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already.");
+    addOptWithArg(CF_ARG, "The family to target");
+  }
+
+  @Override
+  public void processOptions(CommandLine cmd) {
+    String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT);
+    String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT);
+
+    this.tableName = tableNameString;
+    this.familyName = familyString;
+  }
+
+  @Override
+  public int doWork() throws Exception {
+    internalDoWork();
+    return 0;
+  }
+
+  @Test
+  public void internalDoWork() throws Exception {
+    util = createUtil();
+    admin = util.getHBaseAdmin();
+    setupReceiver();
+
+    deleteTable();
+    createTable();
+    LinkedBlockingQueue<Long> rks = insertData();
+
+    ExecutorService service = Executors.newFixedThreadPool(20);
+    doScans(service, rks);
+    doGets(service, rks);
+
+    service.shutdown();
+    service.awaitTermination(100, TimeUnit.SECONDS);
+    Thread.sleep(90000);
+    receiverHost.closeReceivers();
+    util.restoreCluster();
+    util = null;
+  }
+
+  private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
+
+    for (int i = 0; i < 100; i++) {
+      Runnable runnable = new Runnable() {
+        private TraceScope innerScope = null;
+        private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
+        @Override
+        public void run() {
+          ResultScanner rs = null;
+          try {
+            innerScope = Trace.startSpan("Scan", Sampler.ALWAYS);
+            HTable ht = new HTable(util.getConfiguration(), tableName);
+            Scan s = new Scan();
+            s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
+            s.setBatch(7);
+            rs = ht.getScanner(s);
+            // Something to keep the jvm from removing the loop.
+            long accum = 0;
+
+            for(int x = 0; x < 1000; x++) {
+              Result r = rs.next();
+              accum |= Bytes.toLong(r.getRow());
+            }
+
+            innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum);
+
+            ht.close();
+            ht = null;
+          } catch (IOException e) {
+            e.printStackTrace();
+
+            innerScope.getSpan().addKVAnnotation(
+                Bytes.toBytes("exception"),
+                Bytes.toBytes(e.getClass().getSimpleName()));
+
+          } catch (Exception e) {
+          } finally {
+            if (innerScope != null) innerScope.close();
+            if (rs != null) rs.close();
+          }
+
+        }
+      };
+      service.submit(runnable);
+    }
+
+  }
+
+  private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)
+      throws IOException {
+    for (int i = 0; i < 100; i++) {
+      Runnable runnable = new Runnable() {
+        private TraceScope innerScope = null;
+        private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys;
+
+        @Override
+        public void run() {
+
+
+          HTable ht = null;
+          try {
+            ht = new HTable(util.getConfiguration(), tableName);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+
+          long accum = 0;
+          for (int x = 0; x < 5; x++) {
+            try {
+              innerScope = Trace.startSpan("gets", Sampler.ALWAYS);
+              long rk = rowKeyQueue.take();
+              Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
+              if (r1 != null) {
+                accum |= Bytes.toLong(r1.getRow());
+              }
+              Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
+              if (r2 != null) {
+                accum |= Bytes.toLong(r2.getRow());
+              }
+              innerScope.getSpan().addTimelineAnnotation("Accum = " + accum);
+
+            } catch (IOException e) {
+              // IGNORED
+            } catch (InterruptedException ie) {
+              // IGNORED
+            } finally {
+              if (innerScope != null) innerScope.close();
+            }
+          }
+
+        }
+      };
+      service.submit(runnable);
+    }
+  }
+
+  private void createTable() throws IOException {
+    TraceScope createScope = null;
+    try {
+      createScope = Trace.startSpan("createTable", Sampler.ALWAYS);
+      util.createTable(tableName, familyName);
+    } finally {
+      if (createScope != null) createScope.close();
+    }
+  }
+
+  private void deleteTable() throws IOException {
+    TraceScope deleteScope = null;
+
+    try {
+      if (admin.tableExists(tableName)) {
+        deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS);
+        util.deleteTable(tableName);
+      }
+    } finally {
+      if (deleteScope != null) deleteScope.close();
+    }
+  }
+
+  private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
+    LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
+    HTable ht = new HTable(util.getConfiguration(), this.tableName);
+    byte[] value = new byte[300];
+    for (int x = 0; x < 5000; x++) {
+      TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
+      try {
+        ht.setAutoFlush(false);
+        for (int i = 0; i < 5; i++) {
+          long rk = random.nextLong();
+          rowKeys.add(rk);
+          Put p = new Put(Bytes.toBytes(rk));
+          for (int y = 0; y < 10; y++) {
+            random.nextBytes(value);
+            p.add(Bytes.toBytes(familyName),
+                Bytes.toBytes(random.nextLong()),
+                value);
+          }
+          ht.put(p);
+        }
+        if ((x % 1000) == 0) {
+          admin.flush(Bytes.toBytes(tableName));
+        }
+      } finally {
+        traceScope.close();
+      }
+    }
+    admin.flush(Bytes.toBytes(tableName));
+    return rowKeys;
+  }
+
+  private IntegrationTestingUtility createUtil() throws Exception {
+    Configuration conf = getConf();
+    if (this.util == null) {
+      IntegrationTestingUtility u;
+      if (conf == null) {
+        u = new IntegrationTestingUtility();
+      } else {
+        u = new IntegrationTestingUtility(conf);
+      }
+      util = u;
+      util.initializeCluster(1);
+
+    }
+    return this.util;
+  }
+
+  private void setupReceiver() {
+    Configuration conf = new Configuration(util.getConfiguration());
+    conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
+
+    this.receiverHost = SpanReceiverHost.getInstance(conf);
+  }
+}
diff --git hbase-server/pom.xml hbase-server/pom.xml
index 3d33968..c0742af 100644
--- hbase-server/pom.xml
+++ hbase-server/pom.xml
@@ -476,8 +476,12 @@
     <dependency>
       <groupId>org.cloudera.htrace</groupId>
-      <artifactId>htrace</artifactId>
+      <artifactId>htrace-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.cloudera.htrace</groupId>
+      <artifactId>htrace-zipkin</artifactId>
     </dependency>
     <dependency>
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 559e7e8..076174d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.Server;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.cloudera.htrace.impl.AlwaysSampler;
 
 
 /**
@@ -99,7 +101,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
    * Default base class constructor.
    */
   public EventHandler(Server server, EventType eventType) {
-    this.parent = Trace.currentTrace();
+    this.parent = Trace.currentSpan();
     this.server = server;
     this.eventType = eventType;
     seqid = seqids.incrementAndGet();
@@ -123,8 +125,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
   }
 
   public void run() {
-    Span chunk = Trace.startSpan(Thread.currentThread().getName(), parent,
-        Sampler.ALWAYS);
+    TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
     try {
       if (getListener() != null) getListener().beforeProcess(this);
       process();
@@ -132,7 +133,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
     } catch(Throwable t) {
       LOG.error("Caught throwable while processing event " + eventType, t);
     } finally {
-      chunk.stop();
+      chunk.close();
     }
   }
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 618c024..70e59f6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.io.WritableUtils;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 /**
  * {@link HFile} reader for version 2.
@@ -292,9 +294,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
 
     boolean useLock = false;
     IdLock.Entry lockEntry = null;
+    TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
     try {
       while (true) {
-
         if (useLock) {
           lockEntry = offsetLock.getLockEntry(dataBlockOffset);
         }
@@ -329,7 +331,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
           useLock = true;
           continue;
         }
-
+        if (Trace.isTracing()) {
+          traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
+        }
         // Load block from filesystem.
         long startTimeNs = System.nanoTime();
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
@@ -352,6 +356,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
         return hfileBlock;
       }
     } finally {
+      traceScope.close();
       if (lockEntry != null) {
         offsetLock.releaseLockEntry(lockEntry);
       }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
index 4adad49..49f2084 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.User;
 
 import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.cloudera.htrace.Trace;
 
 import java.net.InetAddress;
 
@@ -97,6 +99,16 @@ public class RequestContext {
     ctx.remoteAddress = remoteAddress;
     ctx.service = service;
     ctx.inRequest = true;
+    if (Trace.isTracing()) {
+      if (user != null) {
+        Trace.currentSpan().addKVAnnotation(Bytes.toBytes("user"), Bytes.toBytes(user.getName()));
+      }
+      if (remoteAddress != null) {
+        Trace.currentSpan().addKVAnnotation(
+            Bytes.toBytes("remoteAddress"),
+            Bytes.toBytes(remoteAddress.getHostAddress()));
+      }
+    }
   }
 
   /**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 842363d..09e854d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -114,10 +114,9 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Sampler;
-import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceInfo;
-import org.cloudera.htrace.impl.NullSpan;
+import org.cloudera.htrace.TraceScope;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.protobuf.BlockingService;
@@ -313,6 +312,14 @@ public class RpcServer implements RpcServerInterface {
       return sb.toString();
     }
 
+    String toTraceString() {
+      String serviceName = this.connection.service != null ?
+          this.connection.service.getDescriptorForType().getName() : "";
+      String methodName = (this.md != null) ? this.md.getName() : "";
+      String result = serviceName + "." + methodName;
+      return result;
+    }
+
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
       this.response = response;
     }
@@ -1779,14 +1786,13 @@ public class RpcServer implements RpcServerInterface {
       String error = null;
       Pair<Message, CellScanner> resultPair = null;
       CurCall.set(call);
-      Span currentRequestSpan = NullSpan.getInstance();
+      TraceScope traceScope = null;
       try {
         if (!started) {
           throw new ServerNotRunningYetException("Server is not running yet");
         }
         if (call.tinfo != null) {
-          currentRequestSpan = Trace.startSpan(
-              "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS);
+          traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
         }
         User user;
         if (call.effectiveUser == null) {
@@ -1811,7 +1817,9 @@ public class RpcServer implements RpcServerInterface {
         errorThrowable = e;
         error = StringUtils.stringifyException(e);
       } finally {
-        currentRequestSpan.stop();
+        if (traceScope != null) {
+          traceScope.close();
+        }
         // Must always clear the request context to avoid leaking
         // credentials between requests.
         RequestContext.clear();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 35aa0ce..1404f80 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -791,8 +791,7 @@ MasterServices, Server {
     status.setStatus("Initializing master coprocessors");
     this.cpHost = new MasterCoprocessorHost(this, this.conf);
 
-    spanReceiverHost = new SpanReceiverHost(getConfiguration());
-    spanReceiverHost.loadSpanReceivers();
+    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
 
     // start up all service threads.
     status.setStatus("Initializing master service threads");
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
index f07669f..78a3c5b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
@@ -210,7 +210,7 @@ public class DisableTableHandler extends EventHandler {
         continue;
       }
       final HRegionInfo hri = region;
-      pool.execute(Trace.wrap(new Runnable() {
+      pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler",new Runnable() {
         public void run() {
           assignmentManager.unassign(hri, true);
         }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
index 8db3161..04177ce 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
@@ -268,7 +268,7 @@ public class EnableTableHandler extends EventHandler {
         continue;
       }
       final HRegionInfo hri = region;
-      pool.execute(Trace.wrap(new Runnable() {
+      pool.execute(Trace.wrap("BulkEnabler.populatePool",new Runnable() {
         public void run() {
           assignmentManager.assign(hri, true);
         }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9e3f7c6..23a9b1e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3400,7 +3400,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
-      // DebugPrint.println("HRegionScanner.<init>");
+      this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 423a1fd..52a4711 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -191,6 +191,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -376,6 +377,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   public static final String REGIONSERVER_CONF = "regionserver_conf";
 
   private MetricsRegionServer metricsRegionServer;
+  private SpanReceiverHost spanReceiverHost;
 
   /*
    * Check for compactions requests.
@@ -1190,6 +1192,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         this.hlog = setupWALAndReplication();
         // Init in here rather than in constructor after thread name has been set
         this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
+
+        spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
+
         startServiceThreads();
         LOG.info("Serving as " + this.serverNameFromMasterPOV +
             ", RpcServer on " + this.isa +
@@ -1794,6 +1799,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());
     }
+    if (this.spanReceiverHost != null) {
+      this.spanReceiverHost.closeReceivers();
+    }
     if (this.hlogRoller != null) {
      Threads.shutdown(this.hlogRoller.getThread());
     }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 1903a4a..ab27815 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 /**
  * Thread that flushes cache on request
@@ -505,7 +507,11 @@ class MemStoreFlusher implements FlushRequester {
    * amount of memstore consumption.
    */
   public void reclaimMemStoreMemory() {
+    TraceScope scope = Trace.startSpan("MemStoreFlusher.reclaimMemStoreMemory");
     if (isAboveHighWaterMark()) {
+      if (Trace.isTracing()) {
+        scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
+      }
       long start = System.currentTimeMillis();
       synchronized (this.blockSignal) {
         boolean blocked = false;
@@ -542,6 +548,7 @@ class MemStoreFlusher implements FlushRequester {
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();
     }
+    scope.close();
   }
 
   @Override
   public String toString() {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 77100c9..8cd86d7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 /**
  * HLog stores all the edits to the HStore.  Its the hbase write-ahead-log
@@ -874,35 +876,40 @@ class FSHLog implements HLog, Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long txid = 0;
-    synchronized (this.updateLock) {
-      long seqNum = obtainSeqNum();
-      // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region (i.e. the first edit added to the particular
-      // memstore). . When the cache is flushed, the entry for the
-      // region being flushed is removed if the sequence number of the flush
-      // is greater than or equal to the value in lastSeqWritten.
-      // Use encoded name.  Its shorter, guaranteed unique and a subset of
-      // actual  name.
-      byte [] encodedRegionName = info.getEncodedNameAsBytes();
-      if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
-      HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
-      doWrite(info, logKey, edits, htd);
-      this.numEntries.incrementAndGet();
-      txid = this.unflushedEntries.incrementAndGet();
-      if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = txid;
+    TraceScope traceScope = Trace.startSpan("FSHlog.append");
+    try {
+      long txid = 0;
+      synchronized (this.updateLock) {
+        long seqNum = obtainSeqNum();
+        // The 'lastSeqWritten' map holds the sequence number of the oldest
+        // write for each region (i.e. the first edit added to the particular
+        // memstore). . When the cache is flushed, the entry for the
+        // region being flushed is removed if the sequence number of the flush
+        // is greater than or equal to the value in lastSeqWritten.
+        // Use encoded name.  Its shorter, guaranteed unique and a subset of
+        // actual  name.
+        byte [] encodedRegionName = info.getEncodedNameAsBytes();
+        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+        HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
+        doWrite(info, logKey, edits, htd);
+        this.numEntries.incrementAndGet();
+        txid = this.unflushedEntries.incrementAndGet();
+        if (htd.isDeferredLogFlush()) {
+          lastDeferredTxid = txid;
+        }
       }
+      // Sync if catalog region, and if not then check if that table supports
+      // deferred log flushing
+      if (doSync &&
+          (info.isMetaRegion() ||
+          !htd.isDeferredLogFlush())) {
+        // sync txn to file system
+        this.sync(txid);
+      }
+      return txid;
+    } finally {
+      traceScope.close();
     }
-    // Sync if catalog region, and if not then check if that table supports
-    // deferred log flushing
-    if (doSync &&
-        (info.isMetaRegion() ||
-        !htd.isDeferredLogFlush())) {
-      // sync txn to file system
-      this.sync(txid);
-    }
-    return txid;
   }
 
   @Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
new file mode 100644
index 0000000..bb24dc2
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hbase.trace;
+
+import org.apache.hadoop.conf.Configuration;
+import org.cloudera.htrace.HTraceConfiguration;
+
+public class HBaseHTraceConfiguration extends HTraceConfiguration {
+
+  public static final String KEY_PREFIX = "hbase.";
+  private Configuration conf;
+
+  public HBaseHTraceConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String get(String key) {
+    return conf.get(KEY_PREFIX + key);
+  }
+
+  @Override
+  public String get(String key, String defaultValue) {
+    return conf.get(KEY_PREFIX + key, defaultValue);
+
+  }
+
+  @Override
+  public boolean getBoolean(String key, boolean defaultValue) {
+    return conf.getBoolean(KEY_PREFIX + key, defaultValue);
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseLocalFileSpanReceiver.java hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseLocalFileSpanReceiver.java
deleted file mode 100644
index c542ad0..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/trace/HBaseLocalFileSpanReceiver.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.trace;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.SpanReceiver;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.impl.LocalFileSpanReceiver;
-
-/**
- * Wraps the LocalFileSpanReceiver provided in
- * org.cloudera.htrace.impl.LocalFileSpanReceiver to read the file name
- * destination for spans from hbase-site.xml.
- *
- * The file path should be added as a property with name
- * "hbase.trace.spanreceiver.localfilespanreceiver.filename".
- */
-public class HBaseLocalFileSpanReceiver implements SpanReceiver, Configurable {
-  public static final Log LOG = LogFactory
-      .getLog(HBaseLocalFileSpanReceiver.class);
-  public static final String FILE_NAME_CONF_KEY = "hbase.trace.spanreceiver.localfilespanreceiver.filename";
-  private Configuration conf;
-  private LocalFileSpanReceiver rcvr;
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration arg0) {
-    this.conf = arg0;
-    // replace rcvr if it was already created
-    if (rcvr != null) {
-      try {
-        rcvr.close();
-      } catch (IOException e) {
-        LOG.warn("Error closing LocalFileSpanReceiver.", e);
-      }
-    }
-    try {
-      rcvr = new LocalFileSpanReceiver(conf.get(FILE_NAME_CONF_KEY));
-    } catch (IOException e) {
-      Trace.removeReceiver(this);
-      rcvr = null;
-      LOG.warn(
-          "Unable to initialize LocalFileSpanReceiver, removing owner (HBaseLocalFileSpanReceiver) from receiver list.",
-          e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try{
-      if (rcvr != null) {
-        rcvr.close();
-      }
-    } finally {
-      rcvr = null;
-    }
-  }
-
-  @Override
-  public void receiveSpan(Span span) {
-    if (rcvr != null) {
-      rcvr.receiveSpan(span);
-    }
-  }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
index 61b7413..fee9426 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.cloudera.htrace.SpanReceiver;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.impl.ZipkinSpanReceiver;
 
 /**
  * This class provides functions for reading the names of SpanReceivers from
@@ -38,8 +39,32 @@ public class SpanReceiverHost {
   private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
   private Collection<SpanReceiver> receivers;
   private Configuration conf;
+  private boolean closed = false;
 
-  public SpanReceiverHost(Configuration conf) {
+  private static enum SingletonHolder {
+    INSTANCE;
+    Object lock = new Object();
+    SpanReceiverHost host = null;
+  }
+
+  public static SpanReceiverHost getInstance(Configuration conf) {
+    if (SingletonHolder.INSTANCE.host != null) {
+      return SingletonHolder.INSTANCE.host;
+    }
+    synchronized (SingletonHolder.INSTANCE.lock) {
+      if (SingletonHolder.INSTANCE.host != null) {
+        return SingletonHolder.INSTANCE.host;
+      }
+
+      SpanReceiverHost host = new SpanReceiverHost(conf);
+      host.loadSpanReceivers();
+      SingletonHolder.INSTANCE.host = host;
+      return SingletonHolder.INSTANCE.host;
+    }
+
+  }
+
+  SpanReceiverHost(Configuration conf) {
     receivers = new HashSet<SpanReceiver>();
     this.conf = conf;
   }
 
@@ -48,13 +73,7 @@ public class SpanReceiverHost {
    * Reads the names of classes specified in the
    * "hbase.trace.spanreceiver.classes" property and instantiates and registers
    * them with the Tracer as SpanReceiver's.
-   *
-   * The nullary constructor is called during construction, but if the classes
-   * specified implement the Configurable interface, setConfiguration() will be
-   * called on them. This allows SpanReceivers to use values from
-   * hbase-site.xml. See
-   * {@link org.apache.hadoop.hbase.trace.HBaseLocalFileSpanReceiver} for an
-   * example.
+   *
    */
   public void loadSpanReceivers() {
     Class<?> implClass = null;
@@ -67,8 +86,12 @@ public class SpanReceiverHost {
 
       try {
         implClass = Class.forName(className);
-        receivers.add(loadInstance(implClass));
-        LOG.info("SpanReceiver " + className + " was loaded successfully.");
+        SpanReceiver receiver = loadInstance(implClass);
+        if (receiver != null) {
+          receivers.add(receiver);
+          LOG.info("SpanReceiver " + className + " was loaded successfully.");
+        }
+
       } catch (ClassNotFoundException e) {
         LOG.warn("Class " + className + " cannot be found. " + e.getMessage());
       } catch (IOException e) {
@@ -83,16 +106,21 @@ public class SpanReceiverHost {
 
   private SpanReceiver loadInstance(Class<?> implClass)
       throws IOException {
-    SpanReceiver impl;
+    SpanReceiver impl = null;
     try {
-      Object o = ReflectionUtils.newInstance(implClass, conf);
+      Object o = implClass.newInstance();
       impl = (SpanReceiver)o;
+      impl.configure(new HBaseHTraceConfiguration(this.conf));
     } catch (SecurityException e) {
       throw new IOException(e);
     } catch (IllegalArgumentException e) {
       throw new IOException(e);
     } catch (RuntimeException e) {
       throw new IOException(e);
+    } catch (InstantiationException e) {
+      e.printStackTrace();
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();
     }
 
     return impl;
@@ -101,7 +129,9 @@ public class SpanReceiverHost {
   /**
    * Calls close() on all SpanReceivers created by this SpanReceiverHost.
    */
-  public void closeReceivers() {
+  public synchronized void closeReceivers() {
+    if (closed) return;
+    closed = true;
     for (SpanReceiver rcvr : receivers) {
       try {
         rcvr.close();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java
index a19dbb3..faf015c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 import org.cloudera.htrace.TraceTree;
 import org.cloudera.htrace.impl.POJOSpanReceiver;
 import org.junit.AfterClass;
@@ -60,13 +61,14 @@ public class TestHTraceHooks {
 
   @Test
   public void testTraceCreateTable() throws Exception {
-    Span tableCreationSpan = Trace.startSpan("creating table", Sampler.ALWAYS);
+    TraceScope tableCreationSpan = Trace.startSpan("creating table", Sampler.ALWAYS);
     HTable table;
     try {
+
       table = TEST_UTIL.createTable("table".getBytes(), FAMILY_BYTES);
     } finally {
-      tableCreationSpan.stop();
+      tableCreationSpan.close();
     }
 
     Collection<Span> spans = rcvr.getSpans();
@@ -80,26 +82,26 @@ public class TestHTraceHooks {
     Multimap<Long, Span> spansByParentIdMap = traceTree
         .getSpansByParentIdMap();
 
-    int startsWithHandlingCount = 0;
+    int createTableCount = 0;
 
     for (Span s : spansByParentIdMap.get(createTableRoot.getSpanId())) {
-      if (s.getDescription().startsWith("handling")) {
-        startsWithHandlingCount++;
+      if (s.getDescription().startsWith("MasterAdminService.CreateTable")) {
+        createTableCount++;
       }
     }
 
-    assertTrue(startsWithHandlingCount > 3);
+    assertTrue(createTableCount >= 1);
     assertTrue(spansByParentIdMap.get(createTableRoot.getSpanId()).size() > 3);
     assertTrue(spans.size() > 5);
 
     Put put = new Put("row".getBytes());
     put.add(FAMILY_BYTES, "col".getBytes(), "value".getBytes());
 
-    Span putSpan = Trace.startSpan("doing put", Sampler.ALWAYS);
+    TraceScope putSpan = Trace.startSpan("doing put", Sampler.ALWAYS);
     try {
       table.put(put);
     } finally {
-      putSpan.stop();
+      putSpan.close();
     }
 
     spans = rcvr.getSpans();
diff --git pom.xml pom.xml
index 92385c0..9e2e338 100644
--- pom.xml
+++ pom.xml
@@ -902,7 +902,7 @@
     <jersey.version>1.8</jersey.version>
     <jruby.version>1.6.8</jruby.version>
    <junit.version>4.11</junit.version>
-    <htrace.version>1.50</htrace.version>
+    <htrace.version>2.00</htrace.version>
     <log4j.version>1.2.17</log4j.version>
     <mockito-all.version>1.9.0</mockito-all.version>
     <protobuf.version>2.4.1</protobuf.version>
@@ -1326,7 +1326,12 @@
       <dependency>
         <groupId>org.cloudera.htrace</groupId>
-        <artifactId>htrace</artifactId>
+        <artifactId>htrace-core</artifactId>
+        <version>${htrace.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.cloudera.htrace</groupId>
+        <artifactId>htrace-zipkin</artifactId>
         <version>${htrace.version}</version>
       </dependency>
-- 
1.7.10.2 (Apple Git-33)