From d28b036ba8a2f202a7dbea622af0a41ac3a75132 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Fri, 14 Dec 2018 11:34:01 +0800 Subject: [PATCH] HBASE-21019 Use StealJobQueue to better utilize handlers in MasterFifoRpcScheduler --- .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 36 ++++++++------------ .../hadoop/hbase/ipc/MasterFifoRpcScheduler.java | 38 ++++++++++++---------- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index 833b02d..c8258d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -17,13 +17,12 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.yetus.audience.InterfaceAudience; @@ -42,7 +41,6 @@ public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); protected final int handlerCount; protected final int maxQueueLength; - protected final AtomicInteger queueSize = new AtomicInteger(0); protected ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { @@ -94,29 +92,23 @@ public class FifoRpcScheduler extends RpcScheduler { } @Override - public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { - return executeRpcCall(executor, queueSize, task); + public boolean dispatch(final CallRunner task) { + return executeRpcCall(executor, task); } - protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, - final CallRunner task) { - // Executors provide no offer, so make our own. - int queued = queueSize.getAndIncrement(); - if (maxQueueLength > 0 && queued >= maxQueueLength) { - queueSize.decrementAndGet(); + protected boolean executeRpcCall(final ThreadPoolExecutor executor, final CallRunner task) { + try { + executor.execute(new FifoCallRunner(task) { + @Override + public void run() { + task.setStatus(RpcServer.getStatus()); + task.run(); + } + }); + return true; + } catch (RejectedExecutionException exception) { return false; } - - executor.execute(new FifoCallRunner(task){ - @Override - public void run() { - task.setStatus(RpcServer.getStatus()); - task.run(); - queueSize.decrementAndGet(); - } - }); - - return true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java index b596c40..a08043a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java @@ -17,15 +17,15 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; import java.util.HashMap; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.util.StealJobQueue; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -55,6 +55,9 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler { private final AtomicInteger rsReportQueueSize = new AtomicInteger(0); private ThreadPoolExecutor rsReportExecutor; + private StealJobQueue rpcQueue; + private BlockingQueue rsReportQueue; + public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount, int rsReportHandlerCount) { super(conf, callHandlerCount); @@ -66,18 +69,19 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler { @Override public void start() { LOG.info( - "Using {} as call queue; handlerCount={}; maxQueueLength={}; rsReportHandlerCount={}; " - + "rsReportMaxQueueLength={}", - this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount, - rsRsreportMaxQueueLength); - this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, - new ArrayBlockingQueue(maxQueueLength), - new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), - new ThreadPoolExecutor.CallerRunsPolicy()); - this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, - TimeUnit.SECONDS, new ArrayBlockingQueue(rsRsreportMaxQueueLength), - new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), - new ThreadPoolExecutor.CallerRunsPolicy()); + "Using {} as call queue; handlerCount={}; maxQueueLength={}; rsReportHandlerCount={}; " + + "rsReportMaxQueueLength={}", this.getClass().getSimpleName(), handlerCount, + maxQueueLength, rsReportHandlerCount, rsRsreportMaxQueueLength); + rpcQueue = new StealJobQueue<>(maxQueueLength, rsRsreportMaxQueueLength, (r1, r2) -> 0); + rsReportQueue = rpcQueue.getStealFromQueue(); + this.executor = + new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, rpcQueue, + new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.rsReportExecutor = + new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, TimeUnit.SECONDS, + rsReportQueue, new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); } @Override @@ -87,12 +91,12 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler { } @Override - public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + public boolean dispatch(final CallRunner task) { String method = getCallMethod(task); if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) { - return executeRpcCall(rsReportExecutor, rsReportQueueSize, task); + return executeRpcCall(rsReportExecutor, task); } else { - return executeRpcCall(executor, queueSize, task); + return executeRpcCall(executor, task); } } -- 2.7.4