From 6e27e2d8ea79fba8d53c15eb179edd0f5c815b2e Mon Sep 17 00:00:00 2001 From: chenheng Date: Mon, 26 Oct 2015 14:15:18 +0800 Subject: [PATCH] HBASE-14693 Add client-side metrics for received pushback signals --- .../apache/hadoop/hbase/client/AsyncProcess.java | 5 ++ .../hadoop/hbase/client/MetricsConnection.java | 60 ++++++++++++++++++++++ .../hadoop/hbase/client/TestClientPushback.java | 9 +++- 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 7c7fc3e..0b9d7bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1267,6 +1267,11 @@ class AsyncProcess { ++failed; } } else { + // we must do client metrics first, because after ResultStatsUtil.updateStats result will be changed + if (AsyncProcess.this.connection.getConnectionMetrics() != null) { + AsyncProcess.this.connection.getConnectionMetrics().updateServerStats(server, regionName, result); + } + // update the stats about the region, if its a user table. We don't want to slow down // updates to meta tables, especially from internal updates (master, etc). if (AsyncProcess.this.connection.getStatisticsTracker() != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index f34fb8a..d88779c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -26,11 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Timer; import com.yammer.metrics.reporting.JmxReporter; import com.yammer.metrics.util.RatioGauge; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.util.Bytes; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadPoolExecutor; @@ -130,6 +135,61 @@ public class MetricsConnection { } } + public static class RegionStats { + final String name; + final Histogram memstoreLoadHist; + final Histogram heapOccupancyHist; + + public RegionStats(MetricsRegistry registry, String name) { + this.name = name; + this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class, this.name); + this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class, this.name); + } + + public void update(ClientProtos.RegionLoadStats regionStatistics) { + this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad()); + this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); + } + } + + @VisibleForTesting + protected ConcurrentHashMap> serverStats + = new ConcurrentHashMap>(); + + public void updateServerStats(ServerName serverName, byte[] regionName, + Object r) { + if (!(r instanceof Result)) { + return; + } + Result result = (Result) r; + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return; + } + String regionNameStr = Bytes.toStringBinary(regionName); + String name = serverName.getServerName() + "," + regionNameStr; + ConcurrentHashMap rsStats = null; + if (serverStats.containsKey(serverName)) { + rsStats = serverStats.get(serverName); + } else { + rsStats = serverStats.putIfAbsent(serverName, new ConcurrentHashMap()); + if (rsStats == null) { + rsStats = serverStats.get(serverName); + } + } + RegionStats regionStats = null; + if (rsStats.containsKey(regionName)) { + regionStats = rsStats.get(regionName); + } else { + regionStats = rsStats.putIfAbsent(regionNameStr, new RegionStats(this.registry, name)); + if (regionStats == null) { + regionStats = rsStats.get(regionNameStr); + } + } + regionStats.update(stats); + } + + /** A lambda for dispatching to the appropriate metric factory method */ private static interface NewMetric { T newMetric(Class clazz, String name, String scope); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 995e3e5..5e5de07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -42,6 +42,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; @@ -74,7 +75,7 @@ public class TestClientPushback { conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); // ensure we block the flushes when we are double that flushsize conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); - + conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); UTIL.startMiniCluster(1); UTIL.createTable(tableName, family); } @@ -87,6 +88,7 @@ public class TestClientPushback { @Test(timeout=60000) public void testClientTracksServerPushback() throws Exception{ Configuration conf = UTIL.getConfiguration(); + ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); HTable table = (HTable) conn.getTable(tableName); @@ -145,6 +147,11 @@ public class TestClientPushback { // produces a backoffTime of 151 milliseconds. This is long enough so the // wait and related checks below are reasonable. Revisit if the backoff // time reported by above debug logging has significantly deviated. + String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); + MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().serverStats.get(server).get(Bytes.toStringBinary(regionName)); + assertTrue(name.equals(rsStats.name)); + assertTrue(rsStats.heapOccupancyHist.max() > 0); + assertTrue(rsStats.memstoreLoadHist.mean() > 0); latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0); assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime); -- 1.9.3 (Apple Git-50)