From 005f1e1cb20477249d9923009e04c1304b9a7b6e Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 1 Sep 2015 14:37:30 -0700 Subject: [PATCH] HBASE-12911 Client-side metrics --- hbase-client/pom.xml | 22 +++ .../hadoop/hbase/client/ClusterConnection.java | 5 + .../org/apache/hadoop/hbase/client/Connection.java | 1 - .../hbase/client/ConnectionImplementation.java | 18 ++- .../org/apache/hadoop/hbase/client/MetaCache.java | 9 ++ .../hadoop/hbase/client/MetricsConnection.java | 80 +++++++++++ .../hbase/client/MetricsConnectionWrapperImpl.java | 101 ++++++++++++++ .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 33 +++-- .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 6 +- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 9 +- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 102 +++++++++----- .../hbase/ipc/AsyncServerResponseHandler.java | 1 + .../java/org/apache/hadoop/hbase/ipc/Call.java | 11 +- .../apache/hadoop/hbase/ipc/RpcClientFactory.java | 22 ++- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 58 +++++--- .../hadoop/hbase/client/TestMetricsConnection.java | 150 +++++++++++++++++++++ .../hbase/client/MetricsConnectionHostSource.java | 56 ++++++++ .../hbase/client/MetricsConnectionSource.java | 91 +++++++++++++ .../client/MetricsConnectionSourceFactory.java | 32 +++++ .../hbase/client/MetricsConnectionWrapper.java | 50 +++++++ .../hbase/client/MetricsRegionClientWrapper.java | 25 ++++ .../hadoop/hbase/test/MetricsAssertHelper.java | 7 + .../client/MetricsConnectionHostSourceImpl.java | 54 ++++++++ .../client/MetricsConnectionSourceFactoryImpl.java | 33 +++++ .../hbase/client/MetricsConnectionSourceImpl.java | 131 ++++++++++++++++++ .../hadoop/hbase/metrics/BaseSourceImpl.java | 51 +++---- ...oop.hbase.client.MetricsConnectionSourceFactory | 18 +++ .../hadoop/hbase/test/MetricsAssertHelperImpl.java | 7 + .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/MetricsRegionServer.java | 3 +- 
.../hadoop/hbase/client/TestClientTimeouts.java | 2 +- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 18 ++- .../org/apache/hadoop/hbase/ipc/TestAsyncIPC.java | 14 +- .../hadoop/hbase/ipc/TestGlobalEventLoopGroup.java | 6 +- .../java/org/apache/hadoop/hbase/ipc/TestIPC.java | 4 +- .../hadoop/hbase/ipc/TestRpcHandlerException.java | 3 +- 36 files changed, 1100 insertions(+), 135 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsRegionClientWrapper.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java create mode 100644 hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index 425bd05..c9b1b04 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -128,6 +128,28 @@ org.apache.hbase + hbase-hadoop-compat + 
+ + org.apache.hbase + hbase-hadoop-compat + test-jar + test + + + org.apache.hbase + ${compat.module} + ${project.version} + + + org.apache.hbase + ${compat.module} + ${project.version} + test-jar + test + + + org.apache.hbase hbase-common test-jar diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index b3d99ae..99071fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -297,4 +297,9 @@ public interface ClusterConnection extends HConnection { */ ClientBackoffPolicy getBackoffPolicy(); + /** + * @return the MetricsConnection instance associated with this connection. + */ + public MetricsConnection getConnectionMetrics(); + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index dab4905..a3f6fe6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable { * @return true if this connection is closed */ boolean isClosed(); - } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 2262a0f..e7d2af5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -121,6 +121,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final AsyncProcess asyncProcess; // single tracker per connection private final 
ServerStatisticTracker stats; + private final MetricsConnection metrics; private volatile boolean closed; private volatile boolean aborted; @@ -157,11 +158,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // Client rpc instance. private RpcClient rpcClient; - private MetaCache metaCache = new MetaCache(); + private final MetaCache metaCache; private int refCount; - private User user; + protected User user; private RpcRetryingCallerFactory rpcCallerFactory; @@ -188,7 +189,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.registry = setupRegistry(); retrieveClusterId(); - this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? @@ -239,11 +240,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } else { nonceGenerator = new NoNonceGenerator(); } - stats = ServerStatisticTracker.create(conf); + this.stats = ServerStatisticTracker.create(conf); this.asyncProcess = createAsyncProcess(this.conf); this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + this.metrics = new MetricsConnection(new MetricsConnectionWrapperImpl(this)); + this.metaCache = new MetaCache(this.metrics); } /** @@ -365,6 +368,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return new HBaseAdmin(this); } + @Override + public MetricsConnection getConnectionMetrics() { + return this.metrics; + } + private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { @@ -1320,7 +1328,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // InetSocketAddress, and this way 
we will rightfully get a new stubKey. // Also, include the hostname in the key so as to take care of those cases where the // DNS name is different but IP address remains the same. - InetAddress i = new InetSocketAddress(rsHostname, port).getAddress(); + InetAddress i = new InetSocketAddress(rsHostname, port).getAddress(); String address = rsHostname; if (i != null) { address = i.getHostAddress() + "-" + rsHostname; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 8e1c93c..e763dd9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -59,6 +59,12 @@ public class MetaCache { // The access to this attribute must be protected by a lock on cachedRegionLocations private final Set cachedServers = new ConcurrentSkipListSet(); + private final MetricsConnection metrics; + + public MetaCache(MetricsConnection metrics) { + this.metrics = metrics; + } + /** * Search the cache for a location that fits our table and row key. * Return null if no suitable region is located. @@ -74,6 +80,7 @@ public class MetaCache { Entry e = tableLocations.floorEntry(row); if (e == null) { + metrics.incrMetaCacheMiss(); return null; } RegionLocations possibleRegion = e.getValue(); @@ -94,10 +101,12 @@ public class MetaCache { // HConstants.EMPTY_END_ROW) check itself will pass. 
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) { + metrics.incrMetaCacheHit(); return possibleRegion; } // Passed all the way through, so we got nothing - complete cache miss + metrics.incrMetaCacheMiss(); return null; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java new file mode 100644 index 0000000..b0b3f6a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This class is for maintaining the various connection statistics and publishing them through + * the metrics interfaces. 
+ */ +@InterfaceAudience.Private +public class MetricsConnection { + + /** A container class for collecting details about the RPC call as it percolates. */ + public static class CallStats { + public long requestSizeBytes = 0; + public long responseSizeBytes = 0; + public long startTime = 0; + public long callTimeMs = 0; + } + + private final MetricsConnectionWrapper wrapper; + private final MetricsConnectionSource source; + + public MetricsConnection(MetricsConnectionWrapper wrapper) { + this.wrapper = wrapper; + this.source = CompatibilitySingletonFactory.getInstance(MetricsConnectionSourceFactory.class) + .createConnection(this.wrapper); + } + + @VisibleForTesting + MetricsConnectionSource getMetricsSource() { + return source; + } + + /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ + public static CallStats newCallStats() { + // TODO: instance pool to reduce GC? + return new CallStats(); + } + + /** Increment the number of meta cache hits. */ + public void incrMetaCacheHit() { + source.incrMetaCacheHit(); + } + + /** Increment the number of meta cache misses. */ + public void incrMetaCacheMiss() { + source.incrMetaCacheMiss(); + } + + /** Report RPC context to metrics system. 
*/ + public void updateRpc(Descriptors.MethodDescriptor method, ServerName serverName, + CallStats stats) { + final String methodName = method.getService().getName() + "_" + method.getName(); + final String sn = serverName.toShortString().replace(':', '-'); + source.addRpcCallDuration(methodName, sn, stats.callTimeMs); + source.addRpcCallRequestSize(methodName, sn, stats.requestSizeBytes); + source.addRpcCallResponseSize(methodName, sn, stats.responseSizeBytes); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java new file mode 100644 index 0000000..28ce8ee --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.util.concurrent.ThreadPoolExecutor; + +@InterfaceAudience.Private +public class MetricsConnectionWrapperImpl implements MetricsConnectionWrapper { + + private final ConnectionImplementation conn; + + public MetricsConnectionWrapperImpl(ConnectionImplementation connection) { + Preconditions.checkNotNull(connection); + this.conn = connection; + } + + @Override public String getId() { + return conn.toString(); + } + + @Override public String getUserName() { + return conn.user == null ? "" : conn.user.toString(); + } + + @Override public String getClusterId() { + return conn.clusterId; + } + + @Override public String getZookeeperQuorum() { + try { + return conn.getKeepAliveZooKeeperWatcher().getQuorum(); + } catch (Exception e) { + return ""; + } + } + + @Override public String getZookeeperBaseNode() { + try { + return conn.getKeepAliveZooKeeperWatcher().getBaseZNode(); + } catch (Exception e) { + return ""; + } + } + + @Override public int getMetaLookupPoolActiveCount() { + if (conn.getCurrentMetaLookupPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); + return tpe.getActiveCount(); + } + + @Override public int getMetaLookupPoolLargestPoolSize() { + if (conn.getCurrentMetaLookupPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); + return tpe.getLargestPoolSize(); + } + + @Override public String getBatchPoolId() { + if (conn.getCurrentBatchPool() == null) { + return ""; + } + return Integer.toHexString(conn.getCurrentBatchPool().hashCode()); + } + + @Override public int getBatchPoolActiveCount() { + if (conn.getCurrentBatchPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + return tpe.getActiveCount(); + } + + 
@Override public int getBatchPoolLargestPoolSize() { + if (conn.getCurrentBatchPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + return tpe.getLargestPoolSize(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 9be370d..71be8ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.security.User; @@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient { protected final Configuration conf; protected String clusterId; protected final SocketAddress localAddr; + protected final MetricsConnection metrics; protected UserProvider userProvider; protected final IPCUtil ipcUtil; @@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient { * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. 
+ * @param metrics the connection metrics */ - public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { + public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); @@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient { this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); + this.metrics = metrics; // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { @@ -199,25 +204,27 @@ public abstract class AbstractRpcClient implements RpcClient { * @return A pair with the Message response and the Cell data (if any). */ Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, - Message param, Message returnType, final User ticket, final InetSocketAddress isa) + Message param, Message returnType, final User ticket, final InetSocketAddress isa, + final ServerName serverName) throws ServiceException { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); } - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = EnvironmentEdgeManager.currentTime(); - } Pair val; try { - val = call(pcrc, md, param, returnType, ticket, isa); + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.startTime = EnvironmentEdgeManager.currentTime(); + val = call(pcrc, md, param, returnType, ticket, isa, cs); // Shove the results into controller so can be carried across the proxy/pb service void. 
pcrc.setCellScanner(val.getSecond()); + cs.callTimeMs = EnvironmentEdgeManager.currentTime() - cs.startTime; + if (metrics != null) { + metrics.updateRpc(md, serverName, cs); + } if (LOG.isTraceEnabled()) { - long callTime = EnvironmentEdgeManager.currentTime() - startTime; - LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.callTimeMs + "ms"); } return val.getFirst(); } catch (Throwable e) { @@ -242,7 +249,8 @@ public abstract class AbstractRpcClient implements RpcClient { */ protected abstract Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress isa) throws IOException, InterruptedException; + InetSocketAddress isa, MetricsConnection.CallStats callStats) + throws IOException, InterruptedException; @Override public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, @@ -255,6 +263,7 @@ public abstract class AbstractRpcClient implements RpcClient { */ @VisibleForTesting public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { + private final ServerName sn; private final InetSocketAddress isa; private final AbstractRpcClient rpcClient; private final User ticket; @@ -265,6 +274,7 @@ public abstract class AbstractRpcClient implements RpcClient { */ protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, final ServerName sn, final User ticket, int channelOperationTimeout) { + this.sn = sn; this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.rpcClient = rpcClient; this.ticket = ticket; @@ -285,7 +295,8 @@ public abstract class AbstractRpcClient implements RpcClient { pcrc.setCallTimeout(channelOperationTimeout); } - return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); + return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa, 
+ this.sn); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index 431c669..a5da0dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise { final Message responseDefaultType; final long startTime; final long rpcTimeout; + final MetricsConnection.CallStats callStats; /** * Constructor @@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise { * @param responseDefaultType the default response type */ public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message - param, PayloadCarryingRpcController controller, Message responseDefaultType) { + param, PayloadCarryingRpcController controller, Message responseDefaultType, + MetricsConnection.CallStats callStats) { super(eventLoop); this.id = connectId; @@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise { this.startTime = EnvironmentEdgeManager.currentTime(); this.rpcTimeout = controller.hasCallTimeout() ? 
controller.getCallTimeout() : 0; + this.callStats = callStats; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index dbc39cf..572d564 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; @@ -310,10 +311,10 @@ public class AsyncRpcChannel { */ public Promise callMethod(final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype) { + final Message responsePrototype, MetricsConnection.CallStats callStats) { final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, - controller, responsePrototype); + controller, responsePrototype, callStats); controller.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { @@ -433,7 +434,7 @@ public class AsyncRpcChannel { ByteBuf b = channel.alloc().directBuffer(4 + totalSize); try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { - IPCUtil.write(out, rh, call.param, cellBlock); + call.callStats.requestSizeBytes = IPCUtil.write(out, rh, call.param, cellBlock); } channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); @@ -579,8 +580,6 @@ public class AsyncRpcChannel { /** * Clean up calls. 
- * - * @param cleanAll true if all calls should be cleaned, false for only the timed out calls */ private void cleanupCalls() { List toCleanup = new ArrayList(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 005f03c..746f397 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; @@ -146,12 +148,12 @@ public class AsyncRpcClient extends AbstractRpcClient { * @param configuration to HBase * @param clusterId for the cluster * @param localAddress local address to connect to + * @param metrics the connection metrics * @param channelInitializer for custom channel handlers */ - @VisibleForTesting - AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, - ChannelInitializer channelInitializer) { - super(configuration, clusterId, localAddress); + protected AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics, ChannelInitializer channelInitializer) { + super(configuration, clusterId, localAddress, metrics); if (LOG.isDebugEnabled()) { LOG.debug("Starting async Hbase RPC client"); @@ -191,15 +193,28 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + /** Used in test only. 
*/ + AsyncRpcClient(Configuration configuration) { + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); + } + + /** Used in test only. */ + AsyncRpcClient(Configuration configuration, + ChannelInitializer channelInitializer) { + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer); + } + /** * Constructor * * @param configuration to HBase * @param clusterId for the cluster * @param localAddress local address to connect to + * @param metrics the connection metrics */ - public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) { - this(configuration, clusterId, localAddress, null); + public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics) { + this(configuration, clusterId, localAddress, metrics, null); } /** @@ -219,13 +234,14 @@ public class AsyncRpcClient extends AbstractRpcClient { @Override protected Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress addr) throws IOException, InterruptedException { + InetSocketAddress addr, MetricsConnection.CallStats callStats) + throws IOException, InterruptedException { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - Promise promise = connection.callMethod(md, pcrc, param, returnType); + Promise promise = connection.callMethod(md, pcrc, param, returnType, callStats); long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; try { Message response = timeout > 0 ? 
promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); @@ -244,40 +260,49 @@ public class AsyncRpcClient extends AbstractRpcClient { /** * Call method async */ - private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc, - Message param, Message returnType, User ticket, InetSocketAddress addr, - final RpcCallback done) { + private void callMethod(final Descriptors.MethodDescriptor md, + final PayloadCarryingRpcController pcrc, Message param, Message returnType, User ticket, + InetSocketAddress addr, final ServerName serverName, final RpcCallback done) { final AsyncRpcChannel connection; try { connection = createRpcChannel(md.getService().getName(), addr, ticket); - - connection.callMethod(md, pcrc, param, returnType).addListener( + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + GenericFutureListener> listener = new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if(!future.isSuccess()){ - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); - } - }else{ - try { - done.run(future.get()); - }catch (ExecutionException e){ - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); + @Override + public void operationComplete(Future future) throws Exception { + cs.callTimeMs = EnvironmentEdgeManager.currentTime() - cs.startTime; + if (metrics != null) { + metrics.updateRpc(md, serverName, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.callTimeMs + "ms"); + } + if (!future.isSuccess()) { + Throwable cause = future.cause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } else { + try { + 
done.run(future.get()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } catch (InterruptedException e) { + pcrc.setFailed(new IOException(e)); + } } - }catch (InterruptedException e){ - pcrc.setFailed(new IOException(e)); } - } - } - }); + }; + cs.startTime = EnvironmentEdgeManager.currentTime(); + connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener); } catch (StoppedRpcClientException|FailedServerException e) { pcrc.setFailed(e); } @@ -433,6 +458,7 @@ public class AsyncRpcClient extends AbstractRpcClient { @VisibleForTesting public static class RpcChannelImplementation implements RpcChannel { private final InetSocketAddress isa; + private final ServerName serverName; private final AsyncRpcClient rpcClient; private final User ticket; private final int channelOperationTimeout; @@ -442,6 +468,7 @@ public class AsyncRpcClient extends AbstractRpcClient { */ protected RpcChannelImplementation(final AsyncRpcClient rpcClient, final ServerName sn, final User ticket, int channelOperationTimeout) { + this.serverName = sn; this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.rpcClient = rpcClient; this.ticket = ticket; @@ -462,7 +489,8 @@ public class AsyncRpcClient extends AbstractRpcClient { pcrc.setCallTimeout(channelOperationTimeout); } - this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); + this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, + this.serverName, done); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index f7aa8a9..cc938d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -102,6 +102,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { cellBlockScanner = channel.client.createCellScanner(cellBlock); } call.setSuccess(value, cellBlockScanner); + call.callStats.responseSizeBytes = totalSize; } } catch (IOException e) { // Treat this as a fatal condition and close this connection diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index df32730..24cd997 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -21,6 +21,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -41,16 +42,18 @@ public class Call { Message responseDefaultType; IOException error; // exception, null if value volatile boolean done; // true when call is done - long startTime; final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. 
+ final MetricsConnection.CallStats callStats; protected Call(int id, final Descriptors.MethodDescriptor md, Message param, - final CellScanner cells, final Message responseDefaultType, int timeout) { + final CellScanner cells, final Message responseDefaultType, int timeout, + MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; - this.startTime = EnvironmentEdgeManager.currentTime(); + this.callStats = callStats; + this.callStats.startTime = EnvironmentEdgeManager.currentTime(); this.responseDefaultType = responseDefaultType; this.id = id; this.timeout = timeout; @@ -122,6 +125,6 @@ public class Call { } public long getStartTime() { - return this.startTime; + return this.callStats.startTime; } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index 10ddc56..822daca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.ipc; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.net.SocketAddress; @@ -37,15 +39,23 @@ public final class RpcClientFactory { private RpcClientFactory() { } + /** Helper method for tests only. Creates an {@code RpcClient} without metrics. 
*/ + @VisibleForTesting + public static RpcClient createClient(Configuration conf, String clusterId) { + return createClient(conf, clusterId, null); + } + /** * Creates a new RpcClient by the class defined in the configuration or falls back to * RpcClientImpl * @param conf configuration * @param clusterId the cluster id + * @param metrics the connection metrics * @return newly created RpcClient */ - public static RpcClient createClient(Configuration conf, String clusterId) { - return createClient(conf, clusterId, null); + public static RpcClient createClient(Configuration conf, String clusterId, + MetricsConnection metrics) { + return createClient(conf, clusterId, null, metrics); } /** @@ -54,16 +64,18 @@ public final class RpcClientFactory { * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. + * @param metrics the connection metrics * @return newly created RpcClient */ public static RpcClient createClient(Configuration conf, String clusterId, - SocketAddress localAddr) { + SocketAddress localAddr, MetricsConnection metrics) { String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName()); return ReflectionUtils.instantiateWithCustomCtor( rpcClientClass, - new Class[] { Configuration.class, String.class, SocketAddress.class }, - new Object[] { conf, clusterId, localAddr } + new Class[] { Configuration.class, String.class, SocketAddress.class, + MetricsConnection.class }, + new Object[] { conf, clusterId, localAddr, metrics } ); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index c11273e..a4d8cbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc; +import 
com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -935,7 +937,7 @@ public class RpcClientImpl extends AbstractRpcClient { checkIsOpen(); // Now we're checking that it didn't became idle in between. try { - IPCUtil.write(this.out, header, call.param, cellBlock); + call.callStats.requestSizeBytes = IPCUtil.write(this.out, header, call.param, cellBlock); } catch (IOException e) { // We set the value inside the synchronized block, this way the next in line // won't even try to write @@ -984,12 +986,20 @@ public class RpcClientImpl extends AbstractRpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); + if (call != null) { + call.callStats.responseSizeBytes = totalSize; + call.callStats.callTimeMs = + EnvironmentEdgeManager.currentTime() - call.callStats.startTime; + } return false; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); call.setException(re); + call.callStats.responseSizeBytes = totalSize; + call.callStats.callTimeMs = + EnvironmentEdgeManager.currentTime() - call.callStats.startTime; if (isFatalConnectionException(exceptionResponse)) { return markClosed(re); } @@ -1008,6 +1018,9 @@ public class RpcClientImpl extends AbstractRpcClient { 
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); } call.setResponse(value, cellBlockScanner); + call.callStats.responseSizeBytes = totalSize; + call.callStats.callTimeMs = + EnvironmentEdgeManager.currentTime() - call.callStats.startTime; } } catch (IOException e) { if (expectedCall) call.setException(e); @@ -1098,13 +1111,15 @@ public class RpcClientImpl extends AbstractRpcClient { } /** - * Construct an IPC cluster client whose values are of the {@link Message} class. + * Used in test only. Construct an IPC cluster client whose values are of the + * {@link Message} class. * @param conf configuration * @param clusterId the cluster id * @param factory socket factory */ + @VisibleForTesting RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) { - this(conf, clusterId, factory, null); + this(conf, clusterId, factory, null, null); } /** @@ -1113,10 +1128,11 @@ public class RpcClientImpl extends AbstractRpcClient { * @param clusterId the cluster id * @param factory socket factory * @param localAddr client socket bind address + * @param metrics the connection metrics */ RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory, - SocketAddress localAddr) { - super(conf, clusterId, localAddr); + SocketAddress localAddr, MetricsConnection metrics) { + super(conf, clusterId, localAddr, metrics); this.socketFactory = factory; this.connections = new PoolMap(getPoolType(conf), getPoolSize(conf)); @@ -1124,25 +1140,36 @@ public class RpcClientImpl extends AbstractRpcClient { } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory - * @param conf configuration - * @param clusterId the cluster id + * Used in test only. 
Construct an IPC client for the cluster {@code clusterId} with + * the default SocketFactory + */ + @VisibleForTesting + RpcClientImpl(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null); + } + + /** + * Used in test only. Construct an IPC client for the cluster {@code clusterId} with + * the default SocketFactory */ - public RpcClientImpl(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); + @VisibleForTesting + public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) { + this(conf, clusterId, localAddr, null); } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory * * This method is called with reflection by the RpcClientFactory to create an instance * * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. + * @param metrics the connection metrics */ - public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); + public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics); } /** Stop all threads related to this client. 
No further calls may be made @@ -1197,7 +1224,8 @@ public class RpcClientImpl extends AbstractRpcClient { */ @Override protected Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, - Message param, Message returnType, User ticket, InetSocketAddress addr) + Message param, Message returnType, User ticket, InetSocketAddress addr, + MetricsConnection.CallStats callStats) throws IOException, InterruptedException { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); @@ -1205,7 +1233,7 @@ public class RpcClientImpl extends AbstractRpcClient { CellScanner cells = pcrc.cellScanner(); final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType, - pcrc.getCallTimeout()); + pcrc.getCallTimeout(), callStats); final Connection connection = getConnection(ticket, call, addr); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java new file mode 100644 index 0000000..9c5638e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.hadoop.metrics2.lib.MutableHistogram.MAX_METRIC_NAME; +import static org.apache.hadoop.metrics2.lib.MutableHistogram.MIN_METRIC_NAME; +import static org.apache.hadoop.metrics2.lib.MutableHistogram.NUM_OPS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.MetricsConnectionSource.*; +import static org.junit.Assert.*; + +/** Unit test over client metrics */ +@Category({ ClientTests.class, SmallTests.class }) +public class TestMetricsConnection { + + private static class MetricsConnectionWrapperStub implements MetricsConnectionWrapper { + @Override public String getId() { + return "testConnectionId"; + } + + @Override public String getUserName() { + return "testUser"; + } + + @Override public String getClusterId() { + return "testClusterId"; + } + + @Override public String getZookeeperQuorum() { + return "foo:bar:baz"; + } + + @Override public String getZookeeperBaseNode() { + return "/testing"; + } + + @Override public int getMetaLookupPoolActiveCount() { + return 50; + } + + @Override public int getMetaLookupPoolLargestPoolSize() { + return 51; + } + + @Override public String getBatchPoolId() { + return "testBatchPoolId"; + } + + @Override public int getBatchPoolActiveCount() { + return 52; + } + + @Override public int getBatchPoolLargestPoolSize() { + return 53; + } + } + + public static MetricsAssertHelper HELPER = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + + private 
MetricsConnectionWrapper wrapper; + private MetricsConnection metricsConnection; + private MetricsConnectionSourceImpl source; + + @BeforeClass + public static void classSetUp() { + HELPER.init(); + } + + @Before + public void before() { + wrapper = new MetricsConnectionWrapperStub(); + metricsConnection = new MetricsConnection(wrapper); + source = (MetricsConnectionSourceImpl) metricsConnection.getMetricsSource(); + } + + @Test + public void testWrapperSource() { + HELPER.assertTag(CONNECTION_ID_NAME, wrapper.getId(), source); + HELPER.assertTag(USER_NAME_NAME, wrapper.getUserName(), source); + HELPER.assertTag(CLUSTER_ID_NAME, wrapper.getClusterId(), source); + HELPER.assertTag(ZOOKEEPER_QUORUM_NAME, wrapper.getZookeeperQuorum(), source); + HELPER.assertTag(ZOOKEEPER_ZNODE_NAME, wrapper.getZookeeperBaseNode(), source); + HELPER.assertGauge(META_LOOKUP_POOL_ACTIVE_THREAD_NAME, + wrapper.getMetaLookupPoolActiveCount(), source); + HELPER.assertGauge(META_LOOKUP_POOL_LARGEST_SIZE_NAME, + wrapper.getMetaLookupPoolLargestPoolSize(), source); + HELPER.assertTag(BATCH_POOL_ID_NAME, wrapper.getBatchPoolId(), source); + HELPER.assertGauge(BATCH_POOL_ACTIVE_THREAD_NAME, wrapper.getBatchPoolActiveCount(), source); + HELPER.assertGauge(BATCH_POOL_LARGEST_SIZE_NAME, wrapper.getBatchPoolLargestPoolSize(), + source); + } + + /** Should really be in a TestMetricsConnectionSourceImpl */ + @Test + public void testDynamicMethodRegistration() { + source.addRpcCallDuration("MethodFoo", "Server1", 10); + source.addRpcCallDuration("MethodFoo", "Server1", 20); + source.addRpcCallDuration("MethodFoo", "Server2", 30); + source.addRpcCallDuration("MethodBar", "Server2", 40); + + HELPER.assertCounter(/* we should see 3 total calls to MethodFoo */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 3, source); + HELPER.assertCounter(/* we should see 1 total call to MethodBar */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, 1, source); + + 
MetricsConnectionHostSource server1 = source.getOrCreate("Server1"); + assertNotNull("getOrCreate should always return something", server1); + HELPER.assertCounter(/* 2 calls of MethodFoo were made to server1 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 2, server1); + HELPER.assertGauge(/* the smallest value reported by server1 should be 10 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MIN_METRIC_NAME, 10, server1); + HELPER.assertGauge(/* the largest value reported by server1 should be 20 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MAX_METRIC_NAME, 20, server1); + HELPER.assertCounterNotExist(/* assert no calls of MethodBar were made to server1 */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, server1); + + MetricsConnectionHostSource server2 = source.getOrCreate("Server2"); + assertNotNull("getOrCreate should always return something", server2); + HELPER.assertCounter(/* 1 call of MethodFoo was made to server2 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 1, server2); + HELPER.assertGauge(/* the smallest value reported by server2 should be 30 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MIN_METRIC_NAME, 30, server2); + HELPER.assertCounter(/* 1 call of MethodBar was made to server2 */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, 1, server2); + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java new file mode 100644 index 0000000..42ed660 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** + * Reports Host-level metrics as observed by the client. + */ +public interface MetricsConnectionHostSource extends BaseSource { + /** The name of the metrics. */ + String METRICS_NAME = "Host"; + + /** The name of the metrics context that metrics will be under. */ + String METRICS_CONTEXT = "connection"; + + /** Description */ + String METRICS_DESCRIPTION = "Metrics describing interactions with target host"; + + /** The name of the metrics context that metrics will be under in jmx. */ + String METRICS_JMX_CONTEXT_FMT = "Client,sub01=%s,sub02=%s"; + + /** Prefix string used for describing the rpc call time for a specific method. */ + String RPC_CALL_DURATION_NAME_BASE = "rpcCallDurationMs_"; + String RPC_CALL_DURATION_DESC = "The duration of an RPC call in milliseconds."; + /** Prefix string used for describing the rpc request size for a specific method. */ + String RPC_CALL_REQUEST_SIZE_NAME_BASE = "rpcCallRequestSizeBytes_"; + String RPC_CALL_REQUEST_SIZE_DESC = "The size of an RPC call request in bytes."; + /** Prefix string used for describing the rpc response size for a specific method. */ + String RPC_CALL_RESPONSE_SIZE_NAME_BASE = "rpcCallResponseSizeBytes_"; + String RPC_CALL_RESPONSE_SIZE_DESC = "The size of an RPC call response in bytes."; + + /** Record an RPC call duration. 
*/ + void addRpcCallDuration(String method, long ms); + + /** Record an RPC request size. */ + void addRpcCallRequestSize(String method, long bytes); + + /** Record an RPC response size. */ + void addRpcCallResponseSize(String method, long bytes); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java new file mode 100644 index 0000000..d423f09 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +public interface MetricsConnectionSource extends BaseSource { + + /** The name of the metrics. */ + String METRICS_NAME = "Connection"; + + /** The name of the metrics context that metrics will be under. */ + String METRICS_CONTEXT = "connection"; + + /** Description. */ + String METRICS_DESCRIPTION = "Metrics about HBase Connection"; + + /** The name of the metrics context that metrics will be under in jmx. 
*/ + String METRICS_JMX_CONTEXT_FMT = "Client,sub01=%s,sub02=connection"; + + /** Increment number of meta cache hits. */ + void incrMetaCacheHit(); + + /** Increment number of meta cache misses. */ + void incrMetaCacheMiss(); + + /** Add a new rpc call observation, duration of call in milliseconds. */ + void addRpcCallDuration(String method, String serverName, long ms); + + /** Add a new rpc call observation, request size in bytes. */ + void addRpcCallRequestSize(String method, String serverName, long bytes); + + /** Add a new rpc call observation, response size in bytes. */ + void addRpcCallResponseSize(String method, String serverName, long bytes); + + // Strings used for exporting to metrics system. + String CONNECTION_ID_NAME = "connectionId"; + String CONNECTION_ID_DESC = "The connection's process-unique identifier."; + String USER_NAME_NAME = "userName"; + String USER_NAME_DESC = "The user on behalf of whom the Connection is acting."; + String CLUSTER_ID_NAME = "clusterId"; + String CLUSTER_ID_DESC = "Cluster Id"; + String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum"; + String ZOOKEEPER_QUORUM_DESC = "Zookeeper Quorum"; + String ZOOKEEPER_ZNODE_NAME = "zookeeperBaseZNode"; + String ZOOKEEPER_ZNODE_DESC = "Base ZNode for this cluster."; + + String META_CACHE_HIT_NAME = "metaCacheHit"; + String META_CACHE_HIT_DESC = + "A counter on the number of times this connection's meta cache has a valid region location."; + String META_CACHE_MISS_NAME = "metaCacheMiss"; + String META_CACHE_MISS_DESC = + "A counter on the number of times this connection does not know where to find a region."; + + String META_LOOKUP_POOL_ACTIVE_THREAD_NAME = "metaLookupPoolActiveThreads"; + String META_LOOKUP_POOL_ACTIVE_THREAD_DESC = + "The approximate number of threads actively resolving region locations from META."; + String META_LOOKUP_POOL_LARGEST_SIZE_NAME = "metaLookupPoolLargestSize"; + String META_LOOKUP_POOL_LARGEST_SIZE_DESC = + "The largest number of threads that have ever 
simultaneously been in the pool."; + String BATCH_POOL_ID_NAME = "batchPoolId"; + String BATCH_POOL_ID_DESC = "The connection's batch pool's unique identifier."; + String BATCH_POOL_ACTIVE_THREAD_NAME = "batchPoolActiveThreads"; + String BATCH_POOL_ACTIVE_THREAD_DESC = + "The approximate number of threads executing table operations."; + String BATCH_POOL_LARGEST_SIZE_NAME = "batchPoolLargestSize"; + String BATCH_POOL_LARGEST_SIZE_DESC = + "The largest number of threads that have ever simultaneously been in the pool."; + + String RPC_CALL_DURATION_NAME_BASE = "rpcCallDurationMs_"; + String RPC_CALL_DURATION_DESC = "The duration of an RPC call in milliseconds."; + String RPC_CALL_REQUEST_SIZE_NAME_BASE = "rpcCallRequestSizeBytes_"; + String RPC_CALL_REQUEST_SIZE_DESC = "The size of an RPC call request in bytes."; + String RPC_CALL_RESPONSE_SIZE_NAME_BASE = "rpcCallResponseSizeBytes_"; + String RPC_CALL_RESPONSE_SIZE_DESC = "The size of an RPC call response in bytes."; +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java new file mode 100644 index 0000000..21585ae --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Interface of a factory to create Metrics Sources used inside of Connections. + */ +public interface MetricsConnectionSourceFactory { + + /** + * Given a wrapper create a {@link MetricsConnectionSource}. + * + * @param wrapper The wrapped Connection + * @return a Metrics Source. + */ + MetricsConnectionSource createConnection(MetricsConnectionWrapper wrapper); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java new file mode 100644 index 0000000..43410a8 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * This is the interface that will expose Connection information to hadoop1/hadoop2 + * implementations of the {@link MetricsConnectionSource}. + */ +public interface MetricsConnectionWrapper { + + /** Get the connection's unique identifier */ + String getId(); + + /** Get the User's name. */ + String getUserName(); + + /** Get the Cluster ID */ + String getClusterId(); + + /** Get the Zookeeper Quorum Info */ + String getZookeeperQuorum(); + + /** Get the base ZNode for this cluster. */ + String getZookeeperBaseNode(); + + int getMetaLookupPoolActiveCount(); + + int getMetaLookupPoolLargestPoolSize(); + + String getBatchPoolId(); + + int getBatchPoolActiveCount(); + + int getBatchPoolLargestPoolSize(); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsRegionClientWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsRegionClientWrapper.java new file mode 100644 index 0000000..2301518 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsRegionClientWrapper.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +public interface MetricsRegionClientWrapper { + + String getConnectionId(); + + String getRegionName(); +} diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java index 2eefcd2..14fa3b4 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java @@ -130,6 +130,13 @@ public interface MetricsAssertHelper { void assertCounterLt(String name, long expected, BaseSource source); /** + * Assert that a counter does not exist. + * @param name The name of the counter + * @param source The {@link BaseSource} that will provide the tags, gauges, and counters. + */ + void assertCounterNotExist(String name, BaseSource source); + + /** * Get the value of a counter. * * @param name name of the counter. diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java new file mode 100644 index 0000000..4c7ee58 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableHistogram; + +/** + * Reports Host-level metrics as observed by the client. + */ +@InterfaceAudience.Private +public class MetricsConnectionHostSourceImpl extends BaseSourceImpl + implements MetricsConnectionHostSource { + + public MetricsConnectionHostSourceImpl(MetricsConnectionWrapper wrapper, String serverName) { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, + String.format(METRICS_JMX_CONTEXT_FMT, wrapper.getId(), serverName)); + } + + public MetricsConnectionHostSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + } + + @Override public void addRpcCallDuration(String method, long ms) { + getHistogram(RPC_CALL_DURATION_NAME_BASE + method, RPC_CALL_DURATION_DESC).add(ms); + } + + @Override public void addRpcCallRequestSize(String method, long bytes) { + getHistogram(RPC_CALL_REQUEST_SIZE_NAME_BASE + method, RPC_CALL_REQUEST_SIZE_DESC) + .add(bytes); + } + + @Override public void addRpcCallResponseSize(String method, long bytes) { + getHistogram(RPC_CALL_RESPONSE_SIZE_NAME_BASE + method, RPC_CALL_RESPONSE_SIZE_DESC) + .add(bytes); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java new file mode 100644 index 0000000..d29c5c9 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsConnectionSourceFactoryImpl implements MetricsConnectionSourceFactory { + + @Override public MetricsConnectionSource createConnection(MetricsConnectionWrapper wrapper) { + return new MetricsConnectionSourceImpl(wrapper); + } + + public MetricsConnectionHostSource createHostClient(MetricsConnectionWrapper wrapper, + String serverName) { + return new MetricsConnectionHostSourceImpl(wrapper, serverName); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java new file mode 100644 index 0000000..415da29 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.*; + +import java.util.HashMap; +import java.util.Map; + +@InterfaceAudience.Private +public class MetricsConnectionSourceImpl + extends BaseSourceImpl implements MetricsConnectionSource { + + // wrapper for access statistics collected in Connection instance + private final MetricsConnectionWrapper wrapper; + + // Hadoop Metric2 objects for additional monitoring + + private final MutableCounterLong metaCacheHit; + private final MutableCounterLong metaCacheMiss; + + private final Map<String, MetricsConnectionHostSource> byHost; + + public MetricsConnectionSourceImpl(MetricsConnectionWrapper wrapper) { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, + String.format(METRICS_JMX_CONTEXT_FMT, wrapper.getId()), wrapper); + } + + public MetricsConnectionSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext, MetricsConnectionWrapper wrapper) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + this.wrapper = wrapper; + + metaCacheHit = getMetricsRegistry().newCounter(META_CACHE_HIT_NAME, META_CACHE_HIT_DESC, 0l); + metaCacheMiss = + getMetricsRegistry().newCounter(META_CACHE_MISS_NAME, META_CACHE_MISS_DESC, 0l); + byHost = new HashMap<>(); + } + + /** + * Dynamically register metrics on first invocation. A new bean is registered for + * {@code serverName} upon initial encounter. RPC calls to said host are registered on that bean + * on first invocation. + */ + @VisibleForTesting + MetricsConnectionHostSource getOrCreate(String serverName) { + // TODO: how to handle expiration of metrics for transient hosts?
+ MetricsConnectionHostSource hostSource; + // NOTE(review): byHost is a plain HashMap, which is not safe for reads + // outside the lock; double-checked locking here is a data race, so do the + // whole check-then-create under synchronized. + synchronized (byHost) { + hostSource = byHost.get(serverName); + if (hostSource == null) { + hostSource = new MetricsConnectionHostSourceImpl(wrapper, serverName); + byHost.put(serverName, hostSource); + } + } + return hostSource; + } + + @Override + public void getMetrics(MetricsCollector metricsCollector, boolean all) { + + MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); + + if (wrapper != null) { + mrb.addGauge(Interns.info(META_LOOKUP_POOL_LARGEST_SIZE_NAME, + META_LOOKUP_POOL_LARGEST_SIZE_DESC), wrapper.getMetaLookupPoolLargestPoolSize()) + .addGauge(Interns.info(META_LOOKUP_POOL_ACTIVE_THREAD_NAME, + META_LOOKUP_POOL_ACTIVE_THREAD_DESC), wrapper.getMetaLookupPoolActiveCount()) + .tag(Interns.info(BATCH_POOL_ID_NAME, BATCH_POOL_ID_DESC), wrapper.getBatchPoolId()) + .addGauge(Interns.info(BATCH_POOL_ACTIVE_THREAD_NAME, BATCH_POOL_ACTIVE_THREAD_DESC), + wrapper.getBatchPoolActiveCount()) + .addGauge(Interns.info(BATCH_POOL_LARGEST_SIZE_NAME, BATCH_POOL_LARGEST_SIZE_DESC), + wrapper.getBatchPoolLargestPoolSize()) + .tag(Interns.info(CONNECTION_ID_NAME, CONNECTION_ID_DESC), wrapper.getId()) + .tag(Interns.info(USER_NAME_NAME, USER_NAME_DESC), wrapper.getUserName()) + .tag(Interns.info(CLUSTER_ID_NAME, CLUSTER_ID_DESC), wrapper.getClusterId()) + .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), + wrapper.getZookeeperQuorum()) + .tag(Interns.info(ZOOKEEPER_ZNODE_NAME, ZOOKEEPER_ZNODE_DESC), + wrapper.getZookeeperBaseNode()); + } + + metricsRegistry.snapshot(mrb, all); + } + + @Override public void incrMetaCacheHit() { + metaCacheHit.incr(); + } + + @Override public void incrMetaCacheMiss() { + metaCacheMiss.incr(); + } + + @Override public void addRpcCallDuration(String method, String serverName, long ms) { + getHistogram(RPC_CALL_DURATION_NAME_BASE + method, RPC_CALL_DURATION_DESC).add(ms); + 
getOrCreate(serverName).addRpcCallDuration(method, ms); + } + + @Override public void addRpcCallRequestSize(String method, String serverName, long bytes) { + getHistogram(RPC_CALL_REQUEST_SIZE_NAME_BASE + method, RPC_CALL_REQUEST_SIZE_DESC).add(bytes); + getOrCreate(serverName).addRpcCallRequestSize(method, bytes); + } + + @Override public void addRpcCallResponseSize(String method, String serverName, long bytes) { + getHistogram(RPC_CALL_RESPONSE_SIZE_NAME_BASE + method, RPC_CALL_RESPONSE_SIZE_DESC) + .add(bytes); + getOrCreate(serverName).addRpcCallResponseSize(method, bytes); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java index 6756a21..4c2a70b 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java @@ -77,49 +77,42 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { } + /** + * Dynamically register a histogram metrics when it does not exist. Does right by descriptions, + * but probably generates a bit of unnecessary garbage while the registry is populated the first + * time. + */ + protected MutableHistogram getHistogram(String name, String description) { + if (getMetricsRegistry().get(name) == null) { + getMetricsRegistry().newHistogram(name, description); + } + return getMetricsRegistry().getHistogram(name); + } + + @Override public void init() { this.metricsRegistry.clearMetrics(); } - /** - * Set a single gauge to a value. - * - * @param gaugeName gauge name - * @param value the new value of the gauge. - */ + @Override public void setGauge(String gaugeName, long value) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, value); gaugeInt.set(value); } - /** - * Add some amount to a gauge. 
- * - * @param gaugeName The name of the gauge to increment. - * @param delta The amount to increment the gauge by. - */ + @Override public void incGauge(String gaugeName, long delta) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, 0l); gaugeInt.incr(delta); } - /** - * Decrease the value of a named gauge. - * - * @param gaugeName The name of the gauge. - * @param delta the ammount to subtract from a gauge value. - */ + @Override public void decGauge(String gaugeName, long delta) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, 0l); gaugeInt.decr(delta); } - /** - * Increment a named counter by some value. - * - * @param key the name of the counter - * @param delta the ammount to increment - */ + @Override public void incCounters(String key, long delta) { MutableCounterLong counter = metricsRegistry.getLongCounter(key, 0l); counter.incr(delta); @@ -138,11 +131,7 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { histo.add(value); } - /** - * Remove a named gauge. 
- * - * @param key - */ + @Override public void removeMetric(String key) { metricsRegistry.removeMetric(key); JmxCacheBuster.clearJmxCache(); @@ -157,18 +146,22 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { return metricsRegistry; } + @Override public String getMetricsContext() { return metricsContext; } + @Override public String getMetricsDescription() { return metricsDescription; } + @Override public String getMetricsJmxContext() { return metricsJmxContext; } + @Override public String getMetricsName() { return metricsName; } diff --git a/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory new file mode 100644 index 0000000..4adf4c8 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +org.apache.hadoop.hbase.client.MetricsConnectionSourceFactoryImpl diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java index 3481cc2..2c46a7b 100644 --- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java @@ -198,6 +198,13 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper { } @Override + public void assertCounterNotExist(String name, BaseSource source) { + getMetrics(source); + String cName = canonicalizeMetricName(name); + assertNull(name + " should be null", counters.get(cName)); + } + + @Override public long getCounter(String name, BaseSource source) { getMetrics(source); String cName = canonicalizeMetricName(name); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5312338..c2c3f73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -851,7 +851,7 @@ public class HRegionServer extends HasThread implements // Setup RPC client for master communication rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( - rpcServices.isa.getAddress(), 0)); + rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); boolean onlyMetaRefresh = false; int storefileRefreshPeriod = conf.getInt( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index b2cb772..91f494a 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @@ -48,7 +49,7 @@ public class MetricsRegionServer { this.serverSource = serverSource; } - // for unit-test usage + @VisibleForTesting public MetricsRegionServerSource getMetricsSource() { return serverSource; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 418a0ac..0ae16ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -134,7 +134,7 @@ public class TestClientTimeouts { /** * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel */ - public static class RandomTimeoutRpcClient extends RpcClientImpl{ + public static class RandomTimeoutRpcClient extends RpcClientImpl { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { super(conf, clusterId, localAddr); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 32eb9f6..d427419 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -163,7 +164,8 @@ public abstract class AbstractTestIPC { final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); Pair r = - client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, + new MetricsConnection.CallStats()); assertTrue(r.getSecond() == null); // Silly assertion that the message is in the returned pb. assertTrue(r.getFirst().toString().contains(message)); @@ -205,7 +207,8 @@ public abstract class AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); Pair r = - client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, + new MetricsConnection.CallStats()); int index = 0; while (r.getSecond().advance()) { assertTrue(CELL.equals(r.getSecond().current())); @@ -231,7 +234,8 @@ public abstract class AbstractTestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(null, md, param, null, User.getCurrent(), address); + client.call(null, md, param, null, User.getCurrent(), address, + new MetricsConnection.CallStats()); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: 
" + e.toString()); @@ -255,10 +259,10 @@ public abstract class AbstractTestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); for (int i = 0; i < 10; i++) { - client.call( - new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, md - .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress()); + client.call(new PayloadCarryingRpcController( + CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, + md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), + new MetricsConnection.CallStats()); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index 18e3798..d761ae9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -116,7 +117,7 @@ public class TestAsyncIPC extends AbstractTestIPC { @Override protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { setConf(conf); - return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { + return new AsyncRpcClient(conf) { @Override Codec getCodec() { @@ -129,15 +130,13 @@ public class TestAsyncIPC extends AbstractTestIPC { @Override protected AsyncRpcClient createRpcClient(Configuration conf) { setConf(conf); - return 
new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + return new AsyncRpcClient(conf); } @Override protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { setConf(conf); - return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, - new ChannelInitializer() { - + return new AsyncRpcClient(conf, new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { @@ -248,7 +247,7 @@ public class TestAsyncIPC extends AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client = new AsyncRpcClient(conf); KeyValue kv = BIG_CELL; Put p = new Put(CellUtil.cloneRow(kv)); for (int i = 0; i < cellcount; i++) { @@ -282,7 +281,8 @@ public class TestAsyncIPC extends AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); // Pair response = - client.call(pcrc, md, builder.build(), param, user, address); + client.call(pcrc, md, builder.build(), param, user, address, + new MetricsConnection.CallStats()); /* * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * count); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java index 60dbd1b..52c77e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java @@ -37,15 +37,15 @@ public class TestGlobalEventLoopGroup { public void test() { Configuration conf = 
HBaseConfiguration.create(); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client = new AsyncRpcClient(conf); assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); - AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client1 = new AsyncRpcClient(conf); assertSame(client.bootstrap.group(), client1.bootstrap.group()); client1.close(); assertFalse(client.bootstrap.group().isShuttingDown()); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); - AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client2 = new AsyncRpcClient(conf); assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); client2.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 67e4e4f..d1b8202 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -145,7 +146,8 @@ public class TestIPC extends AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); // Pair response = - client.call(pcrc, md, builder.build(), param, user, address); + client.call(pcrc, md, builder.build(), param, user, address, + new MetricsConnection.CallStats()); /* * int count = 0; while (p.getSecond().advance()) 
{ count++; } assertEquals(cells.size(), * count); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 8a6b092..a4e55d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -181,7 +182,7 @@ public class TestRpcHandlerException { new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - rpcServer.getListenerAddress()); + rpcServer.getListenerAddress(), new MetricsConnection.CallStats()); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally { -- 1.9.5 (Apple Git-50.3)