diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 425bd05..401e28e 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -189,6 +189,10 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
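
The new dependency is the Yammer metrics-core library (the com.yammer.metrics 2.x line); the MetricsConnection class introduced later in this patch is built on its MetricsRegistry, Counter and JmxReporter API. A minimal sketch of that API in isolation (the class, metric name and scope below are illustrative, not taken from the patch):

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.JmxReporter;

public class MetricsCoreSketch {
  public static void main(String[] args) {
    // Metrics are keyed by (class, name, scope); MetricsConnection uses the
    // connection's toString() as the scope.
    MetricsRegistry registry = Metrics.defaultRegistry();
    Counter hits = registry.newCounter(MetricsCoreSketch.class, "exampleHits", "example-scope");
    hits.inc();
    // Publish everything in the registry over JMX, as MetricsConnection does.
    JmxReporter.startDefault(registry);
  }
}
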
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index b3d99ae..99071fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -297,4 +297,9 @@ public interface ClusterConnection extends HConnection {
*/
ClientBackoffPolicy getBackoffPolicy();
+ /**
+ * @return the MetricsConnection instance associated with this connection.
+ */
+ public MetricsConnection getConnectionMetrics();
+
}
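
With this accessor in place, internal code that holds a ClusterConnection can reach the per-connection metrics object directly. A hedged sketch of such a caller (ClusterConnection is @InterfaceAudience.Private, so this is internal-facing code only; the helper name is hypothetical):

// Hypothetical helper, assumed to sit alongside the org.apache.hadoop.hbase.client classes.
static MetricsConnection metricsOf(Connection connection) {
  if (connection instanceof ClusterConnection) {
    return ((ClusterConnection) connection).getConnectionMetrics();
  }
  return null; // other Connection implementations expose no metrics
}
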
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index dab4905..a3f6fe6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable {
* @return true if this connection is closed
*/
boolean isClosed();
-
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ade32a8..92afbe5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -121,6 +121,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final AsyncProcess asyncProcess;
// single tracker per connection
private final ServerStatisticTracker stats;
+ private final MetricsConnection metrics;
private volatile boolean closed;
private volatile boolean aborted;
@@ -157,11 +158,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// Client rpc instance.
private RpcClient rpcClient;
- private MetaCache metaCache = new MetaCache();
+ private final MetaCache metaCache;
private int refCount;
- private User user;
+ protected User user;
private RpcRetryingCallerFactory rpcCallerFactory;
@@ -188,7 +189,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.registry = setupRegistry();
retrieveClusterId();
- this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
+ this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
// Do we publish the status?
@@ -239,11 +240,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} else {
nonceGenerator = new NoNonceGenerator();
}
- stats = ServerStatisticTracker.create(conf);
+ this.stats = ServerStatisticTracker.create(conf);
this.asyncProcess = createAsyncProcess(this.conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+ this.metrics = new MetricsConnection(this);
+ this.metaCache = new MetaCache(this.metrics);
}
/**
@@ -365,6 +368,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return new HBaseAdmin(this);
}
+ @Override
+ public MetricsConnection getConnectionMetrics() {
+ return this.metrics;
+ }
+
private ExecutorService getBatchPool() {
if (batchPool == null) {
synchronized (this) {
@@ -1320,7 +1328,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
- InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
+ InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
String address = rsHostname;
if (i != null) {
address = i.getHostAddress() + "-" + rsHostname;
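
Taken together, the constructor changes wire a single MetricsConnection per connection into both consumers: the MetaCache reports cache hits and misses to it, and the RpcClient reports per-call stats. A simplified view of that wiring (statement order and surrounding setup condensed; this is not the literal constructor body):

// Simplified sketch of the wiring inside ConnectionImplementation's constructor.
this.metrics = new MetricsConnection(this);      // one metrics sink per connection
this.metaCache = new MetaCache(this.metrics);    // meta cache reports hits/misses
this.rpcClient = RpcClientFactory.createClient(
    this.conf, this.clusterId, this.metrics);    // RPC layer reports call stats
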
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 8e1c93c..e763dd9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -59,6 +59,12 @@ public class MetaCache {
// The access to this attribute must be protected by a lock on cachedRegionLocations
private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
+ private final MetricsConnection metrics;
+
+ public MetaCache(MetricsConnection metrics) {
+ this.metrics = metrics;
+ }
+
/**
* Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located.
@@ -74,6 +80,7 @@ public class MetaCache {
Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
if (e == null) {
+ metrics.incrMetaCacheMiss();
return null;
}
RegionLocations possibleRegion = e.getValue();
@@ -94,10 +101,12 @@ public class MetaCache {
// HConstants.EMPTY_END_ROW) check itself will pass.
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
+ metrics.incrMetaCacheHit();
return possibleRegion;
}
// Passed all the way through, so we got nothing - complete cache miss
+ metrics.incrMetaCacheMiss();
return null;
}
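
The accounting pattern above is that every exit path of the cached-location lookup records exactly one hit or one miss. Stitched together from the hunks above into a single method for readability (types come from org.apache.hadoop.hbase; the method name and parameters are illustrative):

// Condensed sketch of the instrumented lookup; not the literal MetaCache code.
RegionLocations lookup(ConcurrentNavigableMap<byte[], RegionLocations> tableLocations,
    byte[] row, MetricsConnection metrics) {
  Map.Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
  if (e == null) {
    metrics.incrMetaCacheMiss();   // nothing cached at or before this row
    return null;
  }
  RegionLocations possibleRegion = e.getValue();
  byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey();
  if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
      || Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
    metrics.incrMetaCacheHit();    // the row falls inside the candidate region
    return possibleRegion;
  }
  metrics.incrMetaCacheMiss();     // candidate region ends before the row
  return null;
}
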
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
new file mode 100644
index 0000000..62c118a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.reporting.JmxReporter;
+import com.yammer.metrics.util.PercentGauge;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is for maintaining the various connection statistics and publishing them through
+ * the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsConnection {
+
+ /** A container class for collecting details about the RPC call as it percolates. */
+ public static class CallStats {
+ private long requestSizeBytes = 0;
+ private long responseSizeBytes = 0;
+ private long startTime = 0;
+ private long callTimeMs = 0;
+
+ public long getRequestSizeBytes() {
+ return requestSizeBytes;
+ }
+
+ public void setRequestSizeBytes(long requestSizeBytes) {
+ this.requestSizeBytes = requestSizeBytes;
+ }
+
+ public long getResponseSizeBytes() {
+ return responseSizeBytes;
+ }
+
+ public void setResponseSizeBytes(long responseSizeBytes) {
+ this.responseSizeBytes = responseSizeBytes;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getCallTimeMs() {
+ return callTimeMs;
+ }
+
+ public void setCallTimeMs(long callTimeMs) {
+ this.callTimeMs = callTimeMs;
+ }
+ }
+
+ private final MetricsRegistry registry = Metrics.defaultRegistry();
+ private final String scope;
+
+ private final Counter metaCacheHits;
+ private final Counter metaCacheMisses;
+
+ public MetricsConnection(final ConnectionImplementation conn) {
+ this.scope = conn.toString();
+ final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
+ final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
+
+ this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
+ new RatioGauge() {
+ @Override protected double getNumerator() {
+ return batchPool.getActiveCount();
+ }
+ @Override protected double getDenominator() {
+ return batchPool.getMaximumPoolSize();
+ }
+ });
+ this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
+ new RatioGauge() {
+ @Override protected double getNumerator() {
+ return metaPool.getActiveCount();
+ }
+ @Override protected double getDenominator() {
+ return metaPool.getMaximumPoolSize();
+ }
+ });
+ this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope);
+ this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope);
+ JmxReporter.startDefault(this.registry);
+ }
+
+ /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
+ public static CallStats newCallStats() {
+ // TODO: instance pool to reduce GC?
+ return new CallStats();
+ }
+
+ /** Increment the number of meta cache hits. */
+ public void incrMetaCacheHit() {
+ metaCacheHits.inc();
+ }
+
+ /** Increment the number of meta cache misses. */
+ public void incrMetaCacheMiss() {
+ metaCacheMisses.inc();
+ }
+
+ /** Report RPC context to the metrics system. */
+ public void updateRpc(Descriptors.MethodDescriptor method, ServerName serverName,
+ CallStats stats) {
+ final String methodName = method.getService().getName() + "_" + method.getName();
+ final String sn = serverName.toShortString().replace(':', '-');
+
+ // TODO: manage our own cache of references to these Histograms to avoid excess allocations
+ registry.newTimer(this.getClass(), "rpcCallDurationMs_" + methodName, scope)
+ .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
+ registry.newHistogram(this.getClass(), "rpcCallRequestSizeBytes_" + methodName, scope)
+ .update(stats.getRequestSizeBytes());
+ registry.newHistogram(this.getClass(), "rpcCallResponseSizeBytes_" + methodName, scope)
+ .update(stats.getResponseSizeBytes());
+ }
+}
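
The intended lifecycle of a CallStats instance is: create it, stamp the start time, issue the RPC while filling in the payload sizes, stamp the duration, then hand it to updateRpc. AbstractRpcClient.callBlockingMethod in the next file follows this shape; a condensed sketch (md, serverName, requestBytes and responseBytes are assumed to be in scope):

MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
// ... issue the RPC, recording payload sizes along the way ...
cs.setRequestSizeBytes(requestBytes);
cs.setResponseSizeBytes(responseBytes);
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {               // metrics may be absent, e.g. in tests
  metrics.updateRpc(md, serverName, cs);
}
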
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 9be370d..95cfc84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.security.User;
@@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final Configuration conf;
protected String clusterId;
protected final SocketAddress localAddr;
+ protected final MetricsConnection metrics;
protected UserProvider userProvider;
protected final IPCUtil ipcUtil;
@@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient {
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
+ * @param metrics the connection metrics
*/
- public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient {
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+ this.metrics = metrics;
// login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) {
@@ -199,25 +204,27 @@ public abstract class AbstractRpcClient implements RpcClient {
* @return A pair with the Message response and the Cell data (if any).
*/
Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
- Message param, Message returnType, final User ticket, final InetSocketAddress isa)
+ Message param, Message returnType, final User ticket, final InetSocketAddress isa,
+ final ServerName serverName)
throws ServiceException {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
}
- long startTime = 0;
- if (LOG.isTraceEnabled()) {
- startTime = EnvironmentEdgeManager.currentTime();
- }
Pair<Message, CellScanner> val;
try {
- val = call(pcrc, md, param, returnType, ticket, isa);
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ val = call(pcrc, md, param, returnType, ticket, isa, cs);
// Shove the results into controller so can be carried across the proxy/pb service void.
pcrc.setCellScanner(val.getSecond());
+ cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+ if (metrics != null) {
+ metrics.updateRpc(md, serverName, cs);
+ }
if (LOG.isTraceEnabled()) {
- long callTime = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
+ LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
return val.getFirst();
} catch (Throwable e) {
@@ -242,7 +249,8 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress isa) throws IOException, InterruptedException;
+ InetSocketAddress isa, MetricsConnection.CallStats callStats)
+ throws IOException, InterruptedException;
@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
@@ -255,6 +263,7 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
@VisibleForTesting
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
+ private final ServerName sn;
private final InetSocketAddress isa;
private final AbstractRpcClient rpcClient;
private final User ticket;
@@ -265,6 +274,7 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
final ServerName sn, final User ticket, int channelOperationTimeout) {
+ this.sn = sn;
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.rpcClient = rpcClient;
this.ticket = ticket;
@@ -285,7 +295,8 @@ public abstract class AbstractRpcClient implements RpcClient {
pcrc.setCallTimeout(channelOperationTimeout);
}
- return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
+ return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa,
+ this.sn);
}
}
}
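
Threading the ServerName into BlockingRpcChannelImplementation is what makes the server identity available to updateRpc for every call made through a blocking stub. A hedged sketch of how a stub obtained from the client exercises this path (rpcClient, serverName, user and rpcTimeout are assumed to be in scope; exception handling omitted):

BlockingRpcChannel channel =
    rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
ClientProtos.ClientService.BlockingInterface stub =
    ClientProtos.ClientService.newBlockingStub(channel);
// Every call made on 'stub' now runs through callBlockingMethod and is
// reported to MetricsConnection.updateRpc along with this ServerName.
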
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 431c669..a5da0dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise<Message> {
final Message responseDefaultType;
final long startTime;
final long rpcTimeout;
+ final MetricsConnection.CallStats callStats;
/**
* Constructor
@@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise {
* @param responseDefaultType the default response type
*/
public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
- param, PayloadCarryingRpcController controller, Message responseDefaultType) {
+ param, PayloadCarryingRpcController controller, Message responseDefaultType,
+ MetricsConnection.CallStats callStats) {
super(eventLoop);
this.id = connectId;
@@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise {
this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
+ this.callStats = callStats;
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 43d75f9..44e8322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@@ -310,10 +311,10 @@ public class AsyncRpcChannel {
*/
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
- final Message responsePrototype) {
+ final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call =
new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
- controller, responsePrototype);
+ controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback