From c63c3bf3f7b64f659985dffee3bad4184ea54428 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Mon, 14 Jan 2019 15:01:26 -0800 Subject: [PATCH] add a client side numActionPerServer metric to measure how actions are distributed in a MultiAction request --- .../hadoop/hbase/client/MetricsConnection.java | 113 ++++++++++++--------- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 12 +++ .../client/TestMultiActionMetricsFromClient.java | 89 ++++++++++++++++ 3 files changed, 164 insertions(+), 50 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index e61ba24..11cddda 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -74,6 +74,7 @@ public class MetricsConnection implements StatisticTrackable { private long startTime = 0; private long callTimeMs = 0; private int concurrentCallsPerServer = 0; + private int numActionsPerServer = 0; public long getRequestSizeBytes() { return requestSizeBytes; @@ -114,6 +115,14 @@ public class MetricsConnection implements StatisticTrackable { public void setConcurrentCallsPerServer(int callsPerServer) { this.concurrentCallsPerServer = callsPerServer; } + + public int getNumActionsPerServer() { + return numActionsPerServer; + } + + public void setNumActionsPerServer(int numActionsPerServer) { + this.numActionsPerServer = numActionsPerServer; + } } @VisibleForTesting @@ -281,6 +290,7 @@ public class MetricsConnection implements StatisticTrackable { @VisibleForTesting protected final Counter hedgedReadOps; @VisibleForTesting protected final Counter hedgedReadWin; @VisibleForTesting protected final Histogram concurrentCallsPerServerHist; + @VisibleForTesting protected final Histogram numActionsPerServerHist; // dynamic metrics @@ -337,8 +347,10 @@ public class MetricsConnection implements StatisticTrackable { this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); this.multiTracker = new CallTracker(this.registry, "Multi", scope); this.runnerStats = new RunnerStats(this.registry); - this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, + this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope)); + this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class, + "numActionsPerServer", scope)); this.reporter = JmxReporter.forRegistry(this.registry).build(); this.reporter.start(); @@ -442,59 +454,60 @@ public class MetricsConnection implements StatisticTrackable { // if we could dispatch based on something static, ie, request Message type. if (method.getService() == ClientService.getDescriptor()) { switch(method.getIndex()) { - case 0: - assert "Get".equals(method.getName()); - getTracker.updateRpc(stats); - return; - case 1: - assert "Mutate".equals(method.getName()); - final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); - switch(mutationType) { - case APPEND: - appendTracker.updateRpc(stats); - return; - case DELETE: - deleteTracker.updateRpc(stats); + case 0: + assert "Get".equals(method.getName()); + getTracker.updateRpc(stats); return; - case INCREMENT: - incrementTracker.updateRpc(stats); + case 1: + assert "Mutate".equals(method.getName()); + final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); + switch(mutationType) { + case APPEND: + appendTracker.updateRpc(stats); + return; + case DELETE: + deleteTracker.updateRpc(stats); + return; + case INCREMENT: + incrementTracker.updateRpc(stats); + return; + case PUT: + putTracker.updateRpc(stats); + return; + default: + throw new RuntimeException("Unrecognized mutation type " + mutationType); + } + case 2: + assert "Scan".equals(method.getName()); + scanTracker.updateRpc(stats); return; - case PUT: - putTracker.updateRpc(stats); + case 3: + assert "BulkLoadHFile".equals(method.getName()); + // use generic implementation + break; + case 4: + assert "PrepareBulkLoad".equals(method.getName()); + // use generic implementation + break; + case 5: + assert "CleanupBulkLoad".equals(method.getName()); + // use generic implementation + break; + case 6: + assert "ExecService".equals(method.getName()); + // use generic implementation + break; + case 7: + assert "ExecRegionServerService".equals(method.getName()); + // use generic implementation + break; + case 8: + assert "Multi".equals(method.getName()); + numActionsPerServerHist.update(stats.getNumActionsPerServer()); + multiTracker.updateRpc(stats); return; default: - throw new RuntimeException("Unrecognized mutation type " + mutationType); - } - case 2: - assert "Scan".equals(method.getName()); - scanTracker.updateRpc(stats); - return; - case 3: - assert "BulkLoadHFile".equals(method.getName()); - // use generic implementation - break; - case 4: - assert "PrepareBulkLoad".equals(method.getName()); - // use generic implementation - break; - case 5: - assert "CleanupBulkLoad".equals(method.getName()); - // use generic implementation - break; - case 6: - assert "ExecService".equals(method.getName()); - // use generic implementation - break; - case 7: - assert "ExecRegionServerService".equals(method.getName()); - // use generic implementation - break; - case 8: - assert "Multi".equals(method.getName()); - multiTracker.updateRpc(stats); - return; - default: - throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); + throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); } } // Fallback to dynamic registry lookup for DDL methods. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index c8904f4..9747ff7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIden import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.Threads; @@ -401,6 +402,17 @@ public abstract class AbstractRpcClient implements RpcC final RpcCallback callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); + + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; + int numActions = 0; + for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { + numActions += regionAction.getActionCount(); + } + + cs.setNumActionsPerServer(numActions); + } + final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java new file mode 100644 index 0000000..e4fb0f8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestMultiActionMetricsFromClient { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMultiActionMetricsFromClient.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TABLE_NAME = TableName.valueOf("test_table"); + private static final byte[] FAMILY = Bytes.toBytes("fam1"); + private static final byte[] QUALIFIER = Bytes.toBytes("qual"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testMultiMetrics() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true"); + ConnectionImplementation conn = + (ConnectionImplementation) ConnectionFactory.createConnection(conf); + + try { + BufferedMutator mutator = conn.getBufferedMutator(TABLE_NAME); + byte[][] keys = {Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), Bytes.toBytes("zzz")}; + for (byte[] key : keys) { + Put p = new Put(key); + p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); + mutator.mutate(p); + } + + mutator.flush(); + mutator.close(); + + MetricsConnection metrics = conn.getConnectionMetrics(); + assertEquals(1, metrics.multiTracker.reqHist.getCount()); + assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15); + assertEquals(1, metrics.numActionsPerServerHist.getCount()); + } finally { + conn.close(); + } + } +} + -- 2.7.4