From 6acd432fc0b9571499939750ed4a114a0c8749b8 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 5 Oct 2015 10:19:40 -0700 Subject: [PATCH] HBASE-12911 Client-side metrics --- hbase-client/pom.xml | 4 + .../apache/hadoop/hbase/client/HConnection.java | 5 + .../hadoop/hbase/client/HConnectionManager.java | 25 +- .../hadoop/hbase/client/MetricsConnection.java | 315 +++++++++++++++++++++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 84 ++++-- .../hadoop/hbase/client/TestMetricsConnection.java | 119 ++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 3 +- .../hbase/regionserver/MetricsRegionServer.java | 3 +- .../hadoop/hbase/client/TestClientTimeouts.java | 2 +- .../java/org/apache/hadoop/hbase/ipc/TestIPC.java | 4 +- 10 files changed, 530 insertions(+), 34 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index bb1a3c7..afa7ccf 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -189,6 +189,10 @@ log4j test + + com.yammer.metrics + metrics-core + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index e267c50..598c739 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -555,4 +555,9 @@ public interface HConnection extends Abortable, Closeable { * @return the configured client backoff policy */ ClientBackoffPolicy getBackoffPolicy(); + + /** + * @return the MetricsConnection instance associated with this connection. + */ + public MetricsConnection getConnectionMetrics(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 56ece6f..6c8b986 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -642,6 +644,8 @@ public class HConnectionManager { // Client rpc instance. private RpcClient rpcClient; + private final MetricsConnection metrics; + /** * Map of table to table {@link HRegionLocation}s. */ @@ -666,7 +670,7 @@ public class HConnectionManager { // indicates whether this connection's life cycle is managed (by us) private boolean managed; - private User user; + protected User user; private RpcRetryingCallerFactory rpcCallerFactory; @@ -706,7 +710,7 @@ public class HConnectionManager { this.registry = setupRegistry(); retrieveClusterId(); - this.rpcClient = new RpcClient(this.conf, this.clusterId); + this.rpcClient = new RpcClient(this.conf, this.clusterId, this.metrics); // Do we publish the status? 
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, @@ -782,7 +786,11 @@ public class HConnectionManager { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - + if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { + this.metrics = new MetricsConnection(this); + } else { + this.metrics = null; + } } @Override @@ -819,6 +827,11 @@ public class HConnectionManager { pool); } + @Override + public MetricsConnection getConnectionMetrics() { + return this.metrics; + } + private ExecutorService getBatchPool() { if (batchPool == null) { // shared HTable thread executor not yet initialized @@ -1426,6 +1439,7 @@ public class HConnectionManager { Entry e = tableLocations.floorEntry(row); if (e == null) { + if (metrics != null) metrics.incrMetaCacheMiss(); return null; } HRegionLocation possibleRegion = e.getValue(); @@ -1439,10 +1453,12 @@ public class HConnectionManager { if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || tableName.getRowComparator().compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { + if (metrics != null) metrics.incrMetaCacheHit(); return possibleRegion; } // Passed all the way through, so we got nothing - complete cache miss + if (metrics != null) metrics.incrMetaCacheMiss(); return null; } @@ -2657,6 +2673,9 @@ public class HConnectionManager { delayedClosing.stop("Closing connection"); closeMaster(); shutdownBatchPool(); + if (this.metrics != null) { + this.metrics.shutdown(); + } this.closed = true; closeZooKeeperWatcher(); this.stubs.clear(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java new file mode 100644 index 0000000..febf03f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.reporting.JmxReporter; +import com.yammer.metrics.util.RatioGauge; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * This class is for maintaining the various connection statistics and publishing them through + * the metrics interfaces. + * + * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not + * conflict with other uses of Yammer Metrics within the client application. Instantiating + * this class implicitly creates and "starts" instances of these classes; be sure to call + * {@link #shutdown()} to terminate the thread pools they allocate. + */ +@InterfaceAudience.Private +public class MetricsConnection { + + /** Set this key to {@code true} to enable metrics collection of client requests. */ + public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; + + private static final String DRTN_BASE = "rpcCallDurationMs_"; + private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; + private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; + private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); + + /** A container class for collecting details about the RPC call as it percolates. 
*/ + public static class CallStats { + private long requestSizeBytes = 0; + private long responseSizeBytes = 0; + private long startTime = 0; + private long callTimeMs = 0; + + public long getRequestSizeBytes() { + return requestSizeBytes; + } + + public void setRequestSizeBytes(long requestSizeBytes) { + this.requestSizeBytes = requestSizeBytes; + } + + public long getResponseSizeBytes() { + return responseSizeBytes; + } + + public void setResponseSizeBytes(long responseSizeBytes) { + this.responseSizeBytes = responseSizeBytes; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getCallTimeMs() { + return callTimeMs; + } + + public void setCallTimeMs(long callTimeMs) { + this.callTimeMs = callTimeMs; + } + } + + @VisibleForTesting + protected final class CallTracker { + private final String name; + @VisibleForTesting final Timer callTimer; + @VisibleForTesting final Histogram reqHist; + @VisibleForTesting final Histogram respHist; + + private CallTracker(MetricsRegistry registry, String name, String subName, String scope) { + StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); + if (subName != null) { + sb.append("(").append(subName).append(")"); + } + this.name = sb.toString(); + this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope); + this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope); + this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope); + } + + private CallTracker(MetricsRegistry registry, String name, String scope) { + this(registry, name, null, scope); + } + + public void updateRpc(CallStats stats) { + this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); + this.reqHist.update(stats.getRequestSizeBytes()); + this.respHist.update(stats.getResponseSizeBytes()); + } + + @Override + public String toString() { + return "CallTracker:" + name; + } + } + + /** A lambda for dispatching to the appropriate metric factory method */ + private static interface NewMetric { + T newMetric(Class clazz, String name, String scope); + } + + /** Anticipated number of metric entries */ + private static final int CAPACITY = 50; + /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ + private static final float LOAD_FACTOR = 0.75f; + /** + * Anticipated number of concurrent accessor threads, from + * {@link HConnectionManager.HConnectionImplementation#getBatchPool()} + */ + private static final int CONCURRENCY_LEVEL = 256; + + private final MetricsRegistry registry; + private final JmxReporter reporter; + private final String scope; + + private final NewMetric timerFactory = new NewMetric() { + @Override public Timer newMetric(Class clazz, String name, String scope) { + return registry.newTimer(clazz, name, scope); + } + }; + + private final NewMetric histogramFactory = new NewMetric() { + @Override public Histogram newMetric(Class clazz, String name, String scope) { + return registry.newHistogram(clazz, name, scope); + } + }; + + // static metrics + + @VisibleForTesting protected final Counter metaCacheHits; + @VisibleForTesting protected final Counter metaCacheMisses; + @VisibleForTesting protected final CallTracker getTracker; + @VisibleForTesting protected final CallTracker scanTracker; + @VisibleForTesting protected final CallTracker appendTracker; + @VisibleForTesting protected final CallTracker deleteTracker; + 
@VisibleForTesting protected final CallTracker incrementTracker; + @VisibleForTesting protected final CallTracker putTracker; + @VisibleForTesting protected final CallTracker multiTracker; + + // dynamic metrics + + // These maps are used to cache references to the metric instances that are managed by the + // registry. I don't think their use perfectly removes redundant allocations, but it's + // a big improvement over calling registry.newMetric each time. + @VisibleForTesting protected final ConcurrentMap rpcTimers = + new ConcurrentHashMap(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); + @VisibleForTesting protected final ConcurrentMap rpcHistograms = + new ConcurrentHashMap( + CAPACITY * 2 /* tracking both request and response sizes */, + LOAD_FACTOR, CONCURRENCY_LEVEL); + + public MetricsConnection(final HConnectionManager.HConnectionImplementation conn) { + this.scope = conn.toString(); + this.registry = new MetricsRegistry(); + final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + + this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope, + new RatioGauge() { + @Override protected double getNumerator() { + return batchPool.getActiveCount(); + } + @Override protected double getDenominator() { + return batchPool.getMaximumPoolSize(); + } + }); + this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope); + this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope); + this.getTracker = new CallTracker(this.registry, "Get", scope); + this.scanTracker = new CallTracker(this.registry, "Scan", scope); + this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); + this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); + this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); + this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); + this.multiTracker = new CallTracker(this.registry, "Multi", scope); + this.reporter = new JmxReporter(this.registry); + this.reporter.start(); + } + + public void shutdown() { + this.reporter.shutdown(); + this.registry.shutdown(); + } + + /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ + public static CallStats newCallStats() { + // TODO: instance pool to reduce GC? + return new CallStats(); + } + + /** Increment the number of meta cache hits. */ + public void incrMetaCacheHit() { + metaCacheHits.inc(); + } + + /** Increment the number of meta cache misses. */ + public void incrMetaCacheMiss() { + metaCacheMisses.inc(); + } + + /** + * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. + */ + private T getMetric(String key, ConcurrentMap map, NewMetric factory) { + T t = map.get(key); + if (t == null) { + t = factory.newMetric(this.getClass(), key, scope); + map.putIfAbsent(key, t); + } + return t; + } + + /** Update call stats for non-critical-path methods */ + private void updateRpcGeneric(MethodDescriptor method, CallStats stats) { + final String methodName = method.getService().getName() + "_" + method.getName(); + getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) + .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); + getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) + .update(stats.getRequestSizeBytes()); + getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) + .update(stats.getResponseSizeBytes()); + } + + /** Report RPC context to metrics system. 
*/ + public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { + // this implementation is tied directly to protobuf implementation details. would be better + // if we could dispatch based on something static, ie, request Message type. + if (method.getService() == ClientService.getDescriptor()) { + switch(method.getIndex()) { + case 0: + assert "Get".equals(method.getName()); + getTracker.updateRpc(stats); + return; + case 1: + assert "Mutate".equals(method.getName()); + final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); + switch(mutationType) { + case APPEND: + appendTracker.updateRpc(stats); + return; + case DELETE: + deleteTracker.updateRpc(stats); + return; + case INCREMENT: + incrementTracker.updateRpc(stats); + return; + case PUT: + putTracker.updateRpc(stats); + return; + default: + throw new RuntimeException("Unrecognized mutation type " + mutationType); + } + case 2: + assert "Scan".equals(method.getName()); + scanTracker.updateRpc(stats); + return; + case 3: + assert "BulkLoadHFile".equals(method.getName()); + // use generic implementation + break; + case 4: + assert "ExecService".equals(method.getName()); + // use generic implementation + break; + case 5: + assert "ExecRegionServerService".equals(method.getName()); + // use generic implementation + break; + case 6: + assert "Multi".equals(method.getName()); + multiTracker.updateRpc(stats); + return; + default: + throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); + } + } + // Fallback to dynamic registry lookup for DDL methods. + updateRpcGeneric(method, stats); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 5c9f565..7f11038 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -134,6 +135,7 @@ public class RpcClient { protected final SocketFactory socketFactory; // how to create sockets protected String clusterId; protected final SocketAddress localAddr; + protected final MetricsConnection metrics; private final boolean fallbackAllowed; private UserProvider userProvider; @@ -277,15 +279,16 @@ public class RpcClient { Message responseDefaultType; IOException error; // exception, null if value boolean done; // true when call is done - long startTime; final MethodDescriptor md; + final MetricsConnection.CallStats callStats; protected Call(final MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType) { + final Message responseDefaultType, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; - this.startTime = System.currentTimeMillis(); + this.callStats = callStats; + this.callStats.setStartTime(System.currentTimeMillis()); this.responseDefaultType = responseDefaultType; synchronized (RpcClient.this) { this.id = counter++; @@ -329,7 +332,7 @@ public class RpcClient { } public long getStartTime() { - return 
this.startTime; + return this.callStats.getStartTime(); } } @@ -1049,7 +1052,8 @@ public class RpcClient { //noinspection SynchronizeOnNonFinalField RequestHeader header = builder.build(); synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - IPCUtil.write(this.out, header, call.param, cellBlock); + call.callStats.setRequestSizeBytes( + IPCUtil.write(this.out, header, call.param, cellBlock)); } if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header)); @@ -1102,7 +1106,12 @@ public class RpcClient { if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } else { - if (call != null) call.setException(re); + if (call != null) { + call.setException(re); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + System.currentTimeMillis() - call.callStats.getStartTime()); + } } } else { Message value = null; @@ -1121,7 +1130,12 @@ public class RpcClient { } // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. - if (call != null) call.setResponse(value, cellBlockScanner); + if (call != null) { + call.setResponse(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + System.currentTimeMillis() - call.callStats.getStartTime()); + } } if (call != null) calls.remove(id); } catch (IOException e) { @@ -1243,13 +1257,15 @@ public class RpcClient { } /** - * Construct an IPC cluster client whose values are of the {@link Message} class. + * Used in test only. Construct an IPC cluster client whose values are of the + * {@link Message} class. * @param conf configuration * @param clusterId * @param factory socket factory */ + @VisibleForTesting RpcClient(Configuration conf, String clusterId, SocketFactory factory) { - this(conf, clusterId, factory, null); + this(conf, clusterId, factory, null, null); } /** @@ -1258,8 +1274,10 @@ public class RpcClient { * @param clusterId * @param factory socket factory * @param localAddr client socket bind address + * @param metrics the connection metrics */ - RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) { + RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr, + MetricsConnection metrics) { this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, @@ -1279,6 +1297,8 @@ public class RpcClient { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.localAddr = localAddr; this.userProvider = UserProvider.instantiate(conf); + this.metrics = metrics; + // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + @@ -1293,12 +1313,20 @@ public class RpcClient { } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory + * Helper method for tests only. Creates an {@code RpcClient} without metrics. 
+ */ + @VisibleForTesting + public RpcClient(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null); + } + + /** + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory * @param conf configuration * @param clusterId */ - public RpcClient(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); + public RpcClient(Configuration conf, String clusterId, MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, metrics); } /** @@ -1307,8 +1335,9 @@ public class RpcClient { * @param clusterId * @param localAddr client socket bind address. */ - public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); + public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics); } /** @@ -1411,10 +1440,12 @@ public class RpcClient { } } + @VisibleForTesting Pair call(MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout) throws InterruptedException, IOException { - return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS); + return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS, + MetricsConnection.newCallStats()); } /** Make a call, passing param, to the IPC server running at @@ -1437,9 +1468,9 @@ public class RpcClient { */ Pair call(MethodDescriptor md, Message param, CellScanner cells, Message returnType, User ticket, InetSocketAddress addr, - int rpcTimeout, int priority) + int rpcTimeout, int priority, MetricsConnection.CallStats callStats) throws InterruptedException, IOException { - Call call = new Call(md, param, cells, returnType); + Call call = new Call(md, param, cells, returnType, callStats); Connection connection = getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); connection.writeRequest(call, priority); // send the parameter @@ -1646,10 +1677,6 @@ public class RpcClient { Message param, Message returnType, final User ticket, final InetSocketAddress isa, final int rpcTimeout) throws ServiceException { - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = System.currentTimeMillis(); - } PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; CellScanner cells = null; if (pcrc != null) { @@ -1659,8 +1686,10 @@ public class RpcClient { } Pair val = null; try { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(System.currentTimeMillis()); val = call(md, param, cells, returnType, ticket, isa, rpcTimeout, - pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS); + pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS, cs); if (pcrc != null) { // Shove the results into controller so can be carried across the proxy/pb service void. 
if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond()); @@ -1668,11 +1697,12 @@ public class RpcClient { throw new ServiceException("Client dropping data on the floor!"); } + cs.setCallTimeMs(System.currentTimeMillis() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, param, cs); + } if (LOG.isTraceEnabled()) { - long callTime = System.currentTimeMillis() - startTime; - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); - } + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); } return val.getFirst(); } catch (Throwable e) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java new file mode 100644 index 0000000..c056da3 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+@Category(SmallTests.class)
+public class TestMetricsConnection {
+
+  private static MetricsConnection METRICS;
+
+  @BeforeClass
+  public static void beforeClass() {
+    HConnectionImplementation mocked = Mockito.mock(HConnectionImplementation.class);
+    Mockito.when(mocked.toString()).thenReturn("mocked-connection");
+    // Pass the stubbed mock, not a fresh one, so the connection scope is "mocked-connection".
+    METRICS = new MetricsConnection(mocked);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    METRICS.shutdown();
+  }
+
+  @Test
+  public void testStaticMetrics() throws IOException {
+    final byte[] foo = Bytes.toBytes("foo");
+    final RegionSpecifier region = RegionSpecifier.newBuilder()
+        .setValue(ByteString.EMPTY)
+        .setType(RegionSpecifierType.REGION_NAME)
+        .build();
+    final int loop = 5;
+
+    for (int i = 0; i < loop; i++) {
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Get"),
+          GetRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Scan"),
+          ScanRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Multi"),
+          MultiRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+    }
+    for (MetricsConnection.CallTracker t : new
MetricsConnection.CallTracker[] { + METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, + METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker + }) { + Assert.assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.count()); + Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count()); + Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count()); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3240ea2..6d5a850 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -873,8 +873,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } // Setup RPC client for master communication + // TODO: no single connection managed anywhere, so no central metrics object to obtain. rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress( - this.isa.getAddress(), 0)); + this.isa.getAddress(), 0), null); this.pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index 9f98ba6..857ea55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @@ -47,7 +48,7 @@ public class MetricsRegionServer { this.serverSource = serverSource; } - // for unit-test usage + @VisibleForTesting public MetricsRegionServerSource getMetricsSource() { return serverSource; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 998f7ad..2e1a9b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -162,4 +162,4 @@ public class TestClientTimeouts { return super.callBlockingMethod(md, controller, param, returnType); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 14898b0..7dcc37a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -342,7 +343,8 @@ public class TestIPC { } 
CellScanner cellScanner = CellUtil.createCellScanner(cells); Pair response = - client.call(md, builder.build(), cellScanner, param, user, address, 0); + client.call(md, builder.build(), cellScanner, param, user, address, 0, + HConstants.NORMAL_QOS, new MetricsConnection.CallStats()); /* int count = 0; while (p.getSecond().advance()) { -- 1.9.5 (Apple Git-50.3)
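
Usage sketch (illustrative, not part of the patch): client-side collection is off by default and is enabled per connection with the new "hbase.client.metrics.enable" key; once enabled, MetricsConnection publishes its registry over JMX and HConnection.getConnectionMetrics() exposes the instance. The class name below and the use of the existing HConnectionManager.createConnection() factory are assumptions for illustration.

// Illustrative sketch only -- assumes HConnectionManager.createConnection() as the entry point.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetricsConnection;

public class ClientMetricsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Collection is off by default; flip the key introduced by this patch.
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    HConnection connection = HConnectionManager.createConnection(conf);
    try {
      // Null when the key is left at its default (false). When enabled, the timers and
      // histograms (rpcCallDurationMs_*, rpcCallRequestSizeBytes_*, rpcCallResponseSizeBytes_*)
      // plus metaCacheHits/metaCacheMisses are visible over JMX, scoped by this
      // connection's toString().
      MetricsConnection metrics = connection.getConnectionMetrics();
      System.out.println("client-side metrics enabled: " + (metrics != null));
    } finally {
      connection.close(); // also shuts down the metrics registry and JMX reporter
    }
  }
}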