From 077b31cc1e5d0fc3a804a7425ea46425ca57e607 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 28 Mar 2018 10:04:27 +0800 Subject: [PATCH] HBASE-20300 Minor refactor for RpcExecutor --- .../ipc/FastPathBalancedQueueRpcExecutor.java | 35 ++-- .../apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 35 ++-- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 223 ++++++++++++--------- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 22 +- .../hadoop/hbase/monitoring/TaskMonitor.java | 149 ++++++-------- .../hadoop/hbase/monitoring/TestMonitoredTask.java | 93 +++++++++ 7 files changed, 328 insertions(+), 231 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestMonitoredTask.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index eaea34deef..a8390d4b15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -21,8 +21,6 @@ import java.util.Deque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.yetus.audience.InterfaceAudience; @@ -56,10 +54,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { } @Override - protected Handler getHandler(String name, double handlerFailureThreshhold, - BlockingQueue q, AtomicInteger activeHandlerCount) { - return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount, - fastPathHandlerStack); + Handler getHandler(final String name, final BlockingQueue q, + final HandlerObserver observer) { + return new FastPathHandler(observer, name, q, fastPathHandlerStack); } @Override @@ -75,19 +72,18 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { return this.fastPathHandlerStack.poll(); } - class FastPathHandler extends Handler { + private static class FastPathHandler extends Handler { // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque // if an empty queue of CallRunners so we are available for direct handoff when one comes in. - final Deque fastPathHandlerStack; + private final Deque fastPathHandlerStack; // Semaphore to coordinate loading of fastpathed loadedTask and our running it. - private Semaphore semaphore = new Semaphore(0); + private final Semaphore semaphore = new Semaphore(0); // The task we get when fast-pathing. private CallRunner loadedCallRunner; - FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, - final AtomicInteger activeHandlerCount, + FastPathHandler(HandlerObserver observer, String name, BlockingQueue q, final Deque fastPathHandlerStack) { - super(name, handlerFailureThreshhold, q, activeHandlerCount); + super(observer, name, q); this.fastPathHandlerStack = fastPathHandlerStack; } @@ -98,21 +94,16 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { if (cr == null) { // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for // the fastpath handoff done via fastPathHandlerStack. 
- if (this.fastPathHandlerStack != null) { - this.fastPathHandlerStack.push(this); - this.semaphore.acquire(); - cr = this.loadedCallRunner; - this.loadedCallRunner = null; - } else { - // No fastpath available. Block until a task comes available. - cr = super.getCallRunner(); - } + this.fastPathHandlerStack.push(this); + this.semaphore.acquire(); + cr = this.loadedCallRunner; + this.loadedCallRunner = null; } return cr; } /** - * @param task Task gotten via fastpath. + * @param cr Task gotten via fastpath. * @return True if we successfully loaded our task */ boolean loadCallRunner(final CallRunner cr) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index ce0f86d276..1d061c19d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,8 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; @InterfaceAudience.Private public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); + /** Keeps MonitoredRPCHandler per handler thread. */ + private static final ThreadLocal MONITORED_RPC = new ThreadLocal<>(); private final int handlerCount; private final int maxQueueLength; private final AtomicInteger queueSize = new AtomicInteger(0); @@ -74,11 +77,17 @@ public class FifoRpcScheduler extends RpcScheduler { this.executor.shutdown(); } + /** + * It is used to carry the CallRunner since #getCallQueueInfo will cast the runnable to + * FifoCallRunner to get the running CallRunner. + * Hence, we can't use anonymous class instead. + */ private static class FifoCallRunner implements Runnable { private final CallRunner callRunner; - - FifoCallRunner(CallRunner cr) { + private final AtomicInteger queueSize; + FifoCallRunner(CallRunner cr, AtomicInteger queueSize) { this.callRunner = cr; + this.queueSize = queueSize; } CallRunner getCallRunner() { @@ -87,11 +96,22 @@ public class FifoRpcScheduler extends RpcScheduler { @Override public void run() { + callRunner.setStatus(getStatus()); callRunner.run(); + queueSize.decrementAndGet(); } } + private static MonitoredRPCHandler getStatus() { + MonitoredRPCHandler status = MONITORED_RPC.get(); + if (status == null) { + status = RpcServer.createMonitoredRPCHandler(); + MONITORED_RPC.set(status); + } + return status; + } + @Override public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { // Executors provide no offer, so make our own. 
@@ -100,16 +120,7 @@ public class FifoRpcScheduler extends RpcScheduler { queueSize.decrementAndGet(); return false; } - - executor.execute(new FifoCallRunner(task){ - @Override - public void run() { - task.setStatus(RpcServer.getStatus()); - task.run(); - queueSize.decrementAndGet(); - } - }); - + executor.execute(new FifoCallRunner(task, queueSize)); return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 28a8ecc523..70acea8df5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -111,7 +111,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override - protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { + int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { // at least 1 read queue and 1 write queue return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 7470758032..cb91e74890 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -20,40 +20,40 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import java.util.Map; -import java.util.HashMap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; /** * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular * scheduling behavior. 
*/ @InterfaceAudience.Private -public abstract class RpcExecutor { +public abstract class RpcExecutor implements ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); - protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + private static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; /** max delay in msec used to bound the deprioritized requests */ @@ -79,17 +79,17 @@ public abstract class RpcExecutor { public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; - private LongAdder numGeneralCallsDropped = new LongAdder(); - private LongAdder numLifoModeSwitches = new LongAdder(); + private final LongAdder numGeneralCallsDropped = new LongAdder(); + private final LongAdder numLifoModeSwitches = new LongAdder(); - protected final int numCallQueues; - protected final List> queues; + final int numCallQueues; + final List> queues; private final Class queueClass; private final Object[] queueInitArgs; private final PriorityFunction priority; - protected volatile int currentQueueLimit; + volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); private final List handlers; @@ -97,18 +97,18 @@ public abstract class RpcExecutor { private final AtomicInteger failedHandlerCount = new AtomicInteger(0); private String name; - private boolean running; + private volatile boolean running; - private Configuration conf = null; - private Abortable abortable = null; + private final Configuration conf; + private final Abortable abortable; - public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable); } - public RpcExecutor(final String name, final int handlerCount, final String callQueueType, + RpcExecutor(final String name, final int handlerCount, final String callQueueType, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this.name = Strings.nullToEmpty(name); @@ -150,11 +150,11 @@ public abstract class RpcExecutor { this.name, this.queueClass, this.numCallQueues, maxQueueLength, this.handlerCount); } - protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { + int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) { return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); } - public Map getCallQueueCountsSummary() { + Map getCallQueueCountsSummary() { HashMap callQueueMethodTotalCount = new HashMap<>(); for(BlockingQueue queue: queues) { @@ -175,7 +175,7 @@ public abstract class RpcExecutor { return callQueueMethodTotalCount; } - public Map getCallQueueSizeSummary() { + Map getCallQueueSizeSummary() { HashMap callQueueMethodTotalSize = new HashMap<>(); for(BlockingQueue queue: queues) { @@ -198,7 +198,7 @@ public abstract class RpcExecutor { } - protected void initializeQueues(final int numQueues) { + void initializeQueues(final int numQueues) { if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; 
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); @@ -208,12 +208,12 @@ public abstract class RpcExecutor { } } - public void start(final int port) { + void start(final int port) { running = true; startHandlers(port); } - public void stop() { + void stop() { running = false; for (Thread handler : handlers) { handler.interrupt(); @@ -221,42 +221,74 @@ public abstract class RpcExecutor { } /** Add the request to the executor queue */ - public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; - - /** Returns the list of request queues */ - protected List> getQueues() { - return queues; - } + abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; - protected void startHandlers(final int port) { - List> callQueues = getQueues(); - startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount); + void startHandlers(final int port) { + startHandlers(null, handlerCount, queues, 0, queues.size(), port, activeHandlerCount); } /** * Override if providing alternate Handler implementation. */ - protected Handler getHandler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { - return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount); + Handler getHandler(final String name, final BlockingQueue q, + final HandlerObserver observer) { + return new Handler(observer, name, q); + } + + private HandlerObserver getHandleObserver(final double handlerFailureThreshold, + final AtomicInteger activeHandlerCount) { + return new HandlerObserver() { + @Override + public boolean isDone() { + return !running; + } + + @Override + public void preRun() { + activeHandlerCount.incrementAndGet(); + } + + @Override + public void postRun() { + activeHandlerCount.decrementAndGet(); + } + + @Override + public void handleException(Throwable e) { + int failedCount = failedHandlerCount.incrementAndGet(); + if (handlerFailureThreshold >= 0 && failedCount > handlerCount * handlerFailureThreshold) { + String message = + "Number of failed RpcServer handler runs exceeded threshhold " + handlerFailureThreshold + + "; reason: " + StringUtils.stringifyException(e); + if (abortable == null) { + LOG.error("Error but can't abort because abortable is null: " + StringUtils + .stringifyException(e)); + throw new RuntimeException(e); + } + abortable.abort(message, e); + } else { + LOG.warn("Handler errors " + StringUtils.stringifyException(e)); + } + } + }; } /** * Start up our handlers. */ - protected void startHandlers(final String nameSuffix, final int numHandlers, + void startHandlers(final String nameSuffix, final int numHandlers, final List> callQueues, final int qindex, final int qsize, final int port, final AtomicInteger activeHandlerCount) { final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); - double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble( + double handlerFailureThreshold = conf == null ? 1.0 : conf.getDouble( HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); for (int i = 0; i < numHandlers; i++) { final int index = qindex + (i % qsize); String name = "RpcServer." 
+ threadPrefix + ".handler=" + handlers.size() + ",queue=" + index + ",port=" + port; - Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), - activeHandlerCount); + Handler handler = getHandler(name, callQueues.get(index), + getHandleObserver(handlerFailureThreshold, activeHandlerCount)); handler.start(); handlers.add(handler); } @@ -264,51 +296,66 @@ public abstract class RpcExecutor { handlers.size(), threadPrefix, qsize, port); } + interface HandlerObserver { + /** + * @return true if it is time to end the handler + */ + boolean isDone(); + + /** + * executed before executing the CallRunner. + */ + void preRun(); + + /** + * executed after executing the CallRunner. + */ + void postRun(); + + /** + * handle the exception thrown by CallRunner + */ + void handleException(Throwable e) ; + } + /** * Handler thread run the {@link CallRunner#run()} in. */ - protected class Handler extends Thread { + static class Handler extends Thread { /** * Q to find CallRunners to run in. */ final BlockingQueue q; - final double handlerFailureThreshhold; - - // metrics (shared with other handlers) - final AtomicInteger activeHandlerCount; - - Handler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { + private final HandlerObserver observer; + private final MonitoredRPCHandler status = RpcServer.createMonitoredRPCHandler(); + Handler(HandlerObserver observer, final String name, + final BlockingQueue q) { super(name); setDaemon(true); + this.observer = observer; this.q = q; - this.handlerFailureThreshhold = handlerFailureThreshhold; - this.activeHandlerCount = activeHandlerCount; } /** * @return A {@link CallRunner} * @throws InterruptedException */ - protected CallRunner getCallRunner() throws InterruptedException { + CallRunner getCallRunner() throws InterruptedException { return this.q.take(); } @Override - public void run() { + public final void run() { boolean interrupted = false; try { - while (running) { + while (!observer.isDone()) { try { run(getCallRunner()); } catch (InterruptedException e) { interrupted = true; } } - } catch (Exception e) { - LOG.warn(e.toString(), e); - throw e; } finally { if (interrupted) { Thread.currentThread().interrupt(); @@ -316,46 +363,27 @@ public abstract class RpcExecutor { } } - private void run(CallRunner cr) { - MonitoredRPCHandler status = RpcServer.getStatus(); + private final void run(CallRunner cr) { cr.setStatus(status); try { - this.activeHandlerCount.incrementAndGet(); + observer.preRun(); cr.run(); } catch (Throwable e) { - if (e instanceof Error) { - int failedCount = failedHandlerCount.incrementAndGet(); - if (this.handlerFailureThreshhold >= 0 - && failedCount > handlerCount * this.handlerFailureThreshhold) { - String message = "Number of failed RpcServer handler runs exceeded threshhold " - + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); - if (abortable != null) { - abortable.abort(message, e); - } else { - LOG.error("Error but can't abort because abortable is null: " - + StringUtils.stringifyException(e)); - throw e; - } - } else { - LOG.warn("Handler errors " + StringUtils.stringifyException(e)); - } - } else { - LOG.warn("Handler exception " + StringUtils.stringifyException(e)); - } + observer.handleException(e); } finally { - this.activeHandlerCount.decrementAndGet(); + observer.postRun(); } } } - public static abstract class QueueBalancer { + static abstract class QueueBalancer { /** * @return the index of 
the next queue to which a request should be inserted */ public abstract int getNextQueue(); } - public static QueueBalancer getBalancer(int queueSize) { + static QueueBalancer getBalancer(int queueSize) { Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); if (queueSize == 1) { return ONE_QUEUE; @@ -419,44 +447,44 @@ public abstract class RpcExecutor { } } - public static boolean isDeadlineQueueType(final String callQueueType) { + static boolean isDeadlineQueueType(final String callQueueType) { return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); } - public static boolean isCodelQueueType(final String callQueueType) { + static boolean isCodelQueueType(final String callQueueType) { return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE); } - public static boolean isFifoQueueType(final String callQueueType) { + static boolean isFifoQueueType(final String callQueueType) { return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE); } - public long getNumGeneralCallsDropped() { + long getNumGeneralCallsDropped() { return numGeneralCallsDropped.longValue(); } - public long getNumLifoModeSwitches() { + long getNumLifoModeSwitches() { return numLifoModeSwitches.longValue(); } - public int getActiveHandlerCount() { + int getActiveHandlerCount() { return activeHandlerCount.get(); } - public int getActiveWriteHandlerCount() { + int getActiveWriteHandlerCount() { return 0; } - public int getActiveReadHandlerCount() { + int getActiveReadHandlerCount() { return 0; } - public int getActiveScanHandlerCount() { + int getActiveScanHandlerCount() { return 0; } /** Returns the length of the pending queue */ - public int getQueueLength() { + int getQueueLength() { int length = 0; for (final BlockingQueue queue: queues) { length += queue.size(); @@ -464,19 +492,19 @@ public abstract class RpcExecutor { return length; } - public int getReadQueueLength() { + int getReadQueueLength() { return 0; } - public int getScanQueueLength() { + int getScanQueueLength() { return 0; } - public int getWriteQueueLength() { + int getWriteQueueLength() { return 0; } - public String getName() { + String getName() { return this.name; } @@ -484,7 +512,7 @@ public abstract class RpcExecutor { * Update current soft limit for executor's call queues * @param conf updated configuration */ - public void resizeQueues(Configuration conf) { + void resizeQueues(Configuration conf) { String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; @@ -492,6 +520,7 @@ public abstract class RpcExecutor { currentQueueLimit = conf.getInt(configKey, currentQueueLimit); } + @Override public void onConfigurationChange(Configuration conf) { // update CoDel Scheduler tunables int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 686d5785cc..8a7125ef25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -124,9 +124,6 @@ public abstract class RpcServer implements RpcServerInterface, */ protected static final ThreadLocal CurCall = new ThreadLocal<>(); - /** Keeps MonitoredRPCHandler per handler thread. 
*/ - protected static final ThreadLocal MONITORED_RPC = new ThreadLocal<>(); - protected final InetSocketAddress bindAddress; protected MetricsHBaseServer metrics; @@ -585,15 +582,13 @@ public abstract class RpcServer implements RpcServerInterface, } /** - * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer). + * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. * Only one of readCh or writeCh should be non-null. - * * @param readCh read channel * @param writeCh write channel - * @param buf buffer to read or write into/out of + * @param buf to read or write into/out of * @return bytes written - * @throws java.io.IOException e - * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) + * @throws IOException e */ private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, @@ -758,15 +753,10 @@ public abstract class RpcServer implements RpcServerInterface, return bsasi == null? null: bsasi.getBlockingService(); } - protected static MonitoredRPCHandler getStatus() { - // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. - MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); - if (status != null) { - return status; - } - status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + static MonitoredRPCHandler createMonitoredRPCHandler() { + MonitoredRPCHandler status = + TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); status.pause("Waiting for a call"); - RpcServer.MONITORED_RPC.set(status); return status; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index 1bde915536..72e91db8a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -24,17 +24,19 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; - +import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; @@ -61,7 +63,7 @@ public class TaskMonitor { private final int maxTasks; private final long rpcWarnTime; private final long expirationTime; - private final CircularFifoQueue tasks; + private final CircularFifoQueue tasks; private final List rpcTasks; private final long monitorInterval; private Thread monitorThread; @@ -70,7 +72,7 @@ public class TaskMonitor { maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); - tasks = new 
CircularFifoQueue(maxTasks); + tasks = new CircularFifoQueue<>(maxTasks); rpcTasks = Lists.newArrayList(); monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); monitorThread = new Thread(new MonitorRunnable()); @@ -87,7 +89,13 @@ public class TaskMonitor { } return instance; } - + + @VisibleForTesting + synchronized void clearTasks() { + rpcTasks.clear(); + tasks.clear(); + } + public synchronized MonitoredTask createStatus(String description) { MonitoredTask stat = new MonitoredTaskImpl(); stat.setDescription(description); @@ -136,7 +144,7 @@ public class TaskMonitor { it.hasNext();) { TaskAndWeakRefPair pair = it.next(); MonitoredTask stat = pair.get(); - + if (pair.isDead()) { // The class who constructed this leaked it. So we can // assume it's done. @@ -145,7 +153,7 @@ public class TaskMonitor { stat.cleanup(); } } - + if (canPurge(stat)) { it.remove(); } @@ -153,56 +161,37 @@ public class TaskMonitor { } /** - * Produces a list containing copies of the current state of all non-expired + * Produces a list containing copies of the current state of all non-expired * MonitoredTasks handled by this TaskMonitor. * @return A complete list of MonitoredTasks. */ - public List getTasks() { - return getTasks(null); + public synchronized List getTasks() { + return getTasks(TaskType.ALL); } /** - * Produces a list containing copies of the current state of all non-expired + * Produces a list containing copies of the current state of all non-expired * MonitoredTasks handled by this TaskMonitor. * @param filter type of wanted tasks * @return A filtered list of MonitoredTasks. */ public synchronized List getTasks(String filter) { + return getTasks(TaskType.get(filter)); + } + + public synchronized List getTasks(TaskType type) { purgeExpiredTasks(); - TaskFilter taskFilter = createTaskFilter(filter); ArrayList results = - Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); - processTasks(tasks, taskFilter, results); - processTasks(rpcTasks, taskFilter, results); + Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size()); + processTasks(tasks, type.predicate, results); + processTasks(rpcTasks, type.predicate, results); return results; } - /** - * Create a task filter according to a given filter type. 
- * @param filter type of monitored task - * @return a task filter - */ - private static TaskFilter createTaskFilter(String filter) { - switch (TaskFilter.TaskType.getTaskType(filter)) { - case GENERAL: return task -> task instanceof MonitoredRPCHandler; - case HANDLER: return task -> !(task instanceof MonitoredRPCHandler); - case RPC: return task -> !(task instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) task).isRPCRunning(); - case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) || - !((MonitoredRPCHandler) task).isOperationRunning(); - default: return task -> false; - } - } - - private static void processTasks(Iterable tasks, - TaskFilter filter, - List results) { - for (TaskAndWeakRefPair task : tasks) { - MonitoredTask t = task.get(); - if (!filter.filter(t)) { - results.add(t.clone()); - } - } + private static void processTasks(Collection tasks, + Predicate predicate, List results) { + tasks.stream().map(TaskAndWeakRefPair::get).filter(predicate) + .forEach(t -> results.add(t.clone())); } private boolean canPurge(MonitoredTask stat) { @@ -212,7 +201,7 @@ public class TaskMonitor { public void dumpAsText(PrintWriter out) { long now = EnvironmentEdgeManager.currentTime(); - + List tasks = getTasks(); for (MonitoredTask task : tasks) { out.println("Task: " + task.getDescription()); @@ -246,7 +235,7 @@ public class TaskMonitor { * v \ * PassthroughInvocationHandler | weak reference * | / - * MonitoredTaskImpl / + * MonitoredTaskImpl / * | / * StatAndWeakRefProxy ------/ * @@ -258,29 +247,29 @@ public class TaskMonitor { private static class TaskAndWeakRefPair { private MonitoredTask impl; private WeakReference weakProxy; - + public TaskAndWeakRefPair(MonitoredTask stat, MonitoredTask proxy) { this.impl = stat; this.weakProxy = new WeakReference<>(proxy); } - + public MonitoredTask get() { return impl; } - + public boolean isDead() { return weakProxy.get() == null; } } - + /** - * An InvocationHandler that simply passes through calls to the original + * An InvocationHandler that simply passes through calls to the original * object. 
*/ private static class PassthroughInvocationHandler implements InvocationHandler { private T delegatee; - + public PassthroughInvocationHandler(T delegatee) { this.delegatee = delegatee; } @@ -289,7 +278,7 @@ public class TaskMonitor { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { return method.invoke(delegatee, args); - } + } } private class MonitorRunnable implements Runnable { @@ -311,44 +300,38 @@ public class TaskMonitor { } } - private interface TaskFilter { - enum TaskType { - GENERAL("general"), - HANDLER("handler"), - RPC("rpc"), - OPERATION("operation"), - ALL("all"); - - private final String type; + @VisibleForTesting + enum TaskType { + GENERAL("general", task -> !(task instanceof MonitoredRPCHandler)), + HANDLER("handler", task -> task instanceof MonitoredRPCHandler), + RPC("rpc", task -> task instanceof MonitoredRPCHandler && + ((MonitoredRPCHandler) task).isRPCRunning()), + OPERATION("operation", task -> (task instanceof MonitoredRPCHandler) && + ((MonitoredRPCHandler) task).isOperationRunning()), + ALL("all", task -> true); - private TaskType(String type) { - this.type = type.toLowerCase(); - } + final String type; + final Predicate predicate; + TaskType(String type, Predicate predicate) { + this.type = type; + this.predicate = predicate; + } - static TaskType getTaskType(String type) { - if (type == null || type.isEmpty()) { - return ALL; - } - type = type.toLowerCase(); - for (TaskType taskType : values()) { - if (taskType.toString().equals(type)) { - return taskType; - } - } + static TaskType get(String type) { + if (type == null) { return ALL; } - - @Override - public String toString() { - return type; + for (TaskType taskType : values()) { + if (taskType.toString().equalsIgnoreCase(type)) { + return taskType; + } } + return ALL; } - /** - * Filter out unwanted task. - * @param task monitored task - * @return false if a task is accepted, true if it is filtered - */ - boolean filter(MonitoredTask task); + @Override + public String toString() { + return type; + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestMonitoredTask.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestMonitoredTask.java new file mode 100644 index 0000000000..c1b71ad153 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestMonitoredTask.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.monitoring; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.FifoRpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category({MediumTests.class}) +@RunWith(Parameterized.class) +public class TestMonitoredTask { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMonitoredTask.class); + + private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private final int handlerCount = 2; + + @Parameterized.Parameter + public Class rpcScheduleProvider; + + @Parameterized.Parameters(name = "{index}: provider={0}") + public static Iterable data() { + return Arrays.asList(new Object[] { SimpleRpcSchedulerFactory.class }, + new Object[] { FifoRpcSchedulerFactory.class }); + } + + @Before + public void before() throws Exception { + UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, handlerCount); + UTIL.getConfiguration() + .setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, handlerCount); + UTIL.getConfiguration() + .setInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, handlerCount); + UTIL.getConfiguration() + .set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, rpcScheduleProvider.getName()); + UTIL.startMiniCluster(1); + } + + @After + public void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + TaskMonitor.get().clearTasks(); + } + + @Test + public void testCountOfMonitoredTask() throws Exception { + if (rpcScheduleProvider == SimpleRpcSchedulerFactory.class) { + // (default rpc (2) + meta rpc (2) + replica rpc (2)) * (rs + master) = 12 + assertEquals(handlerCount * 3 * 2, + TaskMonitor.get().getTasks(TaskMonitor.TaskType.HANDLER).size()); + } + + if (rpcScheduleProvider == FifoRpcSchedulerFactory.class) { + // (default rpc (2)) * (rs + master) = 4 + assertEquals(handlerCount * 2, + TaskMonitor.get().getTasks(TaskMonitor.TaskType.HANDLER).size()); + } + } +} + -- 2.16.2.windows.1
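As a quick illustration of the reworked task filtering above (getTasks(String) now resolves the filter through the new TaskType enum and its per-type predicates), a minimal caller might look like the sketch below. This is only a usage sketch against the patched TaskMonitor API; the TaskFilterSketch class and its dump() helper are illustrative names, not part of the patch.

    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.monitoring.TaskMonitor;

    public class TaskFilterSketch {
      /** Prints a snapshot of the monitored tasks matching the given filter string. */
      static void dump(String filter) {
        for (MonitoredTask task : TaskMonitor.get().getTasks(filter)) {
          System.out.println(task.getDescription() + " : " + task.getStatus());
        }
      }

      public static void main(String[] args) {
        dump("handler"); // tasks backed by a MonitoredRPCHandler
        dump("rpc");     // handler tasks that currently have an RPC running
        dump("general"); // everything that is not an RPC handler task
        dump(null);      // null or unknown filters fall back to TaskType.ALL
      }
    }

Because TaskType itself is package-private (@VisibleForTesting), external callers keep using the String overload; the enum only centralizes the filtering logic that previously lived in createTaskFilter.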