From 757151a3ef38b361e6628522ec9668f933ce7441 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 3 Aug 2018 11:55:29 +0800 Subject: [PATCH] HBASE-20965 Separate region server report requests to new handlers --- .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 36 +++-- .../hadoop/hbase/ipc/MasterFifoRpcScheduler.java | 102 +++++++++++++ .../hadoop/hbase/master/MasterRpcServices.java | 11 ++ .../MasterFifoRpcSchedulerFactory.java | 41 +++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 20 ++- .../hbase/ipc/TestMasterFifoRpcScheduler.java | 166 +++++++++++++++++++++ 6 files changed, 362 insertions(+), 14 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index bd8bdce..5e96cdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,10 +40,10 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; @InterfaceAudience.Private public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); - private final int handlerCount; - private final int maxQueueLength; - private final AtomicInteger queueSize = new AtomicInteger(0); - private ThreadPoolExecutor executor; + protected final int handlerCount; + protected final int maxQueueLength; + protected final AtomicInteger queueSize = new AtomicInteger(0); + protected ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; @@ -94,6 +95,11 @@ public class FifoRpcScheduler extends RpcScheduler { @Override public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + return executeRpcCall(executor, queueSize, task); + } + + protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, + final CallRunner task) { // Executors provide no offer, so make our own. int queued = queueSize.getAndIncrement(); if (maxQueueLength > 0 && queued >= maxQueueLength) { @@ -199,15 +205,19 @@ public class FifoRpcScheduler extends RpcScheduler { callQueueInfo.setCallMethodCount(queueName, methodCount); callQueueInfo.setCallMethodSize(queueName, methodSize); + getCallQueueMethodInfo(executor.getQueue(), methodCount, methodSize); + + return callQueueInfo; + } - for (Runnable r:executor.getQueue()) { + protected void getCallQueueMethodInfo(BlockingQueue queue, + HashMap methodCount, HashMap methodSize) { + for (Runnable r : queue) { FifoCallRunner mcr = (FifoCallRunner) r; RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { + String method = getCallMethod(mcr.getCallRunner()); + if (StringUtil.isNullOrEmpty(method)) { method = "Unknown"; } @@ -216,7 +226,13 @@ public class FifoRpcScheduler extends RpcScheduler { methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); } + } - return callQueueInfo; + protected String getCallMethod(final CallRunner task) { + RpcCall call = task.getRpcCall(); + if (call != null && call.getMethod() != null) { + return call.getMethod().getName(); + } + return null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java new file mode 100644 index 0000000..79196fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Special rpc scheduler only used for master. + */ +@InterfaceAudience.Private +public class MasterFifoRpcScheduler extends FifoRpcScheduler { + private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class); + + private static final String REGION_SERVER_REPORT = "RegionServerReport"; + private ThreadPoolExecutor rsReportExecutor; + private final AtomicInteger rsReportQueueSize = new AtomicInteger(0); + + public MasterFifoRpcScheduler(Configuration conf, int handlerCount) { + super(conf, Math.max(1, handlerCount / 2)); + } + + @Override + public void start() { + this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.rsReportExecutor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void stop() { + this.executor.shutdown(); + this.rsReportExecutor.shutdown(); + } + + @Override + public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + String method = getCallMethod(task); + if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) { + return executeRpcCall(rsReportExecutor, rsReportQueueSize, task); + } else { + return executeRpcCall(executor, queueSize, task); + } + } + + @Override + public int getGeneralQueueLength() { + return executor.getQueue().size() + rsReportExecutor.getQueue().size(); + } + + @Override + public int getActiveRpcHandlerCount() { + return executor.getActiveCount() + rsReportExecutor.getActiveCount(); + } + + @Override + public CallQueueInfo getCallQueueInfo() { + String queueName = "Master Fifo Queue"; + + HashMap methodCount = new HashMap<>(); + HashMap methodSize = new HashMap<>(); + + CallQueueInfo callQueueInfo = new CallQueueInfo(); + callQueueInfo.setCallMethodCount(queueName, methodCount); + callQueueInfo.setCallMethodSize(queueName, methodSize); + + getCallQueueMethodInfo(executor.getQueue(), methodCount, methodSize); + getCallQueueMethodInfo(rsReportExecutor.getQueue(), methodCount, methodSize); + + return callQueueInfo; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index a4d9ff8..474d42c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; @@ -336,6 +337,16 @@ public class MasterRpcServices extends RSRpcServices } @Override + protected Class getRpcSchedulerFactoryClass() { + if (getConfiguration() != null) { + return getConfiguration().getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, + super.getRpcSchedulerFactoryClass()); + } else { + return super.getRpcSchedulerFactoryClass(); + } + } + + @Override protected RpcServerInterface createRpcServer(Server server, Configuration conf, RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java new file mode 100644 index 0000000..ec0a4fd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Factory to use when you want to use the {@link MasterFifoRpcSchedulerFactory} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + return new MasterFifoRpcScheduler(conf, handlerCount); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e292ce1..cb97d35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; @@ -257,6 +258,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = "hbase.region.server.rpc.scheduler.factory.class"; + /** RPC scheduler to use for the master. */ + public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = + "hbase.master.rpc.scheduler.factory.class"; + /** * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This * configuration exists to prevent the scenario where a time limit is specified to be so @@ -1203,10 +1208,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); RpcSchedulerFactory rpcSchedulerFactory; try { - Class cls = rs.conf.getClass( - REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - SimpleRpcSchedulerFactory.class); - rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class) + rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) .getDeclaredConstructor().newInstance(); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { @@ -1283,6 +1285,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + protected Class getRpcSchedulerFactoryClass() { + return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + } + @Override public void onConfigurationChange(Configuration newConf) { if (rpcServer instanceof ConfigurationObserver) { @@ -3700,4 +3707,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new ServiceException(e); } } + + @VisibleForTesting + public RpcScheduler getRpcScheduler() { + return rpcServer.getScheduler(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java new file mode 100644 index 0000000..88eed69 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RPCTests.class, LargeTests.class }) +public class TestMasterFifoRpcScheduler { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class); + + private static final String REGION_SERVER_REPORT = "RegionServerReport"; + private static final String OTHER = "Other"; + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS, + "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory"); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testMasterRpcScheduler() { + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + MasterRpcServices masterRpcServices = master.getMasterRpcServices(); + RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler(); + Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler); + } + + @Test + public void testCallQueueInfo() throws Exception { + Configuration conf = HBaseConfiguration.create(); + AtomicInteger callExecutionCount = new AtomicInteger(0); + + RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2); + scheduler.start(); + + int totalCallMethods = 30; + int unableToDispatch = 0; + + for (int i = totalCallMethods; i > 0; i--) { + CallRunner task = createMockTask(callExecutionCount, i < 20); + if (!scheduler.dispatch(task)) { + unableToDispatch++; + } + Thread.sleep(10); + } + + CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); + int executionCount = callExecutionCount.get(); + + String expectedQueueName = "Master Fifo Queue"; + assertEquals(1, callQueueInfo.getCallQueueNames().size()); + + int callQueueSize = 0; + for (String queueName : callQueueInfo.getCallQueueNames()) { + assertEquals(expectedQueueName, queueName); + Set methodNames = callQueueInfo.getCalledMethodNames(queueName); + if (methodNames.size() == 2) { + assertTrue(methodNames.contains(REGION_SERVER_REPORT)); + assertTrue(methodNames.contains(OTHER)); + } + for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) { + callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName); + } + } + + assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount); + scheduler.stop(); + } + + private CallRunner createMockTask(AtomicInteger callExecutionCount, + boolean isRegionServerReportTask) { + CallRunner task = mock(CallRunner.class); + ServerCall call = mock(ServerCall.class); + when(task.getRpcCall()).thenReturn(call); + when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder() + .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build()); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callExecutionCount.incrementAndGet(); + Thread.sleep(1000); + return null; + } + }).when(task).run(); + + return task; + } + + private class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler { + + public MockMasterFifoRpcScheduler(Configuration conf, int handlerCount) { + super(conf, handlerCount); + } + + /** + * Override this method because we can't mock a Descriptors.MethodDescriptor + * @param task + * @return + */ + @Override + protected String getCallMethod(final CallRunner task) { + RpcCall call = task.getRpcCall(); + if (call.getHeader() != null) { + return call.getHeader().getMethodName(); + } + return null; + } + } +} -- 2.7.4