commit e2144bdae171805ca4404046162ce4d25e148595 Author: stack Date: Tue Dec 8 10:19:12 2015 -0800 HBASE-14451 Move on to htrace-4.0.1 (from htrace-3.2.0) diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index 401e28e..fe38e07 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -170,7 +170,7 @@ org.apache.htrace - htrace-core + htrace-core4 org.codehaus.jackson diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f1fa3eb..d5cbeca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.htrace.Trace; import com.google.common.annotations.VisibleForTesting; @@ -1012,8 +1011,10 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + Runnable runnable = + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); + return Collections.singletonList( + connection.getTracer().wrap(runnable, "AsyncProcess.sendMultiAction")); } // group the actions by the amount of delay @@ -1051,7 +1052,7 @@ class AsyncProcess { connection.getConnectionMetrics().incrNormalRunners(); } } - runnable = Trace.wrap(traceText, runnable); + runnable = connection.getTracer().wrap(runnable, traceText); toReturn.add(runnable); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 99071fa..21c5283 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.htrace.core.Tracer; /** Internal methods on HConnection that should not be used by user code. */ @InterfaceAudience.Private @@ -286,7 +287,7 @@ public interface ClusterConnection extends HConnection { * @return RpcRetryingCallerFactory */ RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf); - + /** * @return the current statistics tracker associated with this connection */ @@ -300,6 +301,13 @@ public interface ClusterConnection extends HConnection { /** * @return the MetricsConnection instance associated with this connection. */ - public MetricsConnection getConnectionMetrics(); + MetricsConnection getConnectionMetrics(); + /** + * @return This Connection's non-null Tracer instance. + */ + // All the 'extras' in here are annotated Private, as for internal use only but adding + // this annotation here to underline this fact. 
+ @InterfaceAudience.Private + Tracer getTracer(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index a3f6fe6..1ae5b10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; * thread will obtain its own Table instance. Caching or pooling of {@link Table} and {@link Admin} * is not recommended. * - *

<p>This class replaces {@link HConnection}, which is now deprecated. + * <p>
This class replaces HConnection, which is now deprecated. * @see ConnectionFactory * @since 0.99.0 */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 62a7998..22e2a04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunn import org.apache.hadoop.hbase.quotas.ThrottlingException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -78,6 +79,9 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TracerId; import org.apache.zookeeper.KeeperException; import javax.annotation.Nullable; @@ -148,8 +152,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // be waiting for the master lock => deadlock. private final Object masterAndZKLock = new Object(); - private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - // thread executor shared by all HTableInterface instances created // by this connection private volatile ExecutorService batchPool = null; @@ -186,6 +188,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final ClientBackoffPolicy backoffPolicy; /** + * Connection tracer. + */ + private final Tracer tracer; + + /** * constructor * @param conf Configuration object */ @@ -227,6 +234,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.metrics = null; } this.metaCache = new MetaCache(this.metrics); + + // Set up tracer. Add Connection hash to the TraceId so it comes out as "HConnection,hash". + conf.set(HBaseHTraceConfiguration.KEY_PREFIX + TracerId.TRACER_ID_KEY, toString()); + HTraceConfiguration htraceConfiguration = new HBaseHTraceConfiguration(conf); + this.tracer = new Tracer.Builder("").conf(htraceConfiguration).build(); boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); @@ -1367,10 +1379,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private ZooKeeperKeepAliveConnection keepAliveZookeeper; private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0); - private boolean canCloseZKW = true; - - // keepAlive time, in ms. No reason to make it configurable. - private static final long keepAlive = 5 * 60 * 1000; /** * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have finished with it. 
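The constructor change above is the heart of the client-side move: every ConnectionImplementation now owns its Tracer, built from an HBaseHTraceConfiguration view over the HBase Configuration, instead of going through htrace-3's static Trace class. A minimal standalone sketch of that wiring, not part of the patch; the "tracer.id" key literal and the sample id string are assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
    import org.apache.htrace.core.Tracer;

    public class ConnectionTracerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBaseHTraceConfiguration strips the "hbase.htrace." prefix, so htrace's
        // TracerId machinery sees a plain "tracer.id" (assumed key name).
        conf.set(HBaseHTraceConfiguration.KEY_PREFIX + "tracer.id", "HConnection,example");
        Tracer tracer = new Tracer.Builder("")
            .conf(new HBaseHTraceConfiguration(conf)).build();
        // Instance method replaces the old static Trace.wrap(); the wrapped
        // Runnable re-enters the caller's span when run on another thread.
        Runnable traced = tracer.wrap(new Runnable() {
          @Override public void run() { /* work happens inside the span */ }
        }, "ConnectionTracerSketch.task");
        traced.run();
        tracer.close();
      }
    }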
@@ -1385,10 +1393,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } // We don't check that our link to ZooKeeper is still valid // But there is a retry mechanism in the ZooKeeperWatcher itself - keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this); + keepAliveZookeeper = + new ZooKeeperKeepAliveConnection(conf, this.toString(), this, getTracer()); } keepAliveZookeeperUserCount.addAndGet(1); - keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; return keepAliveZookeeper; } } @@ -1397,9 +1405,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (zkw == null){ return; } - if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) { - keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive; - } } private void closeZooKeeperWatcher() { @@ -2315,4 +2320,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return RpcRetryingCallerFactory .instantiate(conf, this.interceptor, this.getStatisticsTracker()); } -} + + @Override + public Tracer getTracer() { + return this.tracer; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 66079dd..ab31ae6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1937,7 +1937,7 @@ public class HBaseAdmin implements Admin { try { checkTableExists(tableName); zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(), - new ThrowableAbortable()); + new ThrowableAbortable(), connection.getTracer()); List> pairs; if (TableName.META_TABLE_NAME.equals(tableName)) { pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper); @@ -2347,7 +2347,7 @@ public class HBaseAdmin implements Admin { try { checkTableExists(tableName); zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(), - new ThrowableAbortable()); + new ThrowableAbortable(), this.connection.getTracer()); List> pairs; if (TableName.META_TABLE_NAME.equals(tableName)) { pairs = new MetaTableLocator().getMetaRegionsAndLocations(zookeeper); @@ -2870,7 +2870,7 @@ public class HBaseAdmin implements Admin { throws IOException { ZooKeeperWatcher zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(), - new ThrowableAbortable()); + new ThrowableAbortable(), this.connection.getTracer()); List regions = null; try { if (TableName.META_TABLE_NAME.equals(tableName)) { @@ -4160,7 +4160,7 @@ public class HBaseAdmin implements Admin { default: ZooKeeperWatcher zookeeper = new ZooKeeperWatcher(conf, ZK_IDENTIFIER_PREFIX + connection.toString(), - new ThrowableAbortable()); + new ThrowableAbortable(), this.connection.getTracer()); try { List> pairs; if (TableName.META_TABLE_NAME.equals(tableName)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index 9b32e93..6ef451b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.htrace.Trace; +import org.apache.htrace.core.Tracer; /** * A completion service for the RpcRetryingCallerFactory. @@ -143,7 +143,8 @@ public class ResultBoundedCompletionService { public void submit(RetryingCallable task, int callTimeout, int id) { QueueingFuture newFuture = new QueueingFuture(task, callTimeout); - executor.execute(Trace.wrap(newFuture)); + Tracer tracer = Tracer.curThreadTracer(); + executor.execute(tracer == null? newFuture: tracer.wrap(newFuture, "submit")); tasks[id] = newFuture; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java index 04d4b41..58fcb30 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZooKeeperKeepAliveConnection.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.htrace.core.Tracer; /** * We inherit the current ZooKeeperWatcher implementation to change the semantic @@ -39,8 +40,9 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{ ZooKeeperKeepAliveConnection( Configuration conf, String descriptor, - ConnectionImplementation conn) throws IOException { - super(conf, descriptor, conn); + ConnectionImplementation conn, Tracer tracer) + throws IOException { + super(conf, descriptor, conn, tracer); } @Override @@ -53,4 +55,4 @@ class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{ void internalClose(){ super.close(); } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 8bd1267..e6ded02 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; @@ -98,7 +99,7 @@ public class ReplicationAdmin implements Closeable { public static final String REPLICATIONGLOBAL = Integer .toString(HConstants.REPLICATION_SCOPE_GLOBAL); - private final Connection connection; + private final ClusterConnection connection; // TODO: replication should be managed by master. All the classes except ReplicationAdmin should // be moved to hbase-server. Resolve it in HBASE-11392. 
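The ResultBoundedCompletionService change above shows the defensive idiom this patch falls back on wherever no Connection (and so no Tracer instance) is in scope: ask the current thread for its Tracer and run untraced when there is none. A sketch under that assumption; the helper class and method name are invented:

    import org.apache.htrace.core.Tracer;

    final class TraceWrapUtil {
      private TraceWrapUtil() {}

      // htrace 4 has no static Trace.wrap(), so wrap only when the calling
      // thread actually carries a Tracer; otherwise run the task as-is.
      static Runnable maybeWrap(Runnable task, String description) {
        Tracer tracer = Tracer.curThreadTracer();
        return tracer == null ? task : tracer.wrap(task, description);
      }
    }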
private final ReplicationQueuesClient replicationQueuesClient; @@ -121,7 +122,7 @@ public class ReplicationAdmin implements Closeable { throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } - this.connection = ConnectionFactory.createConnection(conf); + this.connection = (ClusterConnection)ConnectionFactory.createConnection(conf); try { zkw = createZooKeeperWatcher(); try { @@ -165,7 +166,7 @@ public class ReplicationAdmin implements Closeable { public boolean isAborted() { return false; } - }); + }, connection.getTracer()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 44e8322..2ce8e2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -69,8 +69,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.Tracer; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; @@ -406,11 +406,10 @@ public class AsyncRpcChannel { .newBuilder(); requestHeaderBuilder.setCallId(call.id) .setMethodName(call.method.getName()).setRequestParam(call.param != null); - - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); + SpanId spanId = Tracer.getCurrentSpanId(); + if (spanId.isValid()) { requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder(). 
- setParentId(s.getSpanId()).setTraceId(s.getTraceId())); + setParentId(spanId.getLow()).setTraceId(spanId.getHigh())); } ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index d7a0029..e8ee70d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -66,9 +66,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import javax.net.SocketFactory; import javax.security.sasl.SaslException; @@ -144,15 +144,15 @@ public class RpcClientImpl extends AbstractRpcClient { private static class CallFuture { final Call call; final int priority; - final Span span; + final SpanId parentSpanId; // We will use this to stop the writer - final static CallFuture DEATH_PILL = new CallFuture(null, -1, null); + final static CallFuture DEATH_PILL = new CallFuture(null, -1); - CallFuture(Call call, int priority, Span span) { + CallFuture(Call call, int priority) { this.call = call; this.priority = priority; - this.span = span; + this.parentSpanId = Tracer.getCurrentSpanId(); } } @@ -206,9 +206,9 @@ public class RpcClientImpl extends AbstractRpcClient { protected final BlockingQueue callsToWrite; - public CallFuture sendCall(Call call, int priority, Span span) + public CallFuture sendCall(Call call, int priority) throws InterruptedException, IOException { - CallFuture cts = new CallFuture(call, priority, span); + CallFuture cts = new CallFuture(call, priority); if (!callsToWrite.offer(cts)) { throw new IOException("Can't add the call " + call.id + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); @@ -267,9 +267,10 @@ public class RpcClientImpl extends AbstractRpcClient { if (cts.call.checkAndSetTimeout()) { continue; } - + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = tracer == null? null: tracer.newScope("call"); try { - Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span); + Connection.this.writeRequest(cts.call, cts.priority); } catch (IOException e) { if (LOG.isDebugEnabled()) { LOG.debug("call write error for call #" + cts.call.id @@ -277,6 +278,8 @@ public class RpcClientImpl extends AbstractRpcClient { } cts.call.setException(e); markClosed(e); + } finally { + if (traceScope != null) traceScope.close(); } } @@ -865,27 +868,22 @@ public class RpcClientImpl extends AbstractRpcClient { } } - protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException { - TraceScope ts = Trace.continueSpan(span); - try { - writeRequest(call, priority, span); - } finally { - ts.close(); - } - } - /** * Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. 
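 * In htrace 4 the trace context is no longer passed in as an explicit Span
 * argument; it is read off the calling thread via Tracer.getCurrentSpanId().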
* @see #readResponse() */ - private void writeRequest(Call call, final int priority, Span span) throws IOException { + private void writeRequest(Call call, final int priority) throws IOException { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setCallId(call.id); - if (span != null) { - builder.setTraceInfo( - RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId())); + SpanId spanId = Tracer.getCurrentSpanId(); + if (spanId.isValid()) { + // Pre-4.0.0 htrace, we had parent and trace id. In 4.0.0, there is one id only and it is + // 128 bits. Rather than add new fields or change field names, just set the high and low of + // the 128 integer into the old parent id and trace id fields. + builder.setTraceInfo(RPCTInfo.newBuilder(). + setParentId(spanId.getLow()).setTraceId(spanId.getHigh())); } builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); @@ -1214,8 +1212,12 @@ public class RpcClientImpl extends AbstractRpcClient { final Connection connection = getConnection(ticket, call, addr); final CallFuture cts; + Tracer tracer = Tracer.curThreadTracer(); if (connection.callSender != null) { - cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan()); + TraceScope traceScope = tracer == null? null: + tracer.newScope("call " + connection.callSender.getName()); + try { + cts = connection.callSender.sendCall(call, pcrc.getPriority()); pcrc.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { @@ -1227,9 +1229,17 @@ public class RpcClientImpl extends AbstractRpcClient { call.callComplete(); return new Pair(call.response, call.cells); } + } finally { + if (traceScope != null) traceScope.close(); + } } else { cts = null; - connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan()); + TraceScope traceScope = tracer == null? null: tracer.newScope("call"); + try { + connection.writeRequest(call, pcrc.getPriority()); + } finally { + if (traceScope != null) traceScope.close(); + } } while (!call.done) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index c1eb214..f5287ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -44,8 +46,6 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.proto.SetDataRequest; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; /** * A zookeeper that can handle 'recoverable' errors. 
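The comment above writeRequest captures the wire-compatibility trick of this patch: one 128-bit htrace-4 SpanId rides in the two legacy int64 protobuf fields. A round-trip sketch, assuming SpanId's public two-long constructor; the literals are arbitrary:

    import org.apache.htrace.core.SpanId;

    public class SpanIdWireSketch {
      public static void main(String[] args) {
        SpanId spanId = new SpanId(0x1122334455667788L, 0x99aabbccddeeff00L);
        long traceIdField = spanId.getHigh();   // carried in RPCTInfo.trace_id
        long parentIdField = spanId.getLow();   // carried in RPCTInfo.parent_id
        // The receiving side reassembles the identical 128-bit id.
        SpanId decoded = new SpanId(traceIdField, parentIdField);
        System.out.println(decoded.equals(spanId));  // prints: true
      }
    }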
@@ -97,17 +97,19 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_OFFSET = MAGIC_SIZE; private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; + private final Tracer tracer; + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) + Watcher watcher, int maxRetries, int retryIntervalMillis, final Tracer tracer) throws IOException { - this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, - null); + this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, tracer, null); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", justification="None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier) + Watcher watcher, int maxRetries, int retryIntervalMillis, final Tracer tracer, + String identifier) throws IOException { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. this.retryCounterFactory = @@ -127,6 +129,7 @@ public class RecoverableZooKeeper { this.quorumServers = quorumServers; try {checkZk();} catch (Exception x) {/* ignore */} salter = new Random(); + this.tracer = tracer; } /** @@ -168,9 +171,8 @@ public class RecoverableZooKeeper { */ public void delete(String path, int version) throws InterruptedException, KeeperException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.delete"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.delete"); RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. 
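 // Eager htrace-4 idiom: the scope above was opened with tracer.newScope(...)
 // before the try block (replacing the lazy Trace.startSpan() that used to sit
 // inside it), so it is non-null for the whole retry loop that follows.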
while (true) { @@ -212,9 +214,8 @@ public class RecoverableZooKeeper { */ public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.exists"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.exists"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -244,9 +245,8 @@ public class RecoverableZooKeeper { */ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.exists"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.exists"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -285,10 +285,9 @@ public class RecoverableZooKeeper { * @return List of children znodes */ public List getChildren(String path, Watcher watcher) - throws KeeperException, InterruptedException { - TraceScope traceScope = null; + throws KeeperException, InterruptedException { + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.getChildren"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.getChildren"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -318,9 +317,8 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.getChildren"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.getChildren"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -350,9 +348,8 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.getData"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.getData"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -383,9 +380,8 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.getData"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.getData"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -418,9 +414,8 @@ public class RecoverableZooKeeper { */ public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.setData"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.setData"); RetryCounter retryCounter = retryCounterFactory.create(); byte[] newData = appendMetaData(data); boolean isRetry = false; @@ -470,7 +465,7 @@ public class RecoverableZooKeeper { throws KeeperException, InterruptedException { TraceScope traceScope = null; try { - traceScope = Trace.startSpan("RecoverableZookeeper.getAcl"); + traceScope = this.tracer.newScope("RecoverableZookeeper.getAcl"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -502,7 +497,7 @@ public class RecoverableZooKeeper { throws KeeperException, 
InterruptedException { TraceScope traceScope = null; try { - traceScope = Trace.startSpan("RecoverableZookeeper.setAcl"); + traceScope = this.tracer.newScope("RecoverableZookeeper.setAcl"); RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { @@ -544,9 +539,8 @@ public class RecoverableZooKeeper { public String create(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.create"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.create"); byte[] newData = appendMetaData(data); switch (createMode) { case EPHEMERAL: @@ -674,9 +668,8 @@ public class RecoverableZooKeeper { */ public List multi(Iterable ops) throws KeeperException, InterruptedException { - TraceScope traceScope = null; + TraceScope traceScope = this.tracer.newScope("RecoverableZookeeper.multi"); try { - traceScope = Trace.startSpan("RecoverableZookeeper.multi"); RetryCounter retryCounter = retryCounterFactory.create(); Iterable multiOps = prepareZKMulti(ops); while (true) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 633525f..e64cf43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.htrace.core.Tracer; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -126,20 +127,21 @@ public class ZKUtil { * @return connection to zookeeper * @throws IOException if unable to connect to zk or config problem */ - public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) + public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher, + final Tracer tracer) throws IOException { String ensemble = ZKConfig.getZKQuorumServersString(conf); - return connect(conf, ensemble, watcher); + return connect(conf, ensemble, watcher, tracer); } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher) + Watcher watcher, final Tracer tracer) throws IOException { - return connect(conf, ensemble, watcher, null); + return connect(conf, ensemble, watcher, tracer, null); } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String identifier) + Watcher watcher, final Tracer tracer, final String identifier) throws IOException { if(ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); @@ -155,7 +157,7 @@ public class ZKUtil { zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", 1000); return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis, identifier); + retry, retryIntervalMillis, tracer, identifier); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZkAclReset.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZkAclReset.java index 9bb6bf7..2252944 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZkAclReset.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZkAclReset.java @@ -27,8 +27,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.htrace.core.Tracer; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; @@ -47,7 +49,8 @@ public class ZkAclReset extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(ZkAclReset.class); private static void resetAcls(final ZooKeeperWatcher zkw, final String znode, - final boolean eraseAcls) throws Exception { + final boolean eraseAcls) + throws Exception { List children = ZKUtil.listChildrenNoWatch(zkw, znode); if (children != null) { for (String child: children) { @@ -65,9 +68,9 @@ public class ZkAclReset extends Configured implements Tool { } } - private static void resetAcls(final Configuration conf, boolean eraseAcls) + private static void resetAcls(final Configuration conf, boolean eraseAcls, Tracer tracer) throws Exception { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "ZkAclReset", null); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "ZkAclReset", null, tracer); try { LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " + zkw.getQuorum() + " " + zkw.getBaseZNode()); @@ -105,8 +108,10 @@ public class ZkAclReset extends Configured implements Tool { printUsageAndExit(); } } - - resetAcls(getConf(), eraseAcls); + try (Tracer tracer = new Tracer.Builder("ZkAclReset"). + conf(new HBaseHTraceConfiguration(getConf())).build();) { + resetAcls(getConf(), eraseAcls, tracer); + } return(0); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index f7a2175..1574ea3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.htrace.core.Tracer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -126,6 +127,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private final Exception constructorCaller; + private final Tracer tracer; + /** * Instantiate a ZooKeeper connection and watcher. 
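 * The given Tracer may be null; it is handed down through ZKUtil.connect() to
 * RecoverableZooKeeper so ZK operations are traced with the caller's Tracer.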
* @param identifier string that is passed to RecoverableZookeeper to be used as @@ -133,9 +136,22 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { * @throws IOException * @throws ZooKeeperConnectionException */ - public ZooKeeperWatcher(Configuration conf, String identifier, - Abortable abortable) throws ZooKeeperConnectionException, IOException { - this(conf, identifier, abortable, false); + public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) + throws ZooKeeperConnectionException, IOException { + this(conf, identifier, abortable, null); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. Use null for default. + * @throws IOException + * @throws ZooKeeperConnectionException + */ + public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, + final Tracer tracer) + throws ZooKeeperConnectionException, IOException { + this(conf, identifier, abortable, tracer, false); } /** @@ -143,8 +159,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { * @param conf * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for * this instance. Use null for default. - * @param abortable Can be null if there is on error there is no host to abort: e.g. client - * context. + * @param abortable What to call when abort. * @param canCreateBaseZNode * @throws IOException * @throws ZooKeeperConnectionException @@ -152,6 +167,22 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { + this(conf, identifier, abortable, null, canCreateBaseZNode); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param conf + * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for + * this instance. Use null for default. + * @param abortable What to call when abort. + * @param canCreateBaseZNode + * @throws IOException + * @throws ZooKeeperConnectionException + */ + public ZooKeeperWatcher(Configuration conf, String identifier, + Abortable abortable, Tracer tracer, boolean canCreateBaseZNode) + throws IOException, ZooKeeperConnectionException { this.conf = conf; // Capture a stack trace now. Will print it out later if problem so we can // distingush amongst the myriad ZKWs. 
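Taken together, the new overloads make the Tracer an injected dependency of ZooKeeperWatcher rather than ambient static state; callers that do not trace simply pass null, as the updated tests do. A hypothetical caller, with invented identifiers:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.htrace.core.Tracer;

    public class WatcherSketch {
      static ZooKeeperWatcher openWatcher(Configuration conf) throws Exception {
        Tracer tracer = new Tracer.Builder("WatcherSketch")
            .conf(new HBaseHTraceConfiguration(conf)).build();
        // The same Tracer flows into RecoverableZooKeeper via ZKUtil.connect().
        return new ZooKeeperWatcher(conf, "WatcherSketch", null, tracer, false);
      }
    }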
@@ -167,10 +198,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.identifier = identifier + "0x0"; this.abortable = abortable; setNodeNames(conf); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, tracer, identifier); if (canCreateBaseZNode) { createBaseZNodes(); } + this.tracer = tracer; } private void createBaseZNodes() throws ZooKeeperConnectionException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java index 72de935..ac449ab 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java @@ -56,7 +56,7 @@ public class TestZKUtil { Configuration conf = HBaseConfiguration.create(); conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3"); String node = "/hbase/testCreateACL"; - ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, node, null, false); + ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, node, null, null, false); List aclList = ZKUtil.createACL(watcher, node, true); Assert.assertEquals(aclList.size(), 4); // 3+1, since ACL will be set for the creator by default Assert.assertTrue(!aclList.contains(new ACL(Perms.ALL, new Id("auth", "@group1"))) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java index 10a3816..f2dbdd7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java @@ -34,7 +34,7 @@ public class TestZooKeeperWatcher { @Test public void testIsClientReadable() throws ZooKeeperConnectionException, IOException { ZooKeeperWatcher watcher = new ZooKeeperWatcher(HBaseConfiguration.create(), - "testIsClientReadable", null, false); + "testIsClientReadable", null, null, false); assertTrue(watcher.isClientReadable(watcher.baseZNode)); assertTrue(watcher.isClientReadable(watcher.getZNodeForReplica(0))); diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index 5b43553..4b971f0 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -277,7 +277,7 @@ org.apache.htrace - htrace-core + htrace-core4 diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java index 56de264..0f09364 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseHTraceConfiguration.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.trace; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.core.HTraceConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,17 +65,16 @@ public class HBaseHTraceConfiguration extends HTraceConfiguration { @Override public String get(String key) { - return conf.get(KEY_PREFIX +key); + return conf.get(KEY_PREFIX + key); } @Override public String get(String key, String defaultValue) { - return 
conf.get(KEY_PREFIX + key,defaultValue); - + return conf.get(KEY_PREFIX + key, defaultValue); } @Override public boolean getBoolean(String key, boolean defaultValue) { return conf.getBoolean(KEY_PREFIX + key, defaultValue); } -} +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java deleted file mode 100644 index b90d191..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.trace; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.htrace.SpanReceiver; -import org.apache.htrace.SpanReceiverBuilder; -import org.apache.htrace.Trace; - -/** - * This class provides functions for reading the names of SpanReceivers from - * hbase-site.xml, adding those SpanReceivers to the Tracer, and closing those - * SpanReceivers when appropriate. - */ -@InterfaceAudience.Private -public class SpanReceiverHost { - public static final String SPAN_RECEIVERS_CONF_KEY = "hbase.trace.spanreceiver.classes"; - private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class); - private Collection receivers; - private Configuration conf; - private boolean closed = false; - - private static enum SingletonHolder { - INSTANCE; - Object lock = new Object(); - SpanReceiverHost host = null; - } - - public static SpanReceiverHost getInstance(Configuration conf) { - synchronized (SingletonHolder.INSTANCE.lock) { - if (SingletonHolder.INSTANCE.host != null) { - return SingletonHolder.INSTANCE.host; - } - - SpanReceiverHost host = new SpanReceiverHost(conf); - host.loadSpanReceivers(); - SingletonHolder.INSTANCE.host = host; - return SingletonHolder.INSTANCE.host; - } - - } - - SpanReceiverHost(Configuration conf) { - receivers = new HashSet(); - this.conf = conf; - } - - /** - * Reads the names of classes specified in the {@code hbase.trace.spanreceiver.classes} property - * and instantiates and registers them with the Tracer. 
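SpanReceiverHost can be deleted because htrace 4 folds its job into the Tracer itself: receivers and samplers are named in the Tracer's configuration and closed with it, with no static registry to feed. A sketch of the htrace-4 equivalent; the key names, receiver class, and path are assumptions based on htrace-core4 defaults, not taken from this patch:

    import org.apache.htrace.core.HTraceConfiguration;
    import org.apache.htrace.core.Tracer;

    public class ReceiverSketch {
      public static void main(String[] args) throws Exception {
        Tracer tracer = new Tracer.Builder("ReceiverSketch")
            .conf(HTraceConfiguration.fromKeyValuePairs(
                "span.receiver.classes", "org.apache.htrace.core.LocalFileSpanReceiver",
                "sampler.classes", "AlwaysSampler",
                "local.file.span.receiver.path", "/tmp/spans.json"))
            .build();
        // ... traced work ...
        tracer.close();  // also closes the receivers, replacing closeReceivers()
      }
    }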
- * - */ - public void loadSpanReceivers() { - String[] receiverNames = conf.getStrings(SPAN_RECEIVERS_CONF_KEY); - if (receiverNames == null || receiverNames.length == 0) { - return; - } - - SpanReceiverBuilder builder = new SpanReceiverBuilder(new HBaseHTraceConfiguration(conf)); - for (String className : receiverNames) { - className = className.trim(); - - SpanReceiver receiver = builder.spanReceiverClass(className).build(); - if (receiver != null) { - receivers.add(receiver); - LOG.info("SpanReceiver " + className + " was loaded successfully."); - } - } - for (SpanReceiver rcvr : receivers) { - Trace.addReceiver(rcvr); - } - } - - /** - * Calls close() on all SpanReceivers created by this SpanReceiverHost. - */ - public synchronized void closeReceivers() { - if (closed) return; - closed = true; - for (SpanReceiver rcvr : receivers) { - try { - rcvr.close(); - } catch (IOException e) { - LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e); - } - } - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java index 3cae4d2..a6d0f01 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; /** * Common helpers for testing HBase that do not depend on specific server/etc. things. - * @see {@link HBaseTestingUtility} + * Like HBaseTestingUtility but with just common-module dependencies. * */ @InterfaceAudience.Public diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index f820193..4d8d57d 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -33,9 +33,8 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.Addressing; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import java.io.IOException; import java.net.InetSocketAddress; @@ -133,8 +132,10 @@ public class MemcachedBlockCache implements BlockCache { boolean repeat, boolean updateCacheMetrics) { // Assume that nothing is the block cache HFileBlock result = null; - - try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) { + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = tracer == null? 
null: + tracer.newScope("MemcachedBlockCache.getBlock"); + try { result = client.get(cacheKey.toString(), tc); } catch (Exception e) { // Catch a pretty broad set of exceptions to limit any changes in the memecache client @@ -146,6 +147,7 @@ public class MemcachedBlockCache implements BlockCache { } result = null; } finally { + traceScope.close(); // Update stats if this request doesn't have it turned off 100% of the time if (updateCacheMetrics) { if (result == null) { @@ -155,8 +157,6 @@ public class MemcachedBlockCache implements BlockCache { } } } - - return result; } diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml index c36be34..d28d4e0 100644 --- a/hbase-it/pom.xml +++ b/hbase-it/pom.xml @@ -255,7 +255,7 @@ org.apache.htrace - htrace-core + htrace-core4 diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java index 437f200..9af7aae 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mttr/IntegrationTestMTTR.java @@ -63,9 +63,10 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LoadTestTool; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.apache.htrace.impl.AlwaysSampler; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -158,6 +159,9 @@ public class IntegrationTestMTTR { @BeforeClass public static void setUp() throws Exception { + // TODO: Tracing had the ALWAYS Sampler turned on. The refactor to come up on htrace 4.0 did + // not add in this. Fix. + // Set up the integration test util if (util == null) { util = new IntegrationTestingUtility(); @@ -357,7 +361,7 @@ public class IntegrationTestMTTR { */ private static class TimingResult { DescriptiveStatistics stats = new DescriptiveStatistics(); - ArrayList traces = new ArrayList(10); + ArrayList traces = new ArrayList(10); /** * Add a result to this aggregate result. @@ -367,7 +371,7 @@ public class IntegrationTestMTTR { public void addResult(long time, Span span) { stats.addValue(TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS)); if (TimeUnit.SECONDS.convert(time, TimeUnit.NANOSECONDS) >= 1) { - traces.add(span.getTraceId()); + traces.add(span.getSpanId()); } } @@ -412,7 +416,8 @@ public class IntegrationTestMTTR { long start = System.nanoTime(); TraceScope scope = null; try { - scope = Trace.startSpan(getSpanName(), AlwaysSampler.INSTANCE); + scope = Tracer.curThreadTracer() == null? 
null: + Tracer.curThreadTracer().newScope(getSpanName()); boolean actionResult = doAction(); if (actionResult && future.isDone()) { numAfterDone++; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java index 3845846..80431a2 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java @@ -138,7 +138,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool { private void testZNodeACLs() throws IOException, KeeperException, InterruptedException { ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "IntegrationTestZnodeACLs", null); - RecoverableZooKeeper zk = ZKUtil.connect(this.conf, watcher); + RecoverableZooKeeper zk = ZKUtil.connect(this.conf, watcher, null); String baseZNode = watcher.baseZNode; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index f325aac..a93cf11 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -35,9 +35,8 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ToolRunner; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -61,9 +60,10 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private IntegrationTestingUtility util; private Random random = new Random(); private Admin admin; - private SpanReceiverHost receiverHost; public static void main(String[] args) throws Exception { + // TODO: Tracing had the ALWAYS Sampler turned on. The refactor to come up on htrace 4.0 did + // not add in this. Fix. Configuration configuration = HBaseConfiguration.create(); IntegrationTestingUtility.setUseDistributedCluster(configuration); IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests(); @@ -108,13 +108,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { service.shutdown(); service.awaitTermination(100, TimeUnit.SECONDS); Thread.sleep(90000); - receiverHost.closeReceivers(); util.restoreCluster(); util = null; } private void doScans(ExecutorService service, final LinkedBlockingQueue rks) { - for (int i = 0; i < 100; i++) { Runnable runnable = new Runnable() { private TraceScope innerScope = null; @@ -123,7 +121,8 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { public void run() { ResultScanner rs = null; try { - innerScope = Trace.startSpan("Scan", Sampler.ALWAYS); + Tracer tracer = Tracer.curThreadTracer(); + innerScope = tracer == null? 
null: tracer.newScope("Scan"); Table ht = util.getConnection().getTable(tableName); Scan s = new Scan(); s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); @@ -144,9 +143,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { } catch (IOException e) { e.printStackTrace(); - innerScope.getSpan().addKVAnnotation( - Bytes.toBytes("exception"), - Bytes.toBytes(e.getClass().getSimpleName())); + innerScope.getSpan().addKVAnnotation("exception", e.getClass().getSimpleName()); } catch (Exception e) { } finally { @@ -182,7 +179,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { long accum = 0; for (int x = 0; x < 5; x++) { try { - innerScope = Trace.startSpan("gets", Sampler.ALWAYS); + innerScope = Tracer.curThreadTracer().newScope("gets"); long rk = rowKeyQueue.take(); Result r1 = ht.get(new Get(Bytes.toBytes(rk))); if (r1 != null) { @@ -212,7 +209,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private void createTable() throws IOException { TraceScope createScope = null; try { - createScope = Trace.startSpan("createTable", Sampler.ALWAYS); + createScope = Tracer.curThreadTracer().newScope("createTable"); util.createTable(tableName, familyName); } finally { if (createScope != null) createScope.close(); @@ -224,7 +221,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { try { if (admin.tableExists(tableName)) { - deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS); + deleteScope = Tracer.curThreadTracer().newScope("deleteTable"); util.deleteTable(tableName); } } finally { @@ -237,7 +234,8 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); byte[] value = new byte[300]; for (int x = 0; x < 5000; x++) { - TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = tracer == null? null: tracer.newScope("insertData"); try { for (int i = 0; i < 5; i++) { long rk = random.nextLong(); @@ -253,7 +251,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { admin.flush(tableName); } } finally { - traceScope.close(); + if (traceScope != null) traceScope.close(); } } admin.flush(tableName); @@ -280,6 +278,8 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { Configuration conf = new Configuration(util.getConfiguration()); conf.setBoolean("hbase.zipkin.is-in-client-mode", true); - this.receiverHost = SpanReceiverHost.getInstance(conf); + /** this.receiverHost = SpanReceiverHost.getInstance(conf); + * TODO: FIX!! What does this mean in htrace 4.0? + */ } } diff --git a/hbase-protocol/src/main/protobuf/Tracing.proto b/hbase-protocol/src/main/protobuf/Tracing.proto index 5a64cfc..b129504 100644 --- a/hbase-protocol/src/main/protobuf/Tracing.proto +++ b/hbase-protocol/src/main/protobuf/Tracing.proto @@ -22,12 +22,15 @@ option java_outer_classname = "TracingProtos"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; -//Used to pass through the information necessary to continue -//a trace after an RPC is made. All we need is the traceid -//(so we know the overarching trace this message is a part of), and -//the id of the current span when this message was sent, so we know -//what span caused the new span we will create when this message is received. +// Used to pass through the information necessary to continue +// a trace after an RPC is made. 
All we need is the traceid +// (so we know the overarching trace this message is a part of). +// Pre-4.0.0 htrace, we had parent and trace id. In 4.0.0, there is one id only and it is +// 128 bits. Rather than add new fields or change field names, just set the high and low of +// the 128 integer into the old parent id and trace id fields. message RPCTInfo { + // In 4.0.0 htrace, trace_id holds the high part of the 128 traceid optional int64 trace_id = 1; + // In 4.0.0 htrace, parent_id holds the low part of the 128 traceid optional int64 parent_id = 2; } diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 94f0b72..eec7c83 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -539,7 +539,7 @@ org.apache.htrace - htrace-core + htrace-core4 com.lmax diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 9c292cc..809b7ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -113,7 +113,8 @@ public class LocalHBaseCluster { } @SuppressWarnings("unchecked") - private static Class getRegionServerImplementation(final Configuration conf) { + private static Class + getRegionServerImplementation(final Configuration conf) { return (Class)conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); } @@ -460,7 +461,8 @@ * @return True if a 'local' address in hbase.master value. */ public static boolean isLocal(final Configuration c) { - boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); + boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, + HConstants.DEFAULT_CLUSTER_DISTRIBUTED); return(mode == HConstants.CLUSTER_IS_LOCAL); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index 1a8b847..36cd1b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -25,9 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Server; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; /** * Abstract base class for all HBase event handlers. Subclasses should @@ -68,13 +68,13 @@ public abstract class EventHandler implements Runnable, Comparable { // Time to wait for events to happen, should be kept short protected int waitingTimeForEvents; - private final Span parent; + private final SpanId parentSpanId; /** * Default base class constructor.
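 * Captures the submitting thread's SpanId so that run() can re-parent the
 * scope it opens on the worker thread.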
*/ public EventHandler(Server server, EventType eventType) { - this.parent = Trace.currentSpan(); + this.parentSpanId = Tracer.getCurrentSpanId(); this.server = server; this.eventType = eventType; seqid = seqids.incrementAndGet(); @@ -98,13 +98,17 @@ public abstract class EventHandler implements Runnable, Comparable { } public void run() { - TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent); + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = tracer == null? null: + this.parentSpanId.isValid()? + tracer.newScope(this.getClass().getSimpleName(), this.parentSpanId): + tracer.newScope(this.getClass().getSimpleName()); try { process(); } catch(Throwable t) { handleException(t); } finally { - chunk.close(); + if (traceScope != null) traceScope.close(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 930f42a..b7b6f8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -58,8 +58,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.io.WritableUtils; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.annotations.VisibleForTesting; @@ -1463,7 +1463,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { boolean useLock = false; IdLock.Entry lockEntry = null; - TraceScope traceScope = Trace.startSpan("HFileReaderImpl.readBlock"); + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = tracer == null? null: tracer.newScope("HFileReaderImpl.readBlock"); try { while (true) { // Check cache for block. If found return. @@ -1476,7 +1477,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { - if (Trace.isTracing()) { + if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("blockCacheHit"); } assert cachedBlock.isUnpacked() : "Packed block leak."; @@ -1504,7 +1505,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // Carry on, please load. } - if (Trace.isTracing()) { + if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); } // Load block from filesystem. 
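The Tracing.proto hunk above replaces htrace-3's (trace id, parent id) pair with a single 128-bit htrace-4 SpanId split across the two legacy 64-bit fields. A minimal sketch of the round trip, assuming the RPCTInfo builder generated from the proto above and htrace-4's SpanId(high, low) constructor; the helper class and method names are illustrative, not part of this patch:

    import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
    import org.apache.htrace.core.SpanId;

    final class SpanIdPacking {
      // Sender: high 64 bits travel in the old trace_id field, low 64 bits in parent_id.
      static RPCTInfo pack(SpanId spanId) {
        return RPCTInfo.newBuilder()
            .setTraceId(spanId.getHigh())
            .setParentId(spanId.getLow())
            .build();
      }

      // Receiver: reassemble the 128-bit id from the two legacy fields.
      static SpanId unpack(RPCTInfo tinfo) {
        return new SpanId(tinfo.getTraceId(), tinfo.getParentId());
      }
    }

This mirrors what the RpcServer hunk below does when it rebuilds an RPCTInfo from the request header.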
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 5b52521..aa66aae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -28,8 +28,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.protobuf.Message; @@ -102,8 +103,11 @@ public class CallRunner { throw new ServerNotRunningYetException("Server " + (address != null ? address : "(channel closed)") + " is not running yet"); } - if (call.tinfo != null) { - traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); + Span span = Tracer.getCurrentSpan(); + if (span != null) { + Tracer tracer = Tracer.curThreadTracer(); + traceScope = tracer == null? null: + tracer.newScope(call.toTraceString(), span.getSpanId()); } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 402bca0..a66ed93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; @@ -125,7 +126,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.codehaus.jackson.map.ObjectMapper; -import org.apache.htrace.TraceInfo; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; @@ -310,7 +310,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // set at call completion protected long size; // size of current call protected boolean isError; - protected TraceInfo tinfo; + protected RPCTInfo tinfo; private ByteBuffer cellBlock = null; private User user; @@ -319,7 +319,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress) { + long size, RPCTInfo tinfo, final InetAddress remoteAddress) { this.id = id; this.service = service; this.md = md; @@ -1894,8 +1894,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return; } - TraceInfo traceInfo = 
header.hasTraceInfo() - ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) + RPCTInfo traceInfo = header.hasTraceInfo() + ? RPCTInfo.newBuilder().setTraceId(header.getTraceInfo().getTraceId()). + setParentId(header.getTraceInfo().getParentId()).build() : null; Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo, this.addr); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index cc8a35c..c48b3a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -774,7 +774,7 @@ public class TableMapReduceUtil { io.netty.channel.Channel.class, com.google.protobuf.Message.class, com.google.common.collect.Lists.class, - org.apache.htrace.Trace.class, + org.apache.htrace.core.Tracer.class, com.yammer.metrics.core.MetricsRegistry.class); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java index d34f25e..e105689 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.htrace.Trace; +import org.apache.htrace.core.Tracer; /** * Handler to run disable of a table. @@ -205,11 +205,15 @@ public class DisableTableHandler extends EventHandler { continue; } final HRegionInfo hri = region; - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler",new Runnable() { + Tracer tracer = Tracer.curThreadTracer(); + Runnable runnable = new Runnable() { public void run() { assignmentManager.unassign(hri); } - })); + }; + pool.execute(tracer != null? 
+ tracer.wrap(runnable, "DisableTableHandler.BulkDisabler"): + runnable); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 716897f..6420a12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.htrace.Trace; +import org.apache.htrace.core.Tracer; @InterfaceAudience.Private public class DisableTableProcedure @@ -509,12 +509,15 @@ public class DisableTableProcedure && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { continue; } - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() { + Tracer tracer = Tracer.curThreadTracer(); + Runnable runnable = new Runnable() { @Override public void run() { assignmentManager.unassign(region); } - })); + }; + pool.execute(tracer == null? runnable: + tracer.wrap(runnable, "DisableTableHandler.BulkDisabler")); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d1..a6afbb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -47,7 +47,8 @@ import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.htrace.Trace; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.Tracer; /** * The MemStore holds in-memory modifications to the Store. 
Modifications @@ -640,8 +641,9 @@ public class DefaultMemStore implements MemStore { this.snapshotAllocatorAtCreation = snapshotAllocator; this.snapshotAllocatorAtCreation.incScannerCount(); } - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); + Span span = Tracer.getCurrentSpan(); + if (span != null) { + span.addTimelineAnnotation("Creating MemStoreScanner"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 484d5ee..91ca6df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -182,8 +182,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -5162,8 +5162,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TraceScope traceScope = null; // If we're tracing start a span to show how long this took. - if (Trace.isTracing()) { - traceScope = Trace.startSpan("HRegion.getRowLock"); + Tracer tracer = Tracer.curThreadTracer(); + if (tracer != null) { + traceScope = tracer.newScope("HRegion.getRowLock"); traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock")); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 211fed5..245a526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -142,7 +142,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.trace.SpanReceiverHost; +import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; @@ -173,6 +173,8 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TracerId; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; @@ -340,7 +342,7 @@ public class HRegionServer extends HasThread implements public static final String REGIONSERVER = "regionserver"; MetricsRegionServer metricsRegionServer; - private SpanReceiverHost spanReceiverHost; + private final Tracer tracer; /** * ChoreService used to schedule tasks that we want to run periodically @@ -579,13 +581,18 @@ public class HRegionServer extends HasThread implements this.tableDescriptors = 
getFsTableDescriptors(); service = new ExecutorService(getServerName().toShortString()); - spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); + + // Set up tracer. Add ServerName to the TraceId so it comes out as "RegionServer,host,port,ts" + this.conf.set(HBaseHTraceConfiguration.KEY_PREFIX + TracerId.TRACER_ID_KEY, + "%{tname}," + this.serverName.toString()); + this.tracer = new Tracer.Builder("RegionServer"). + conf(new HBaseHTraceConfiguration(this.conf)).build(); // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { // Open connection to zookeeper and set primary watcher zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + - rpcServices.isa.getPort(), this, canCreateBaseZNode()); + rpcServices.isa.getPort(), this, this.tracer, canCreateBaseZNode()); this.csm = (BaseCoordinatedStateManager) csm; this.csm.initialize(this); @@ -2162,10 +2169,6 @@ public class HRegionServer extends HasThread implements if (this.cacheFlusher != null) { this.cacheFlusher.join(); } - - if (this.spanReceiverHost != null) { - this.spanReceiverHost.closeReceivers(); - } if (this.walRoller != null) { Threads.shutdown(this.walRoller.getThread()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 40c5046..393bac3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -57,8 +57,8 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.base.Preconditions; @@ -579,11 +579,11 @@ class MemStoreFlusher implements FlushRequester { * amount of memstore consumption. */ public void reclaimMemStoreMemory() { - TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); + Tracer tracer = Tracer.curThreadTracer(); + TraceScope scope = tracer == null? null: + tracer.newScope("MemStoreFluser.reclaimMemStoreMemory"); if (isAboveHighWaterMark()) { - if (Trace.isTracing()) { - scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); - } + if (scope != null) scope.addTimelineAnnotation("Force Flush. 
We're above high water mark."); long start = EnvironmentEdgeManager.currentTime(); synchronized (this.blockSignal) { boolean blocked = false; @@ -630,7 +630,7 @@ class MemStoreFlusher implements FlushRequester { } else if (isAboveLowWaterMark()) { wakeupFlushThread(); } - scope.close(); + if (scope != null) scope.close(); } @Override public String toString() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 0d1ff0d..b7b1f3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -80,10 +81,10 @@ import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; -import org.apache.htrace.NullScope; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.annotations.VisibleForTesting; import com.lmax.disruptor.BlockingWaitStrategy; @@ -278,8 +279,6 @@ public class FSHLog implements WAL { private final int slowSyncNs; - private final static Object [] NO_ARGS = new Object []{}; - // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow @@ -348,6 +347,11 @@ public class FSHLog implements WAL { /** + * Tracer to use. Non-null so all in WAL subsystem can do away with null checks. + */ + private final Tracer tracer; + + /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. * Throws an IllegalArgumentException if used to compare paths from different wals. */ @@ -439,6 +443,9 @@ public class FSHLog implements WAL { this.fullPathLogDir = new Path(rootDir, logDir); this.fullPathArchiveDir = new Path(rootDir, archiveDir); this.conf = conf; + Tracer t = Tracer.curThreadTracer(); + this.tracer = t != null? t: + new Tracer.Builder("FSHLog").conf(new HBaseHTraceConfiguration(conf)).build(); if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) { throw new IOException("Unable to mkdir " + fullPathLogDir); @@ -640,10 +647,11 @@ public class FSHLog implements WAL { * @param nextWriter * @param startTimeNanos */ - private void preemptiveSync(final ProtobufLogWriter nextWriter) { + private void preemptiveSync(final ProtobufLogWriter nextWriter, final TraceScope scope) { long startTimeNanos = System.nanoTime(); try { nextWriter.sync(); + if (scope != null) scope.addTimelineAnnotation("preemptive"); postSync(System.nanoTime() - startTimeNanos, 0); } catch (IOException e) { // optimization failed, no need to abort here. 
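htrace 4 drops NullScope, so call sites can no longer close a scope unconditionally; the hunks above and below all repeat the same guard: fetch the thread-local Tracer, null-check it before opening a scope, and null-check the scope before annotating or closing it. A sketch of that pattern factored into a hypothetical helper (TraceUtil is not part of this patch):

    import org.apache.htrace.core.TraceScope;
    import org.apache.htrace.core.Tracer;

    final class TraceUtil {
      private TraceUtil() {}

      /** Returns a new scope, or null when no Tracer is attached to this thread. */
      static TraceScope maybeNewScope(String description) {
        Tracer tracer = Tracer.curThreadTracer();
        return tracer == null ? null : tracer.newScope(description);
      }

      /** Close a scope obtained from maybeNewScope, tolerating null. */
      static void maybeClose(TraceScope scope) {
        if (scope != null) {
          scope.close();
        }
      }
    }

A call site such as rollWriter below would then read: TraceScope scope = TraceUtil.maybeNewScope("FSHLog.rollWriter"); try { ... } finally { TraceUtil.maybeClose(scope); }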
@@ -666,7 +674,8 @@ public class FSHLog implements WAL { LOG.debug("WAL closing. Skipping rolling of writer"); return regionsToFlush; } - TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); + Tracer tracer = Tracer.curThreadTracer(); + TraceScope scope = tracer == null? null: tracer.newScope("FSHLog.rollWriter"); try { Path oldPath = getOldPath(); Path newPath = getNewPath(); @@ -677,7 +686,7 @@ public class FSHLog implements WAL { nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. // If this fails, we just keep going.... it is an optimization, not the end of the world. - preemptiveSync((ProtobufLogWriter)nextWriter); + preemptiveSync((ProtobufLogWriter)nextWriter, scope); } tellListenersAboutPreLogRoll(oldPath, newPath); // NewPath could be equal to oldPath if replaceWriter fails. @@ -690,8 +699,7 @@ public class FSHLog implements WAL { } } finally { closeBarrier.endOp(); - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); + if (scope != null) scope.close(); } return regionsToFlush; } finally { @@ -805,7 +813,7 @@ public class FSHLog implements WAL { SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? null: this.ringBufferEventHandler.attainSafePoint(); afterCreatingZigZagLatch(); - TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); + TraceScope scope = this.tracer.newScope("FSHFile.replaceWriter"); try { // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the // ring buffer between the above notification of writer that we want it to go to @@ -814,7 +822,7 @@ public class FSHLog implements WAL { // to come back. Cleanup this syncFuture down below after we are ready to run again. try { if (zigzagLatch != null) { - Trace.addTimelineAnnotation("awaiting safepoint"); + if (scope != null) scope.addTimelineAnnotation("awaiting safepoint"); syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); } } catch (FailedSyncBeforeLogCloseException e) { @@ -828,9 +836,9 @@ public class FSHLog implements WAL { // TODO: This is close is inline with critical section. Should happen in background? try { if (this.writer != null) { - Trace.addTimelineAnnotation("closing writer"); + if (scope != null) scope.addTimelineAnnotation("closing writer"); this.writer.close(); - Trace.addTimelineAnnotation("writer closed"); + if (scope != null) scope.addTimelineAnnotation("writer closed"); } this.closeErrorCount.set(0); } catch (IOException ioe) { @@ -885,7 +893,7 @@ public class FSHLog implements WAL { } } } finally { - scope.close(); + if (scope != null) scope.close(); } } return newPath; @@ -1058,9 +1066,6 @@ public class FSHLog implements WAL { public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); - // Make a trace scope for the append. It is closed on other side of the ring buffer by the - // single consuming thread. Don't have to worry about it. - TraceScope scope = Trace.startSpan("FSHLog.append"); // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need // all this to make a key and then below to append the edit, we need to carry htd, info, @@ -1073,7 +1078,7 @@ public class FSHLog implements WAL { // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. 
entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); - truck.loadPayload(entry, scope.detach()); + truck.loadPayload(entry); } finally { this.disruptor.getRingBuffer().publish(sequence); } @@ -1100,6 +1105,9 @@ public class FSHLog implements WAL { // Keep around last exception thrown. Clear on successful sync. private final BlockingQueue syncFutures; + // Rather than create each time, create once and reuse. + private SpanId [] oneSpanId = new SpanId[1]; + /** * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, @@ -1170,6 +1178,19 @@ public class FSHLog implements WAL { } /** + * @return SpanIds of all futures in the queue that are less than currentSequence + */ + private List getSpanIds(final long currentSequence) { + List spanIds = null; + for (SyncFuture f: this.syncFutures) { + if (f.getRingBufferSequence() > currentSequence) break; + if (spanIds == null) spanIds = new ArrayList(this.syncFutures.size()); + spanIds.add(f.getSpanId()); + } + return spanIds; + } + + /** * @param sequence The sequence we ran the filesystem sync against. * @return Current highest synced sequence. */ @@ -1212,15 +1233,28 @@ public class FSHLog implements WAL { } break; } - // I got something. Lets run. Save off current sequence number in case it changes - // while we run. - TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); + // I got something. Save off current sequence number in case it changes while we run. + + // Do trace work if enabled. + TraceScope traceScope = null; + if (Tracer.getCurrentSpan() != null) { + traceScope = tracer.newScope("syncer"); + // Set parents. We have at least takeSyncFuture as parent but likely more. + List spanIds = getSpanIds(currentSequence); + if (spanIds != null) { + spanIds.add(takeSyncFuture.getSpanId()); + traceScope.getSpan().setParents(spanIds.toArray(this.oneSpanId)); + } else { + this.oneSpanId[0] = takeSyncFuture.getSpanId(); + traceScope.getSpan().setParents(this.oneSpanId); + } + } long start = System.nanoTime(); Throwable lastException = null; try { - Trace.addTimelineAnnotation("syncing writer"); + if (traceScope != null) traceScope.addTimelineAnnotation("syncing writer"); writer.sync(); - Trace.addTimelineAnnotation("writer synced"); + if (traceScope != null) traceScope.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { LOG.error("Error syncing, request close of WAL", e); @@ -1229,8 +1263,6 @@ public class FSHLog implements WAL { LOG.warn("UNEXPECTED", e); lastException = e; } finally { - // reattach the span to the future before releasing. - takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); // Can we release other syncs? 
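In the SyncRunner hunk above, one filesystem sync completes every queued SyncFuture at or below the current ring-buffer sequence, so instead of one span per waiter, the single "syncer" span is given each waiter's SpanId as a parent via Span.setParents. A minimal sketch of that idea under the same htrace-4 API (the class and method names are illustrative):

    import java.util.List;
    import org.apache.htrace.core.SpanId;
    import org.apache.htrace.core.TraceScope;
    import org.apache.htrace.core.Tracer;

    final class SyncerSpanSketch {
      // Open one "syncer" scope whose span descends from every waiting caller's span.
      static TraceScope newSyncerScope(Tracer tracer, List<SpanId> waiterSpanIds) {
        TraceScope scope = tracer.newScope("syncer");
        if (!waiterSpanIds.isEmpty()) {
          scope.getSpan().setParents(
              waiterSpanIds.toArray(new SpanId[waiterSpanIds.size()]));
        }
        return scope;
      }
    }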
@@ -1321,12 +1353,8 @@ public class FSHLog implements WAL { } private SyncFuture publishSyncOnRingBuffer() { - return publishSyncOnRingBuffer(null); - } - - private SyncFuture publishSyncOnRingBuffer(Span span) { long sequence = this.disruptor.getRingBuffer().next(); - SyncFuture syncFuture = getSyncFuture(sequence, span); + SyncFuture syncFuture = getSyncFuture(sequence); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); @@ -1337,15 +1365,15 @@ public class FSHLog implements WAL { } // Sync all known transactions - private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { - return blockOnSync(publishSyncOnRingBuffer(span)); + private SyncFuture publishSyncThenBlockOnCompletion() throws IOException { + return blockOnSync(publishSyncOnRingBuffer()); } - private Span blockOnSync(final SyncFuture syncFuture) throws IOException { + private SyncFuture blockOnSync(final SyncFuture syncFuture) throws IOException { // Now we have published the ringbuffer, halt the current thread until we get an answer back. try { syncFuture.get(); - return syncFuture.getSpan(); + return syncFuture; } catch (InterruptedException ie) { LOG.warn("Interrupted", ie); throw convertInterruptedExceptionToIOException(ie); @@ -1361,13 +1389,13 @@ public class FSHLog implements WAL { return ioe; } - private SyncFuture getSyncFuture(final long sequence, Span span) { + private SyncFuture getSyncFuture(final long sequence) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { syncFuture = new SyncFuture(); this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); } - return syncFuture.reset(sequence, span); + return syncFuture.reset(sequence, Tracer.getCurrentSpanId()); } private void postSync(final long timeInNanos, final int handlerSyncs) { @@ -1376,7 +1404,8 @@ public class FSHLog implements WAL { new StringBuilder().append("Slow sync cost: ") .append(timeInNanos / 1000000).append(" ms, current pipeline: ") .append(Arrays.toString(getPipeLine())).toString(); - Trace.addTimelineAnnotation(msg); + Span span = Tracer.getCurrentSpan(); + if (span != null) span.addTimelineAnnotation(msg); LOG.info(msg); } if (!listeners.isEmpty()) { @@ -1427,12 +1456,8 @@ public class FSHLog implements WAL { @Override public void sync() throws IOException { - TraceScope scope = Trace.startSpan("FSHLog.sync"); - try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); - } finally { - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); + try (TraceScope scope = this.tracer.newScope("FSHLog.sync");) { + publishSyncThenBlockOnCompletion(); } } @@ -1442,12 +1467,8 @@ public class FSHLog implements WAL { // Already sync'd. return; } - TraceScope scope = Trace.startSpan("FSHLog.sync"); - try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); - } finally { - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); + try (TraceScope scope = this.tracer.newScope("FSHLog.sync txid=" + txid)) { + publishSyncThenBlockOnCompletion(); } } @@ -1721,7 +1742,6 @@ public class FSHLog implements WAL { // Force flush of syncs if we are carrying a full complement of syncFutures. 
if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; } else if (truck.hasFSWALEntryPayload()) { - TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); try { FSWALEntry entry = truck.unloadFSWALEntryPayload(); if (this.exception != null) { @@ -1740,9 +1760,6 @@ public class FSHLog implements WAL { this.exception = e; // Return to keep processing events coming off the ringbuffer return; - } finally { - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); // append scope is complete } } else { // What is this if not an append or sync. Fail all up to this!!! diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java index 25c2111..4334974 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java @@ -25,7 +25,7 @@ import com.lmax.disruptor.EventFactory; /** * A 'truck' to carry a payload across the {@link FSHLog} ring buffer from Handler to WAL. - * Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to + * Has EITHER a FSWALEntry for making an append OR it has a SyncFuture to * represent a 'sync' invocation. Truck instances are reused by the disruptor when it gets * around to it so their payload references must be discarded on consumption to release them * to GC. @@ -39,17 +39,10 @@ class RingBufferTruck { private FSWALEntry entry; /** - * The tracing span for this entry. Can be null. - * TODO: Fix up tracing. - */ - private Span span; - - /** * Load the truck with a {@link FSWALEntry} and associated {@link Span}. */ - void loadPayload(final FSWALEntry entry, final Span span) { + void loadPayload(final FSWALEntry entry) { this.entry = entry; - this.span = span; this.syncFuture = null; } @@ -59,7 +52,6 @@ class RingBufferTruck { void loadPayload(final SyncFuture syncFuture) { this.syncFuture = syncFuture; this.entry = null; - this.span = null; } /** @@ -97,15 +89,6 @@ class RingBufferTruck { } /** - * Unload the truck of its {@link Span} payload. The internal reference is released. - */ - Span unloadSpanPayload() { - Span ret = this.span; - this.span = null; - return ret; - } - - /** * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. */ final static EventFactory EVENT_FACTORY = new EventFactory() { @@ -113,4 +96,4 @@ class RingBufferTruck { return new RingBufferTruck(); } }; -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 7de8367..d6e9d0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -21,7 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.htrace.Span; +import org.apache.htrace.core.SpanId; /** * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till @@ -74,9 +74,9 @@ class SyncFuture { private Thread t; /** - * Optionally carry a disconnected scope to the SyncRunner. + * Optionally carry current SpanId. 
*/
-  private Span span;
+  private SpanId spanId = SpanId.INVALID;

   /**
    * Call this method to clear old usage and get it ready for new deploy. Call
@@ -94,17 +94,17 @@
    * this method even if it is being used for the first time.
    *
    * @param sequence sequenceId from this Future's position in the RingBuffer
-   * @param span curren span, detached from caller. Don't forget to attach it when
+   * @param spanId current SpanId; it is handed back via {@link #getSpanId()} when
    * resuming after a call to {@link #get()}.
    * @return this
    */
-  synchronized SyncFuture reset(final long sequence, Span span) {
+  synchronized SyncFuture reset(final long sequence, final SpanId spanId) {
     if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
     t = Thread.currentThread();
     if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
     this.doneSequence = NOT_DONE;
     this.ringBufferSequence = sequence;
-    this.span = span;
+    this.spanId = spanId;
     return this;
   }

@@ -118,21 +118,16 @@
   }

   /**
-   * Retrieve the {@code span} instance from this Future. EventHandler calls
-   * this method to continue the span. Thread waiting on this Future musn't call
-   * this method until AFTER calling {@link #get()} and the future has been
+   * Retrieve the {@code SpanId} instance from this Future. Thread waiting on this Future mustn't
+   * call this method until AFTER calling {@link #get()} and the future has been
    * released back to the originating thread.
    */
-  synchronized Span getSpan() {
-    return this.span;
+  synchronized SpanId getSpanId() {
+    return this.spanId;
   }

-  /**
-   * Used to re-attach a {@code span} to the Future. Called by the EventHandler
-   * after a it has completed processing and detached the span from its scope.
-   */
-  synchronized void setSpan(Span span) {
-    this.span = span;
+  synchronized void setSpanId(SpanId spanId) {
+    this.spanId = spanId;
   }

   /**
@@ -190,4 +185,4 @@
   synchronized Throwable getThrowable() {
     return this.throwable;
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 27f019a..cf26bfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -221,4 +221,4 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 006c3e7..75ce7e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -358,7 +358,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {

   /**
    * Returns this classes's instance of {@link Configuration}. Be careful how
-   * you use the returned Configuration since {@link HConnection} instances
+   * you use the returned Configuration since HConnection instances
If * say, a Connection was being used against a cluster that had been shutdown, * see {@link #shutdownMiniCluster()}, then the Connection will no longer @@ -583,7 +583,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Start a minidfscluster. * @param servers How many DNs to start. * @throws Exception - * @see {@link #shutdownMiniDFSCluster()} + * @see #shutdownMiniCluster() * @return The mini dfs cluster created. */ public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { @@ -598,7 +598,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * datanodes will have the same host name. * @param hosts hostnames DNs to run on. * @throws Exception - * @see {@link #shutdownMiniDFSCluster()} + * @see #shutdownMiniDFSCluster() * @return The mini dfs cluster created. */ public MiniDFSCluster startMiniDFSCluster(final String hosts[]) @@ -616,7 +616,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param servers How many DNs to start. * @param hosts hostnames DNs to run on. * @throws Exception - * @see {@link #shutdownMiniDFSCluster()} + * @see #shutdownMiniDFSCluster() * @return The mini dfs cluster created. */ public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) @@ -825,7 +825,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** - * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)} + * Shuts down zk cluster created by call to #startMiniZKCluster(File) * or does nothing. * @throws IOException * @see #startMiniZKCluster() @@ -841,7 +841,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Start up a minicluster of hbase, dfs, and zookeeper. * @throws Exception * @return Mini hbase cluster instance created. - * @see {@link #shutdownMiniDFSCluster()} + * @see #shutdownMiniDFSCluster() */ public MiniHBaseCluster startMiniCluster() throws Exception { return startMiniCluster(1, 1); @@ -853,7 +853,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * (will overwrite if dir already exists) * @throws Exception * @return Mini hbase cluster instance created. - * @see {@link #shutdownMiniDFSCluster()} + * @see #shutdownMiniDFSCluster() */ public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create) throws Exception { @@ -870,7 +870,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise * bind errors. * @throws Exception - * @see {@link #shutdownMiniCluster()} + * @see #shutdownMiniCluster() * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numSlaves) @@ -882,7 +882,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * Start minicluster. Whether to create a new root or data dir path even if such a path * has been created earlier is decided based on flag create * @throws Exception - * @see {@link #shutdownMiniCluster()} + * @see #shutdownMiniCluster() * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, @@ -894,7 +894,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * start minicluster * @throws Exception - * @see {@link #shutdownMiniCluster()} + * @see #shutdownMiniCluster() * @return Mini hbase cluster instance created. 
*/ public MiniHBaseCluster startMiniCluster(final int numMasters, @@ -931,7 +931,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * If you start MiniDFSCluster without host names, * all instances of the datanodes will have the same host name. * @throws Exception - * @see {@link #shutdownMiniCluster()} + * @see #shutdownMiniCluster() * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, @@ -973,7 +973,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param regionserverClass The class to use as HRegionServer, or null for * default * @throws Exception - * @see {@link #shutdownMiniCluster()} + * @see #shutdownMiniCluster() * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, @@ -1054,7 +1054,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @return Reference to the hbase mini hbase cluster. * @throws IOException * @throws InterruptedException - * @see {@link #startMiniCluster()} + * @see #startMiniCluster() */ public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves, Class masterClass, @@ -1136,7 +1136,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * Stops mini hbase, zk, and hdfs clusters. * @throws IOException - * @see {@link #startMiniCluster(int)} + * @see #startMiniCluster(int) */ public void shutdownMiniCluster() throws Exception { LOG.info("Shutting down minicluster"); @@ -1761,7 +1761,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @param desc * @param startKey * @param endKey - * @return + * @return A Region instance * @throws IOException */ public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey, @@ -1804,7 +1804,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. * @deprecated use - * {@link #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)} + * #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...) */ @Deprecated public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, @@ -1816,13 +1816,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** - * @param tableName - * @param startKey - * @param stopKey - * @param callingMethod - * @param conf - * @param isReadOnly - * @param families * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 
* @throws IOException @@ -1960,7 +1953,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** A tracker for tracking and validating table rows - * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])} + * generated with HBaseTestingUtility#loadTable(HTable, byte[]) */ public static class SeenRowTracker { int dim = 'z' - 'a' + 1; @@ -2195,7 +2188,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return digest.toString(); } - /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */ + /** All the row values for the data loaded by #loadTable(HTable, byte[]) */ public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB static { int i = 0; @@ -2944,7 +2937,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * regions have been all assigned. Will timeout after default period (30 seconds) * Tolerates nonexistent table. * @param table Table to wait on. - * @param table * @throws InterruptedException * @throws IOException */ @@ -2956,7 +2948,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the * regions have been all assigned. - * @see #waitTableEnabled(Admin, byte[], long) + * @see #waitTableEnabled(TableName, long) * @param table Table to wait on. * @param timeoutMillis Time to wait on it being marked enabled. * @throws InterruptedException @@ -3112,11 +3104,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... * * @param stream A DFSClient.DFSOutputStream. 
- * @param max - * @throws NoSuchFieldException - * @throws SecurityException - * @throws IllegalAccessException - * @throws IllegalArgumentException */ public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 33b50d4..07b41fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; -import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -85,11 +84,12 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.ProbabilitySampler; +import org.apache.htrace.core.Sampler; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.codehaus.jackson.map.ObjectMapper; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; -import org.apache.htrace.impl.ProbabilitySampler; import com.google.common.base.Objects; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -968,8 +968,7 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final TestOptions opts; private final Status status; - private final Sampler traceSampler; - private final SpanReceiverHost receiverHost; + private final Tracer tracer; protected Connection connection; // protected Table table; @@ -985,18 +984,21 @@ public class PerformanceEvaluation extends Configured implements Tool { Test(final Connection con, final TestOptions options, final Status status) { this.connection = con; this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration(); - this.receiverHost = this.conf == null? 
null: SpanReceiverHost.getInstance(conf); this.opts = options; this.status = status; this.testName = this.getClass().getSimpleName(); + HTraceConfiguration htraceConfiguration = new HBaseHTraceConfiguration(conf); + this.tracer = new Tracer.Builder("PE").conf(htraceConfiguration).build(); + Sampler sampler; if (options.traceRate >= 1.0) { - this.traceSampler = Sampler.ALWAYS; + sampler = Sampler.ALWAYS; } else if (options.traceRate > 0.0) { conf.setDouble("hbase.sampler.fraction", options.traceRate); - this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf)); + sampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf)); } else { - this.traceSampler = Sampler.NEVER; + sampler = Sampler.NEVER; } + this.tracer.addSampler(sampler); everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); if (options.isValueZipf()) { this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); @@ -1067,7 +1069,7 @@ public class PerformanceEvaluation extends Configured implements Tool { if (!opts.oneCon) { connection.close(); } - receiverHost.closeReceivers(); + tracer.close(); } abstract void onTakedown() throws IOException; @@ -1100,11 +1102,8 @@ public class PerformanceEvaluation extends Configured implements Tool { for (int i = opts.startRow; i < lastRow; i++) { if (i % everyN != 0) continue; long startTime = System.nanoTime(); - TraceScope scope = Trace.startSpan("test row", traceSampler); - try { + try (TraceScope scope = this.tracer.newScope("test row");) { testRow(i); - } finally { - scope.close(); } if ( (i - opts.startRow) > opts.measureAfter) { latency.update((System.nanoTime() - startTime) / 1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index b773b46..7e19aa5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -50,15 +59,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - @Category({MediumTests.class, ClientTests.class}) public class TestReplicaWithCluster { private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java index cbcc166..9bd724d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java @@ -68,4 +68,4 @@ public class TestProcedureManager { assertArrayEquals("Incorrect return data from execProcedure", SimpleMasterProcedureManager.SIMPLE_DATA.getBytes(), result); } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java deleted file mode 100644 index 205f1d8..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TestHTraceHooks.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.trace; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.lang.reflect.Method; -import java.util.Collection; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.htrace.Sampler; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; -import org.apache.htrace.TraceTree; -import org.apache.htrace.impl.POJOSpanReceiver; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({MiscTests.class, MediumTests.class}) -public class TestHTraceHooks { - - private static final byte[] FAMILY_BYTES = "family".getBytes(); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static POJOSpanReceiver rcvr; - private static long ROOT_SPAN_ID = 0; - - @BeforeClass - public static void before() throws Exception { - - // Find out what the right value to use fo SPAN_ROOT_ID after HTRACE-111. We use HTRACE-32 - // to find out to detect if we are using HTrace 3.2 or not. 
- try { - Method m = Span.class.getMethod("addKVAnnotation", String.class, String.class); - } catch (NoSuchMethodException e) { - ROOT_SPAN_ID = 0x74aceL; // Span.SPAN_ROOT_ID pre HTrace-3.2 - } - - TEST_UTIL.startMiniCluster(2, 3); - rcvr = new POJOSpanReceiver(new HBaseHTraceConfiguration(TEST_UTIL.getConfiguration())); - Trace.addReceiver(rcvr); - } - - @AfterClass - public static void after() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - Trace.removeReceiver(rcvr); - rcvr = null; - } - - @Test - public void testTraceCreateTable() throws Exception { - TraceScope tableCreationSpan = Trace.startSpan("creating table", Sampler.ALWAYS); - Table table; - try { - - table = TEST_UTIL.createTable(TableName.valueOf("table"), - FAMILY_BYTES); - } finally { - tableCreationSpan.close(); - } - - // Some table creation is async. Need to make sure that everything is full in before - // checking to see if the spans are there. - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return rcvr.getSpans().size() >= 5; - } - }); - - Collection spans = rcvr.getSpans(); - TraceTree traceTree = new TraceTree(spans); - Collection roots = traceTree.getSpansByParent().find(ROOT_SPAN_ID); - - assertEquals(1, roots.size()); - Span createTableRoot = roots.iterator().next(); - - assertEquals("creating table", createTableRoot.getDescription()); - - int createTableCount = 0; - - for (Span s : traceTree.getSpansByParent().find(createTableRoot.getSpanId())) { - if (s.getDescription().startsWith("MasterService.CreateTable")) { - createTableCount++; - } - } - - assertTrue(createTableCount >= 1); - assertTrue(traceTree.getSpansByParent().find(createTableRoot.getSpanId()).size() > 3); - assertTrue(spans.size() > 5); - - Put put = new Put("row".getBytes()); - put.addColumn(FAMILY_BYTES, "col".getBytes(), "value".getBytes()); - - TraceScope putSpan = Trace.startSpan("doing put", Sampler.ALWAYS); - try { - table.put(put); - } finally { - putSpan.close(); - } - - spans = rcvr.getSpans(); - traceTree = new TraceTree(spans); - roots = traceTree.getSpansByParent().find(ROOT_SPAN_ID); - - assertEquals(2, roots.size()); - Span putRoot = null; - for (Span root : roots) { - if (root.getDescription().equals("doing put")) { - putRoot = root; - } - } - - assertNotNull(putRoot); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java new file mode 100644 index 0000000..9231b86 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/TraceTree.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index 7996c17..2e208fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -49,18 +49,16 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
-import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-import org.apache.htrace.impl.ProbabilitySampler;
+import org.apache.htrace.core.ProbabilitySampler;
+import org.apache.htrace.core.Sampler;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
 
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
@@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
 
 /**
  * This class runs performance benchmarks for {@link WAL}.
@@ -108,6 +107,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
   private int valueSize = 512;
   private int keySize = 16;
 
+  private Tracer tracer;
+
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
@@ -164,14 +165,15 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
       Random rand = new Random(Thread.currentThread().getId());
       WAL wal = region.getWAL();
 
-      TraceScope threadScope =
-        Trace.startSpan("WALPerfEval." + Thread.currentThread().getName());
+      Tracer tracer = Tracer.curThreadTracer();
+      TraceScope threadScope = tracer == null? null:
+          tracer.newScope("WALPerfEval." + Thread.currentThread().getName());
       try {
         long startTime = System.currentTimeMillis();
         int lastSync = 0;
         for (int i = 0; i < numIterations; ++i) {
-          assert Trace.currentSpan() == threadScope.getSpan() : "Span leak detected.";
-          TraceScope loopScope = Trace.startSpan("runLoopIter" + i, loopSampler);
+          TraceScope loopScope = tracer == null? null:
+              tracer.newScope("runLoopIter" + i);
           try {
             long now = System.nanoTime();
             Put put = setupPut(rand, key, value, numFamilies);
@@ -306,9 +308,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
     FileSystem fs = FileSystem.get(getConf());
     LOG.info("FileSystem: " + fs);
 
-    SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
-    final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
-    TraceScope scope = Trace.startSpan("WALPerfEval", sampler);
+    this.tracer = new Tracer.Builder("WALPerfEval").
+        conf(new HBaseHTraceConfiguration(getConf())).build();
+    this.tracer.addSampler(Sampler.ALWAYS);
 
     try {
       if (rootRegionDir == null) {
@@ -330,8 +332,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
         // a table per desired region means we can avoid carving up the key space
         final HTableDescriptor htd = createHTableDescriptor(i, numFamilies);
         regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
-        benchmarks[i] = Trace.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
-            syncInterval, traceFreq));
+        benchmarks[i] =
+            new WALPutBenchmark(regions[i], htd, numIterations, noSync,
+                syncInterval, traceFreq);
       }
       ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
       long putTime = runBenchmark(benchmarks, numThreads);
@@ -379,8 +382,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
     } finally {
       // We may be called inside a test that wants to keep on using the fs.
       if (!noclosefs) fs.close();
-      scope.close();
-      if (receiverHost != null) receiverHost.closeReceivers();
+      this.tracer.close();
     }
 
     return(0);
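The `tracer == null? null: tracer.newScope(...)` guard above recurs throughout this patch because htrace-4 has no process-wide tracer: `Tracer.curThreadTracer()` returns null on threads with no tracer attached. A compact sketch of the idiom (the `TraceUtil` helper is hypothetical, not part of this commit):

[source,java]
----
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

final class TraceUtil {
  private TraceUtil() {}

  // Returns an open scope if a tracer is attached to the current thread,
  // else null; callers must null-check before closing.
  static TraceScope maybeScope(String description) {
    Tracer tracer = Tracer.curThreadTracer();
    return tracer == null ? null : tracer.newScope(description);
  }

  static void closeQuietly(TraceScope scope) {
    if (scope != null) {
      scope.close();
    }
  }
}
----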
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java
index e71210d..839343b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java
@@ -74,7 +74,7 @@ public class TestRecoverableZooKeeper {
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testSetDataVersionMismatchInLoop",
         abortable, true);
     String ensemble = ZKConfig.getZKQuorumServersString(conf);
-    RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw);
+    RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw, null);
     rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     rzk.setData(znode, "OPENING".getBytes(), 0);
    Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");
diff --git a/hbase-shell/pom.xml b/hbase-shell/pom.xml
index f80858b..32b749e 100644
--- a/hbase-shell/pom.xml
+++ b/hbase-shell/pom.xml
@@ -249,7 +249,7 @@
     <dependency>
       <groupId>org.apache.htrace</groupId>
-      <artifactId>htrace-core</artifactId>
+      <artifactId>htrace-core4</artifactId>
     </dependency>
diff --git a/hbase-shell/src/main/ruby/shell/commands/trace.rb b/hbase-shell/src/main/ruby/shell/commands/trace.rb
index 5e00930..8017215 100644
--- a/hbase-shell/src/main/ruby/shell/commands/trace.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/trace.rb
@@ -17,8 +17,9 @@
 # limitations under the License.
 #
 
 HTrace = org.apache.htrace.Trace
-java_import org.apache.htrace.Sampler
-java_import org.apache.hadoop.hbase.trace.SpanReceiverHost
+java_import org.apache.hadoop.hbase.HBaseConfiguration
+java_import org.apache.htrace.core.Tracer
+java_import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration
 
 module Shell
   module Commands
@@ -47,28 +48,32 @@ Examples:
 EOF
       end
 
-      def command(startstop="status", spanname="HBaseShell")
+      def command(startstop="status", spanName="HBaseShell")
         format_and_return_simple_command do
-          trace(startstop, spanname)
+          trace(startstop, spanName)
         end
       end
 
-      def trace(startstop, spanname)
-        @@receiver ||= SpanReceiverHost.getInstance(@shell.hbase.configuration)
+      def trace(startstop, spanName)
         if startstop == "start"
           if not tracing?
-            @@tracescope = HTrace.startSpan(spanname, Sampler.ALWAYS)
+            hbc = HBaseConfiguration.create();
+            hbc.set("hbase.htrace.sampler.classes".to_java,
+              "org.apache.htrace.core.AlwaysSampler".to_java)
+            hhc = HBaseHTraceConfiguration.new(hbc);
+            tracer = Tracer::Builder.new("Shell".to_java).conf(hhc).build();
+            @@tracescope = tracer.newScope(spanName.to_java)
          end
         elsif startstop == "stop"
           if tracing?
-            @@tracescope.close()
+            @@tracescope.close() if @@tracescope
           end
         end
         tracing?
       end
 
       def tracing?()
-        HTrace.isTracing()
+        not Tracer.curThreadTracer().nil?
       end
 
     end
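What the rewritten shell command does, expressed in Java for comparison (a sketch; the `hbase.htrace.sampler.classes` key and `AlwaysSampler` class are the ones the Ruby code sets, while the class name `ShellTraceSketch` is hypothetical):

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

public class ShellTraceSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Sample everything, as the shell's 'trace start' does.
    conf.set("hbase.htrace.sampler.classes", "org.apache.htrace.core.AlwaysSampler");
    Tracer tracer = new Tracer.Builder("Shell").
        conf(new HBaseHTraceConfiguration(conf)).build();
    TraceScope scope = tracer.newScope("HBaseShell");
    // ... shell commands run traced here until 'trace stop' ...
    scope.close();
    tracer.close();
  }
}
----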
diff --git a/hbase-thrift/pom.xml b/hbase-thrift/pom.xml
index ff999dc..64f0f24 100644
--- a/hbase-thrift/pom.xml
+++ b/hbase-thrift/pom.xml
@@ -294,7 +294,7 @@
     <dependency>
       <groupId>org.apache.htrace</groupId>
-      <artifactId>htrace-core</artifactId>
+      <artifactId>htrace-core4</artifactId>
     </dependency>
     <dependency>
      <groupId>org.apache.thrift</groupId>
diff --git a/pom.xml b/pom.xml
index d772c4b..f4f2d89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1191,7 +1191,7 @@
    <jruby.version>1.6.8</jruby.version>
    <junit.version>4.12</junit.version>
    <hamcrest.version>1.3</hamcrest.version>
-    <htrace.version>3.1.0-incubating</htrace.version>
+    <htrace.version>4.0.1-incubating</htrace.version>
    <log4j.version>1.2.17</log4j.version>
    <mockito-all.version>1.10.8</mockito-all.version>
    <protobuf.version>2.5.0</protobuf.version>
@@ -1720,7 +1720,7 @@
       <dependency>
         <groupId>org.apache.htrace</groupId>
-        <artifactId>htrace-core</artifactId>
+        <artifactId>htrace-core4</artifactId>
         <version>${htrace.version}</version>
       </dependency>
diff --git a/src/main/asciidoc/_chapters/tracing.adoc b/src/main/asciidoc/_chapters/tracing.adoc
index 0cddd8a..590a20a 100644
--- a/src/main/asciidoc/_chapters/tracing.adoc
+++ b/src/main/asciidoc/_chapters/tracing.adoc
@@ -123,26 +123,18 @@ into:
 
 [source,java]
 ----
-TraceScope ts = Trace.startSpan("Gets", Sampler.ALWAYS);
+Tracer tracer = Tracer.curThreadTracer();
+TraceScope ts = tracer == null? null: tracer.newScope("Gets");
 try {
   Table table = connection.getTable(TableName.valueOf("t1"));
   Get get = new Get(Bytes.toBytes("r1"));
   Result res = table.get(get);
 } finally {
-  ts.close();
+  if (ts != null) ts.close();
 }
 ----
 
-If you wanted to trace half of your 'get' operations, you would pass in:
-
-[source,java]
-----
-
-new ProbabilitySampler(0.5)
-----
-
-in lieu of `Sampler.ALWAYS` to `Trace.startSpan()`.
-See the HTrace _README_ for more information on Samplers.
+See the HTrace documentation for more information on Samplers.
 
 [[tracing.client.shell]]
 == Tracing from HBase Shell
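Since the old `new ProbabilitySampler(0.5)` call has no direct htrace-4 equivalent, sampling a fraction of requests becomes a configuration concern on the Tracer. A hedged sketch (assuming htrace-4's ProbabilitySampler reads its fraction from the `sampler.fraction` key, which `HBaseHTraceConfiguration`'s prefix turns into `hbase.htrace.sampler.fraction`; the class name `HalfSampledGets` is hypothetical):

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;

public class HalfSampledGets {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.htrace.sampler.classes", "org.apache.htrace.core.ProbabilitySampler");
    conf.set("hbase.htrace.sampler.fraction", "0.5"); // assumed key, see lead-in
    Tracer tracer = new Tracer.Builder("Gets").
        conf(new HBaseHTraceConfiguration(conf)).build();
    try (TraceScope ts = tracer.newScope("Gets")) {
      // roughly half of these scopes will carry a sampled span
    }
    tracer.close();
  }
}
----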