From bf76ee25aba3b3c5c1ff332b5e55833e6399d9b3 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 8 Sep 2015 12:32:12 -0700 Subject: [PATCH] HBASE-12911 Client-side metrics Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java --- hbase-client/pom.xml | 22 ++++ .../apache/hadoop/hbase/client/HConnection.java | 5 + .../hadoop/hbase/client/HConnectionManager.java | 15 ++- .../hadoop/hbase/client/MetricsConnection.java | 112 ++++++++++++++++++ .../hbase/client/MetricsConnectionWrapperImpl.java | 79 +++++++++++++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 91 ++++++++++----- .../hadoop/hbase/client/TestMetricsConnection.java | 130 +++++++++++++++++++++ .../hbase/client/MetricsConnectionHostSource.java | 56 +++++++++ .../hbase/client/MetricsConnectionSource.java | 88 ++++++++++++++ .../client/MetricsConnectionSourceFactory.java | 32 +++++ .../hbase/client/MetricsConnectionWrapper.java | 46 ++++++++ .../hadoop/hbase/test/MetricsAssertHelper.java | 7 ++ .../client/MetricsConnectionHostSourceImpl.java | 53 +++++++++ .../client/MetricsConnectionSourceFactoryImpl.java | 33 ++++++ .../hbase/client/MetricsConnectionSourceImpl.java | 127 ++++++++++++++++++++ .../hadoop/hbase/metrics/BaseSourceImpl.java | 51 ++++---- ...oop.hbase.client.MetricsConnectionSourceFactory | 18 +++ .../hadoop/hbase/test/MetricsAssertHelperImpl.java | 7 ++ .../hadoop/hbase/regionserver/HRegionServer.java | 6 +- .../hbase/regionserver/MetricsRegionServer.java | 3 +- .../hadoop/hbase/client/TestClientTimeouts.java | 2 +- .../java/org/apache/hadoop/hbase/ipc/TestIPC.java | 7 +- 22 files changed, 926 insertions(+), 64 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java create mode 100644 hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index bb1a3c7..8dd04ea 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -128,6 +128,28 @@ org.apache.hbase + hbase-hadoop-compat + + + org.apache.hbase + hbase-hadoop-compat + test-jar + test + + + org.apache.hbase + ${compat.module} + ${project.version} + + + org.apache.hbase + ${compat.module} + ${project.version} + test-jar + test + + + org.apache.hbase hbase-common test-jar diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index e267c50..598c739 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -555,4 +555,9 @@ public interface HConnection extends Abortable, Closeable { * @return the configured client backoff policy */ ClientBackoffPolicy getBackoffPolicy(); + + /** + * @return the MetricsConnection instance associated with this connection. + */ + public MetricsConnection getConnectionMetrics(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 56ece6f..088aac5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -607,6 +607,7 @@ public class HConnectionManager { private NonceGenerator nonceGenerator = null; private final boolean usePrefetch; private final int prefetchRegionLimit; + private final MetricsConnection metrics; private volatile boolean closed; private volatile boolean aborted; @@ -666,7 +667,7 @@ public class HConnectionManager { // indicates whether this connection's life cycle is managed (by us) private boolean managed; - private User user; + protected User user; private RpcRetryingCallerFactory rpcCallerFactory; @@ -706,7 +707,7 @@ public class HConnectionManager { this.registry = setupRegistry(); retrieveClusterId(); - this.rpcClient = new RpcClient(this.conf, this.clusterId); + this.rpcClient = new RpcClient(this.conf, this.clusterId, this.metrics); // Do we publish the status? boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, @@ -782,7 +783,7 @@ public class HConnectionManager { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - + this.metrics = new MetricsConnection(new MetricsConnectionWrapperImpl(this)); } @Override @@ -1426,6 +1427,7 @@ public class HConnectionManager { Entry e = tableLocations.floorEntry(row); if (e == null) { + metrics.incrMetaCacheMiss(); return null; } HRegionLocation possibleRegion = e.getValue(); @@ -1439,10 +1441,12 @@ public class HConnectionManager { if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || tableName.getRowComparator().compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { + metrics.incrMetaCacheHit(); return possibleRegion; } // Passed all the way through, so we got nothing - complete cache miss + metrics.incrMetaCacheMiss(); return null; } @@ -1783,6 +1787,11 @@ public class HConnectionManager { } @Override + public MetricsConnection getConnectionMetrics() { + return this.metrics; + } + + @Override // Nothing is done w/ the 'master' parameter. It is ignored. public AdminService.BlockingInterface getAdmin(final ServerName serverName, final boolean master) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java new file mode 100644 index 0000000..1f9b685 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This class is for maintaining the various connection statistics and publishing them through + * the metrics interfaces. + */ +@InterfaceAudience.Private +public class MetricsConnection { + + /** A container class for collecting details about the RPC call as it percolates. */ + public static class CallStats { + private long requestSizeBytes = 0; + private long responseSizeBytes = 0; + private long startTime = 0; + private long callTimeMs = 0; + + public long getRequestSizeBytes() { + return requestSizeBytes; + } + + public void setRequestSizeBytes(long requestSizeBytes) { + this.requestSizeBytes = requestSizeBytes; + } + + public long getResponseSizeBytes() { + return responseSizeBytes; + } + + public void setResponseSizeBytes(long responseSizeBytes) { + this.responseSizeBytes = responseSizeBytes; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getCallTimeMs() { + return callTimeMs; + } + + public void setCallTimeMs(long callTimeMs) { + this.callTimeMs = callTimeMs; + } + } + + private final MetricsConnectionWrapper wrapper; + private final MetricsConnectionSource source; + + public MetricsConnection(MetricsConnectionWrapper wrapper) { + this.wrapper = wrapper; + this.source = CompatibilitySingletonFactory.getInstance(MetricsConnectionSourceFactory.class) + .createConnection(this.wrapper); + } + + @VisibleForTesting + MetricsConnectionSource getMetricsSource() { + return source; + } + + /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ + public static CallStats newCallStats() { + // TODO: instance pool to reduce GC? + return new CallStats(); + } + + /** Increment the number of meta cache hits. */ + public void incrMetaCacheHit() { + source.incrMetaCacheHit(); + } + + /** Increment the number of meta cache misses. */ + public void incrMetaCacheMiss() { + source.incrMetaCacheMiss(); + } + + /** Report RPC context to metrics system. */ + public void updateRpc(Descriptors.MethodDescriptor method, ServerName serverName, + CallStats stats) { + final String methodName = method.getService().getName() + "_" + method.getName(); + final String sn = serverName.toShortString().replace(':', '-'); + source.addRpcCallDuration(methodName, sn, stats.callTimeMs); + source.addRpcCallRequestSize(methodName, sn, stats.requestSizeBytes); + source.addRpcCallResponseSize(methodName, sn, stats.responseSizeBytes); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java new file mode 100644 index 0000000..63444ba --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapperImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; + +import java.util.concurrent.ThreadPoolExecutor; + +@InterfaceAudience.Private +public class MetricsConnectionWrapperImpl implements MetricsConnectionWrapper { + + private final HConnectionImplementation conn; + + public MetricsConnectionWrapperImpl(HConnectionImplementation connection) { + Preconditions.checkNotNull(connection); + this.conn = connection; + } + + @Override public String getId() { + return conn.toString(); + } + + @Override public String getUserName() { + return conn.user == null ? "" : conn.user.toString(); + } + + @Override public String getClusterId() { + return conn.clusterId; + } + + @Override public String getZookeeperQuorum() { + try { + return conn.getKeepAliveZooKeeperWatcher().getQuorum(); + } catch (Exception e) { + return ""; + } + } + + @Override public String getZookeeperBaseNode() { + try { + return conn.getKeepAliveZooKeeperWatcher().getBaseZNode(); + } catch (Exception e) { + return ""; + } + } + + @Override public int getExecutorPoolActiveCount() { + if (conn.getCurrentBatchPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + return tpe.getActiveCount(); + } + + @Override public int getExecutorPoolLargestPoolSize() { + if (conn.getCurrentBatchPool() == null) { + return 0; + } + ThreadPoolExecutor tpe = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + return tpe.getLargestPoolSize(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 5c9f565..b383fc8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -134,6 +135,7 @@ public class RpcClient { protected final SocketFactory socketFactory; // how to create sockets protected String clusterId; protected final SocketAddress localAddr; + protected final MetricsConnection metrics; private final boolean fallbackAllowed; private UserProvider userProvider; @@ -277,15 +279,16 @@ public class RpcClient { Message responseDefaultType; IOException error; // exception, null if value boolean done; // true when call is done - long startTime; final MethodDescriptor md; + final MetricsConnection.CallStats callStats; protected Call(final MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType) { + final Message responseDefaultType, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; - this.startTime = System.currentTimeMillis(); + this.callStats = callStats; + this.callStats.setStartTime(System.currentTimeMillis()); this.responseDefaultType = responseDefaultType; synchronized (RpcClient.this) { this.id = counter++; @@ -329,7 +332,7 @@ public class RpcClient { } public long getStartTime() { - return this.startTime; + return this.callStats.getStartTime(); } } @@ -1049,7 +1052,8 @@ public class RpcClient { //noinspection SynchronizeOnNonFinalField RequestHeader header = builder.build(); synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - IPCUtil.write(this.out, header, call.param, cellBlock); + call.callStats.setRequestSizeBytes( + IPCUtil.write(this.out, header, call.param, cellBlock)); } if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header)); @@ -1102,7 +1106,12 @@ public class RpcClient { if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } else { - if (call != null) call.setException(re); + if (call != null) { + call.setException(re); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + System.currentTimeMillis() - call.callStats.getStartTime()); + } } } else { Message value = null; @@ -1121,7 +1130,12 @@ public class RpcClient { } // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. - if (call != null) call.setResponse(value, cellBlockScanner); + if (call != null) { + call.setResponse(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + System.currentTimeMillis() - call.callStats.getStartTime()); + } } if (call != null) calls.remove(id); } catch (IOException e) { @@ -1243,13 +1257,15 @@ public class RpcClient { } /** - * Construct an IPC cluster client whose values are of the {@link Message} class. + * Used in test only. Construct an IPC cluster client whose values are of the + * {@link Message} class. * @param conf configuration * @param clusterId * @param factory socket factory */ + @VisibleForTesting RpcClient(Configuration conf, String clusterId, SocketFactory factory) { - this(conf, clusterId, factory, null); + this(conf, clusterId, factory, null, null); } /** @@ -1258,8 +1274,10 @@ public class RpcClient { * @param clusterId * @param factory socket factory * @param localAddr client socket bind address + * @param metrics the connection metrics */ - RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) { + RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr, + MetricsConnection metrics) { this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -1279,6 +1297,8 @@ public class RpcClient { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.localAddr = localAddr; this.userProvider = UserProvider.instantiate(conf); + this.metrics = metrics; + // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + @@ -1293,22 +1313,32 @@ public class RpcClient { } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory + * Helper method for tests only. Creates an {@code RpcClient} without metrics. + */ + @VisibleForTesting + public RpcClient(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null); + } + + /** + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory * @param conf configuration * @param clusterId */ - public RpcClient(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); + public RpcClient(Configuration conf, String clusterId, MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, metrics); } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory * @param conf configuration * @param clusterId * @param localAddr client socket bind address. + * @param metrics the connection metrics. */ - public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); + public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics); } /** @@ -1411,10 +1441,12 @@ public class RpcClient { } } + @VisibleForTesting Pair call(MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout) throws InterruptedException, IOException { - return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS); + return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS, + MetricsConnection.newCallStats()); } /** Make a call, passing param, to the IPC server running at @@ -1437,9 +1469,9 @@ public class RpcClient { */ Pair call(MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, - int rpcTimeout, int priority) + int rpcTimeout, int priority, MetricsConnection.CallStats callStats) throws InterruptedException, IOException { - Call call = new Call(md, param, cells, returnType); + Call call = new Call(md, param, cells, returnType, callStats); Connection connection = getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); connection.writeRequest(call, priority); // send the parameter @@ -1644,12 +1676,8 @@ public class RpcClient { */ Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, Message returnType, final User ticket, final InetSocketAddress isa, - final int rpcTimeout) + final int rpcTimeout, final ServerName serverName) throws ServiceException { - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = System.currentTimeMillis(); - } PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; CellScanner cells = null; if (pcrc != null) { @@ -1659,8 +1687,10 @@ public class RpcClient { } Pair val = null; try { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(System.currentTimeMillis()); val = call(md, param, cells, returnType, ticket, isa, rpcTimeout, - pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); + pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS, cs); if (pcrc != null) { // Shove the results into controller so can be carried across the proxy/pb service void. if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond()); @@ -1668,10 +1698,13 @@ public class RpcClient { throw new ServiceException("Client dropping data on the floor!"); } + cs.setCallTimeMs(System.currentTimeMillis() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, serverName, cs); + } if (LOG.isTraceEnabled()) { - long callTime = System.currentTimeMillis() - startTime; if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); } } return val.getFirst(); @@ -1698,6 +1731,7 @@ public class RpcClient { */ // Public so can be subclassed for tests. public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { + private final ServerName sn; private final InetSocketAddress isa; private volatile RpcClient rpcClient; private final int rpcTimeout; @@ -1705,6 +1739,7 @@ public class RpcClient { protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn, final User ticket, final int rpcTimeout) { + this.sn = sn; this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); this.rpcClient = rpcClient; // Set the rpc timeout to be the minimum of configured timeout and whatever the current @@ -1718,7 +1753,7 @@ public class RpcClient { Message param, Message returnType) throws ServiceException { return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket, - this.isa, this.rpcTimeout); + this.isa, this.rpcTimeout, this.sn); } } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java new file mode 100644 index 0000000..1ac645f --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.hadoop.metrics2.lib.MutableHistogram.MAX_METRIC_NAME; +import static org.apache.hadoop.metrics2.lib.MutableHistogram.MIN_METRIC_NAME; +import static org.apache.hadoop.metrics2.lib.MutableHistogram.NUM_OPS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.MetricsConnectionSource.*; +import static org.junit.Assert.*; + +/** Unit test over client metrics */ +@Category(SmallTests.class) +public class TestMetricsConnection { + + private static class MetricsConnectionWrapperStub implements MetricsConnectionWrapper { + @Override public String getId() { + return "testConnectionId"; + } + + @Override public String getUserName() { + return "testUser"; + } + + @Override public String getClusterId() { + return "testClusterId"; + } + + @Override public String getZookeeperQuorum() { + return "foo:bar:baz"; + } + + @Override public String getZookeeperBaseNode() { + return "/testing"; + } + + @Override public int getExecutorPoolActiveCount() { + return 52; + } + + @Override public int getExecutorPoolLargestPoolSize() { + return 53; + } + } + + public static MetricsAssertHelper HELPER = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + + private MetricsConnectionWrapper wrapper; + private MetricsConnectionSourceImpl source; + + @BeforeClass + public static void classSetUp() { + HELPER.init(); + } + + @Before + public void before() { + wrapper = new MetricsConnectionWrapperStub(); + source = (MetricsConnectionSourceImpl) new MetricsConnection(wrapper).getMetricsSource(); + } + + @Test + public void testWrapperSource() { + HELPER.assertTag(CONNECTION_ID_NAME, wrapper.getId(), source); + HELPER.assertTag(USER_NAME_NAME, wrapper.getUserName(), source); + HELPER.assertTag(CLUSTER_ID_NAME, wrapper.getClusterId(), source); + HELPER.assertTag(ZOOKEEPER_QUORUM_NAME, wrapper.getZookeeperQuorum(), source); + HELPER.assertTag(ZOOKEEPER_ZNODE_NAME, wrapper.getZookeeperBaseNode(), source); + HELPER.assertGauge(EXEC_POOL_ACTIVE_THREAD_NAME, wrapper.getExecutorPoolActiveCount(), source); + HELPER.assertGauge(EXEC_POOL_LARGEST_SIZE_NAME, wrapper.getExecutorPoolLargestPoolSize(), + source); + } + + /** Should really be in a TestMetricsConnectionSourceImpl */ + @Test + public void testDynamicMethodRegistration() { + source.addRpcCallDuration("MethodFoo", "Server1", 10); + source.addRpcCallDuration("MethodFoo", "Server1", 20); + source.addRpcCallDuration("MethodFoo", "Server2", 30); + source.addRpcCallDuration("MethodBar", "Server2", 40); + + HELPER.assertCounter(/* we should see 3 total calls to MethodFoo */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 3, source); + HELPER.assertCounter(/* we should see 1 total call to MethodBar */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, 1, source); + + MetricsConnectionHostSource server1 = source.getOrCreate("Server1"); + assertNotNull("getOrCreate should always return something", server1); + HELPER.assertCounter(/* 2 calls of MethodFoo were made to server1 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 2, server1); + HELPER.assertGauge(/* the smallest value reported by server1 should be 10 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MIN_METRIC_NAME, 10, server1); + HELPER.assertGauge(/* the largest value reported by server1 should be 20 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MAX_METRIC_NAME, 20, server1); + HELPER.assertCounterNotExist(/* assert no calls of MethodBar were made to server1 */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, server1); + + MetricsConnectionHostSource server2 = source.getOrCreate("Server2"); + assertNotNull("getOrCreate should always return something", server2); + HELPER.assertCounter(/* 1 call of MethodFoo was made to server2 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + NUM_OPS_METRIC_NAME, 1, server2); + HELPER.assertGauge(/* the smallest value reported by server2 should be 30 */ + RPC_CALL_DURATION_NAME_BASE + "MethodFoo" + MIN_METRIC_NAME, 30, server2); + HELPER.assertCounter(/* 1 call of MethodBar was made to server2 */ + RPC_CALL_DURATION_NAME_BASE + "MethodBar" + NUM_OPS_METRIC_NAME, 1, server2); + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java new file mode 100644 index 0000000..42ed660 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSource.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** + * Reports Host-level metrics as observed by the client. + */ +public interface MetricsConnectionHostSource extends BaseSource { + /** The name of the metrics. */ + String METRICS_NAME = "Host"; + + /** The name of the metrics context that metrics will be under. */ + String METRICS_CONTEXT = "connection"; + + /** Description */ + String METRICS_DESCRIPTION = "Metrics describing interactions with target host"; + + /** The name of the metrics context that metrics will be under in jmx. */ + String METRICS_JMX_CONTEXT_FMT = "Client,sub01=%s,sub02=%s"; + + /** Prefix string used for describing the rpc call time for a specific method. */ + String RPC_CALL_DURATION_NAME_BASE = "rpcCallDurationMs_"; + String RPC_CALL_DURATION_DESC = "The duration of an RPC call in milliseconds."; + /** Prefix string used for describing the rpc call time for a specific method. */ + String RPC_CALL_REQUEST_SIZE_NAME_BASE = "rpcCallRequestSizeBytes_"; + String RPC_CALL_REQUEST_SIZE_DESC = "The size of an RPC call request in bytes."; + /** Prefix string used for describing the rpc call time for a specific method. */ + String RPC_CALL_RESPONSE_SIZE_NAME_BASE = "rpcCallResponseSizeBytes_"; + String RPC_CALL_RESPONSE_SIZE_DESC = "The size of an RPC call response in bytes."; + + /** Record an RPC call duration. */ + void addRpcCallDuration(String method, long ms); + + /** Record an RPC request size. */ + void addRpcCallRequestSize(String method, long bytes); + + /** Record an RPC response size. */ + void addRpcCallResponseSize(String method, long bytes); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java new file mode 100644 index 0000000..79a75dc --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSource.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +public interface MetricsConnectionSource extends BaseSource { + + /** The name of the metrics. */ + String METRICS_NAME = "Connection"; + + /** The name of the metrics context that metrics will be under. */ + String METRICS_CONTEXT = "connection"; + + /** Description. */ + String METRICS_DESCRIPTION = "Metrics about HBase Connection"; + + /** + * The name of the metrics context that metrics will be under in jmx. + * + * The "sub01, sub02" stuff is how we create a nested bean structure. See explanation over on + * http://stackoverflow.com/questions/20669928/is-it-possible-to-create-jmx-subdomains + */ + String METRICS_JMX_CONTEXT_FMT = "Client,sub01=%s,sub02=connection"; + + /** Increment number of meta cache hits. */ + void incrMetaCacheHit(); + + /** Increment number of meta cache misses. */ + void incrMetaCacheMiss(); + + /** Add a new rpc call observation, duration of call in milliseconds. */ + void addRpcCallDuration(String method, String serverName, long ms); + + /** Add a new rpc call observation, request size in bytes. */ + void addRpcCallRequestSize(String method, String serverName, long bytes); + + /** Add a new rpc call observation, response size in bytes. */ + void addRpcCallResponseSize(String method, String serverName, long bytes); + + // Strings used for exporting to metrics system. + String CONNECTION_ID_NAME = "connectionId"; + String CONNECTION_ID_DESC = "The connection's process-unique identifier."; + String USER_NAME_NAME = "userName"; + String USER_NAME_DESC = "The user on behalf of whom the Connection is acting."; + String CLUSTER_ID_NAME = "clusterId"; + String CLUSTER_ID_DESC = "Cluster Id"; + String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum"; + String ZOOKEEPER_QUORUM_DESC = "Zookeeper Quorum"; + String ZOOKEEPER_ZNODE_NAME = "zookeeperBaseZNode"; + String ZOOKEEPER_ZNODE_DESC = "Base ZNode for this cluster."; + + String META_CACHE_HIT_NAME = "metaCacheHit"; + String META_CACHE_HIT_DESC = + "A counter on the number of times this connection's meta cache has a valid region location."; + String META_CACHE_MISS_NAME = "metaCacheMiss"; + String META_CACHE_MISS_DESC = + "A counter on the number of times this connection does not know where to find a region."; + + String EXEC_POOL_ACTIVE_THREAD_NAME = "executorPoolActiveThreads"; + String EXEC_POOL_ACTIVE_THREAD_DESC = + "The approximate number of threads executing table operations."; + String EXEC_POOL_LARGEST_SIZE_NAME = "executorPoolLargestSize"; + String EXEC_POOL_LARGEST_SIZE_DESC = + "The largest number of threads that have ever simultaneously been in the pool."; + + String RPC_CALL_DURATION_NAME_BASE = "rpcCallDurationMs_"; + String RPC_CALL_DURATION_DESC = "The duration of an RPC call in milliseconds."; + String RPC_CALL_REQUEST_SIZE_NAME_BASE = "rpcCallRequestSizeBytes_"; + String RPC_CALL_REQUEST_SIZE_DESC = "The size of an RPC call request in bytes."; + String RPC_CALL_RESPONSE_SIZE_NAME_BASE = "rpcCallResponseSizeBytes_"; + String RPC_CALL_RESPONSE_SIZE_DESC = "The size of an RPC call response in bytes."; +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java new file mode 100644 index 0000000..21585ae --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Interface of a factory to create Metrics Sources used inside of Connections. + */ +public interface MetricsConnectionSourceFactory { + + /** + * Given a wrapper create a {@link MetricsConnectionSource}. + * + * @param wrapper The wrapped Connection + * @return a Metrics Source. + */ + MetricsConnectionSource createConnection(MetricsConnectionWrapper wrapper); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java new file mode 100644 index 0000000..03bed1a --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionWrapper.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * This is the interface that will expose Connection information to hadoop1/hadoop2 + * implementations of the {@link MetricsConnectionSource}. + */ +public interface MetricsConnectionWrapper { + + /** Get the connection's unique identifier */ + String getId(); + + /** Get the User's name. */ + String getUserName(); + + /** Get the Cluster ID */ + String getClusterId(); + + /** Get the Zookeeper Quorum Info */ + String getZookeeperQuorum(); + + /** Get the base ZNode for this cluster. */ + String getZookeeperBaseNode(); + + /** The approximate number of threads executing table operations. */ + int getExecutorPoolActiveCount(); + + /** The largest number of threads that have ever simultaneously been in the pool. */ + int getExecutorPoolLargestPoolSize(); +} diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java index 2eefcd2..14fa3b4 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelper.java @@ -130,6 +130,13 @@ public interface MetricsAssertHelper { void assertCounterLt(String name, long expected, BaseSource source); /** + * Assert that a counter does not exist. + * @param name The name of the counter + * @param source The {@link BaseSource} that will provide the tags, gauges, and counters. + */ + void assertCounterNotExist(String name, BaseSource source); + + /** * Get the value of a counter. * * @param name name of the counter. diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java new file mode 100644 index 0000000..1ca6bd3 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionHostSourceImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; + +/** + * Reports Host-level metrics as observed by the client. + */ +@InterfaceAudience.Private +public class MetricsConnectionHostSourceImpl extends BaseSourceImpl + implements MetricsConnectionHostSource { + + public MetricsConnectionHostSourceImpl(MetricsConnectionWrapper wrapper, String serverName) { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, + String.format(METRICS_JMX_CONTEXT_FMT, wrapper.getId(), serverName)); + } + + public MetricsConnectionHostSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + } + + @Override public void addRpcCallDuration(String method, long ms) { + getHistogram(RPC_CALL_DURATION_NAME_BASE + method, RPC_CALL_DURATION_DESC).add(ms); + } + + @Override public void addRpcCallRequestSize(String method, long bytes) { + getHistogram(RPC_CALL_REQUEST_SIZE_NAME_BASE + method, RPC_CALL_REQUEST_SIZE_DESC) + .add(bytes); + } + + @Override public void addRpcCallResponseSize(String method, long bytes) { + getHistogram(RPC_CALL_RESPONSE_SIZE_NAME_BASE + method, RPC_CALL_RESPONSE_SIZE_DESC) + .add(bytes); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java new file mode 100644 index 0000000..d29c5c9 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceFactoryImpl.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsConnectionSourceFactoryImpl implements MetricsConnectionSourceFactory { + + @Override public MetricsConnectionSource createConnection(MetricsConnectionWrapper wrapper) { + return new MetricsConnectionSourceImpl(wrapper); + } + + public MetricsConnectionHostSource createHostClient(MetricsConnectionWrapper wrapper, + String serverName) { + return new MetricsConnectionHostSourceImpl(wrapper, serverName); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java new file mode 100644 index 0000000..f383ac6 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/client/MetricsConnectionSourceImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +import java.util.HashMap; +import java.util.Map; + +@InterfaceAudience.Private +public class MetricsConnectionSourceImpl + extends BaseSourceImpl implements MetricsConnectionSource { + + // wrapper for access statistics collected in Connection instance + private final MetricsConnectionWrapper wrapper; + + // Hadoop Metric2 objects for additional monitoring + + private final MutableCounterLong metaCacheHit; + private final MutableCounterLong metaCacheMiss; + + private final Map byHost; + + public MetricsConnectionSourceImpl(MetricsConnectionWrapper wrapper) { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, + String.format(METRICS_JMX_CONTEXT_FMT, wrapper.getId()), wrapper); + } + + public MetricsConnectionSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext, MetricsConnectionWrapper wrapper) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + this.wrapper = wrapper; + + metaCacheHit = getMetricsRegistry().newCounter(META_CACHE_HIT_NAME, META_CACHE_HIT_DESC, 0L); + metaCacheMiss = + getMetricsRegistry().newCounter(META_CACHE_MISS_NAME, META_CACHE_MISS_DESC, 0L); + byHost = new HashMap(); + } + + /** + * Dynamically register metrics on first invocation. A new bean is registered for + * {@code serverName} upon initial encounter. RPC calls to said host are registered on that bean + * on first invocation. + */ + @VisibleForTesting + MetricsConnectionHostSource getOrCreate(String serverName) { + // TODO: how to handle expiration of metrics for transient hosts? + MetricsConnectionHostSource hostSource = byHost.get(serverName); + if (hostSource == null) { + synchronized (byHost) { + hostSource = byHost.get(serverName); + if (hostSource == null) { + hostSource = new MetricsConnectionHostSourceImpl(wrapper, serverName); + byHost.put(serverName, hostSource); + } + } + } + return hostSource; + } + + @Override + public void getMetrics(MetricsCollector metricsCollector, boolean all) { + + MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); + + if (wrapper != null) { + mrb.addGauge(Interns.info(EXEC_POOL_ACTIVE_THREAD_NAME, EXEC_POOL_ACTIVE_THREAD_DESC), + wrapper.getExecutorPoolActiveCount()) + .addGauge(Interns.info(EXEC_POOL_LARGEST_SIZE_NAME, EXEC_POOL_LARGEST_SIZE_DESC), + wrapper.getExecutorPoolLargestPoolSize()) + .tag(Interns.info(CONNECTION_ID_NAME, CONNECTION_ID_DESC), wrapper.getId()) + .tag(Interns.info(USER_NAME_NAME, USER_NAME_DESC), wrapper.getUserName()) + .tag(Interns.info(CLUSTER_ID_NAME, CLUSTER_ID_DESC), wrapper.getClusterId()) + .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), + wrapper.getZookeeperQuorum()) + .tag(Interns.info(ZOOKEEPER_ZNODE_NAME, ZOOKEEPER_ZNODE_DESC), + wrapper.getZookeeperBaseNode()); + } + + metricsRegistry.snapshot(mrb, all); + } + + @Override public void incrMetaCacheHit() { + metaCacheHit.incr(); + } + + @Override public void incrMetaCacheMiss() { + metaCacheMiss.incr(); + } + + @Override public void addRpcCallDuration(String method, String serverName, long ms) { + getHistogram(RPC_CALL_DURATION_NAME_BASE + method, RPC_CALL_DURATION_DESC).add(ms); + getOrCreate(serverName).addRpcCallDuration(method, ms); + } + + @Override public void addRpcCallRequestSize(String method, String serverName, long bytes) { + getHistogram(RPC_CALL_REQUEST_SIZE_NAME_BASE + method, RPC_CALL_REQUEST_SIZE_DESC).add(bytes); + getOrCreate(serverName).addRpcCallRequestSize(method, bytes); + } + + @Override public void addRpcCallResponseSize(String method, String serverName, long bytes) { + getHistogram(RPC_CALL_RESPONSE_SIZE_NAME_BASE + method, RPC_CALL_RESPONSE_SIZE_DESC) + .add(bytes); + getOrCreate(serverName).addRpcCallResponseSize(method, bytes); + } +} diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java index 7d76574..75b4124 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java @@ -79,49 +79,42 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { } + /** + * Dynamically register a histogram metrics when it does not exist. Does right by descriptions, + * but probably generates a bit of unnecessary garbage while the registry is populated the first + * time. + */ + protected MutableHistogram getHistogram(String name, String description) { + if (getMetricsRegistry().get(name) == null) { + getMetricsRegistry().newHistogram(name, description); + } + return getMetricsRegistry().getHistogram(name); + } + + @Override public void init() { this.metricsRegistry.clearMetrics(); } - /** - * Set a single gauge to a value. - * - * @param gaugeName gauge name - * @param value the new value of the gauge. - */ + @Override public void setGauge(String gaugeName, long value) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, value); gaugeInt.set(value); } - /** - * Add some amount to a gauge. - * - * @param gaugeName The name of the gauge to increment. - * @param delta The amount to increment the gauge by. - */ + @Override public void incGauge(String gaugeName, long delta) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, 0l); gaugeInt.incr(delta); } - /** - * Decrease the value of a named gauge. - * - * @param gaugeName The name of the gauge. - * @param delta the ammount to subtract from a gauge value. - */ + @Override public void decGauge(String gaugeName, long delta) { MutableGaugeLong gaugeInt = metricsRegistry.getLongGauge(gaugeName, 0l); gaugeInt.decr(delta); } - /** - * Increment a named counter by some value. - * - * @param key the name of the counter - * @param delta the ammount to increment - */ + @Override public void incCounters(String key, long delta) { MutableCounterLong counter = metricsRegistry.getLongCounter(key, 0l); counter.incr(delta); @@ -140,11 +133,7 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { histo.add(value); } - /** - * Remove a named gauge. - * - * @param key - */ + @Override public void removeMetric(String key) { metricsRegistry.removeMetric(key); JmxCacheBuster.clearJmxCache(); @@ -159,18 +148,22 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { return metricsRegistry; } + @Override public String getMetricsContext() { return metricsContext; } + @Override public String getMetricsDescription() { return metricsDescription; } + @Override public String getMetricsJmxContext() { return metricsJmxContext; } + @Override public String getMetricsName() { return metricsName; } diff --git a/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory new file mode 100644 index 0000000..4adf4c8 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.client.MetricsConnectionSourceFactory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.hadoop.hbase.client.MetricsConnectionSourceFactoryImpl diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java index 3481cc2..2c46a7b 100644 --- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/test/MetricsAssertHelperImpl.java @@ -198,6 +198,13 @@ public class MetricsAssertHelperImpl implements MetricsAssertHelper { } @Override + public void assertCounterNotExist(String name, BaseSource source) { + getMetrics(source); + String cName = canonicalizeMetricName(name); + assertNull(name + " should be null", counters.get(cName)); + } + + @Override public long getCounter(String name, BaseSource source) { getMetrics(source); String cName = canonicalizeMetricName(name); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 95aa124..01f206b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -57,6 +57,9 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.client.MetricsConnectionWrapper; +import org.apache.hadoop.hbase.client.MetricsConnectionWrapperImpl; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -873,8 +876,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } // Setup RPC client for master communication + // TODO: no single connection managed anywhere, so no central metrics object to obtain. rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress( - this.isa.getAddress(), 0)); + this.isa.getAddress(), 0), null); this.pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 9f98ba6..857ea55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @@ -47,7 +48,7 @@ public class MetricsRegionServer { this.serverSource = serverSource; } - // for unit-test usage + @VisibleForTesting public MetricsRegionServerSource getMetricsSource() { return serverSource; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 998f7ad..2e1a9b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -162,4 +162,4 @@ public class TestClientTimeouts { return super.callBlockingMethod(md, controller, param, returnType); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 14898b0..9dfff46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -48,6 +48,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -342,7 +346,8 @@ public class TestIPC { } CellScanner cellScanner = CellUtil.createCellScanner(cells); Pair response = - client.call(md, builder.build(), cellScanner, param, user, address, 0); + client.call(md, builder.build(), cellScanner, param, user, address, 0, + HConstants.NORMAL_QOS, new MetricsConnection.CallStats()); /* int count = 0; while (p.getSecond().advance()) { -- 1.9.5 (Apple Git-50.3)