From 824a9ae486dc306f3caead1679a08aba8b80a3e8 Mon Sep 17 00:00:00 2001 From: Xu Cang Date: Sat, 2 Jun 2018 09:41:30 -0700 Subject: [PATCH] HBASE-20611 Avoid UnSupportedOperationException in getCallQueueInfo() --- .../org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 3 ++- .../java/org/apache/hadoop/hbase/ipc/RpcExecutor.java | 6 ++++-- .../apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 16 ++++++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index bd8bdce..693f7cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -200,7 +200,8 @@ public class FifoRpcScheduler extends RpcScheduler { callQueueInfo.setCallMethodSize(queueName, methodSize); - for (Runnable r:executor.getQueue()) { + while (!executor.getQueue().isEmpty()) { + Runnable r = executor.getQueue().poll(); FifoCallRunner mcr = (FifoCallRunner) r; RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index f63b243..0b5e665 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -158,7 +158,8 @@ public abstract class RpcExecutor { HashMap callQueueMethodTotalCount = new HashMap<>(); for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { + while (!queue.isEmpty()) { + CallRunner cr = queue.poll(); RpcCall rpcCall = cr.getRpcCall(); String method; @@ -179,7 +180,8 @@ public abstract class RpcExecutor { HashMap callQueueMethodTotalSize = new HashMap<>(); for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { + while (!queue.isEmpty()) { + CallRunner cr = queue.poll(); RpcCall rpcCall = cr.getRpcCall(); String method; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index a0b3b54..d9fe440 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -108,6 +108,22 @@ public class TestSimpleRpcScheduler { scheduler.stop(); } + @Test + public void testWithDeadlineQueue() throws IOException, InterruptedException { + + PriorityFunction qosFunction = mock(PriorityFunction.class); + conf.set("hbase.ipc.server.callqueue.type", "deadline"); + RpcScheduler scheduler = new SimpleRpcScheduler( + conf, 10, 0, 0, qosFunction, 0); + scheduler.init(CONTEXT); + scheduler.start(); + CallRunner task = createMockTask(); + task.setStatus(new MonitoredRPCHandlerImpl()); + scheduler.dispatch(task); + verify(task, timeout(10000)).run(); + scheduler.stop(); + } + private RpcScheduler disableHandlers(RpcScheduler scheduler) { try { Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor"); -- 2.7.4