From 3714f2023f23f1619c0a193a4d849e94268aa99d Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 27 Jul 2018 18:58:50 +0800 Subject: [PATCH] HBASE-20965 Separate region server report requests to new handlers --- .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 28 +++-- .../hadoop/hbase/ipc/MasterFifoRpcScheduler.java | 119 +++++++++++++++++++++ .../hadoop/hbase/master/MasterRpcServices.java | 9 ++ .../MasterFifoRpcSchedulerFactory.java | 47 ++++++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 13 ++- 5 files changed, 200 insertions(+), 16 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index bd8bdce..0fabde5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,10 +40,10 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; @InterfaceAudience.Private public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); - private final int handlerCount; - private final int maxQueueLength; - private final AtomicInteger queueSize = new AtomicInteger(0); - private ThreadPoolExecutor executor; + protected int handlerCount; + protected final int maxQueueLength; + protected final AtomicInteger queueSize = new AtomicInteger(0); + protected ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; @@ -74,7 +75,7 @@ public class FifoRpcScheduler extends RpcScheduler { this.executor.shutdown(); } - private static class FifoCallRunner implements Runnable { + protected static class FifoCallRunner implements Runnable { private final CallRunner callRunner; FifoCallRunner(CallRunner cr) { @@ -199,16 +200,23 @@ public class FifoRpcScheduler extends RpcScheduler { callQueueInfo.setCallMethodCount(queueName, methodCount); callQueueInfo.setCallMethodSize(queueName, methodSize); + getCallQueueMethodInfo(executor.getQueue(), methodCount, methodSize); - for (Runnable r:executor.getQueue()) { + return callQueueInfo; + } + + protected void getCallQueueMethodInfo(BlockingQueue queue, + HashMap methodCount, HashMap methodSize) { + for (Runnable r : queue) { FifoCallRunner mcr = (FifoCallRunner) r; RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { + if (null == rpcCall.getMethod() + || StringUtil.isNullOrEmpty(rpcCall.getMethod().getName())) { method = "Unknown"; + } else { + method = rpcCall.getMethod().getName(); } long size = rpcCall.getSize(); @@ -216,7 +224,5 @@ public class FifoRpcScheduler extends RpcScheduler { methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); } - - return callQueueInfo; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java new file mode 100644 index 0000000..f7e875e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.HashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Special rpc scheduler only used for master. + */ +@InterfaceAudience.Private +public class MasterFifoRpcScheduler extends FifoRpcScheduler { + + private ThreadPoolExecutor rsReportExecutor; + + public MasterFifoRpcScheduler(Configuration conf, int handlerCount) { + super(conf, handlerCount); + this.handlerCount = Math.max(1, this.handlerCount / 2); + } + + @Override + public void start() { + this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.rsReportExecutor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void stop() { + this.executor.shutdown(); + this.rsReportExecutor.shutdown(); + } + + @Override + public boolean dispatch(final CallRunner task) { + // Executors provide no offer, so make our own. + int queued = queueSize.getAndIncrement(); + if (maxQueueLength > 0 && queued >= maxQueueLength) { + queueSize.decrementAndGet(); + return false; + } + + RpcCall call = task.getRpcCall(); + if (rsReportExecutor != null && call.getHeader() != null + && call.getHeader().getMethodName().equals("RegionServerReport")) { + rsReportExecutor.execute(new FifoCallRunner(task) { + @Override + public void run() { + task.setStatus(RpcServer.getStatus()); + task.run(); + queueSize.decrementAndGet(); + } + }); + } else { + executor.execute(new FifoCallRunner(task) { + @Override + public void run() { + task.setStatus(RpcServer.getStatus()); + task.run(); + queueSize.decrementAndGet(); + } + }); + } + return true; + } + + @Override + public int getGeneralQueueLength() { + return executor.getQueue().size() + rsReportExecutor.getQueue().size(); + } + + @Override + public int getActiveRpcHandlerCount() { + return executor.getActiveCount() + rsReportExecutor.getActiveCount(); + } + + @Override + public CallQueueInfo getCallQueueInfo() { + String queueName = "Master Fifo Queue"; + + HashMap methodCount = new HashMap<>(); + HashMap methodSize = new HashMap<>(); + + CallQueueInfo callQueueInfo = new CallQueueInfo(); + callQueueInfo.setCallMethodCount(queueName, methodCount); + callQueueInfo.setCallMethodSize(queueName, methodSize); + + getCallQueueMethodInfo(executor.getQueue(), methodCount, methodSize); + getCallQueueMethodInfo(rsReportExecutor.getQueue(), methodCount, methodSize); + + return callQueueInfo; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index febd035..e655a5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -80,6 +81,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.QuotaObserverChore; import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; +import org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -336,6 +338,13 @@ public class MasterRpcServices extends RSRpcServices } @Override + public RpcSchedulerFactory getRpcSchedulerFactory() throws NoSuchMethodException, + InvocationTargetException, InstantiationException, IllegalAccessException { + Class cls = MasterFifoRpcSchedulerFactory.class; + return cls.asSubclass(RpcSchedulerFactory.class).getDeclaredConstructor().newInstance(); + } + + @Override protected RpcServerInterface createRpcServer(Server server, Configuration conf, RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java new file mode 100644 index 0000000..1be4a4a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Factory to use when you want to use the {@link MasterFifoRpcSchedulerFactory} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + return new MasterFifoRpcScheduler(conf, handlerCount); + } + + @Deprecated + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority) { + return create(conf, priority, null); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e292ce1..a2f01a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1203,11 +1203,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); RpcSchedulerFactory rpcSchedulerFactory; try { - Class cls = rs.conf.getClass( - REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - SimpleRpcSchedulerFactory.class); - rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class) - .getDeclaredConstructor().newInstance(); + rpcSchedulerFactory = getRpcSchedulerFactory(); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new IllegalArgumentException(e); @@ -1283,6 +1279,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + public RpcSchedulerFactory getRpcSchedulerFactory() throws NoSuchMethodException, + InvocationTargetException, InstantiationException, IllegalAccessException { + Class cls = this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + return cls.asSubclass(RpcSchedulerFactory.class).getDeclaredConstructor().newInstance(); + } + @Override public void onConfigurationChange(Configuration newConf) { if (rpcServer instanceof ConfigurationObserver) { -- 2.7.4