diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index dbdadcc..798cd64 100644
--- a/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ b/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -52,7 +52,9 @@ public class TaskMonitor {
   private static TaskMonitor instance;
   private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
-
+  // Holds status for the regular, priority and replication RPC handlers so that
+  // it is not overridden by a spike of short-lived non-RPC handler status.
+  private List<TaskAndWeakRefPair> rpcHandlerTasks = Lists.newArrayList();
 
   /**
    * Get singleton instance.
    * TODO this would be better off scoped to a single daemon
@@ -84,11 +86,13 @@ public class TaskMonitor {
         new Class<?>[] { MonitoredRPCHandler.class },
         new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
     TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
-    tasks.add(pair);
+    rpcHandlerTasks.add(pair);
     return proxy;
   }
 
   private synchronized void purgeExpiredTasks() {
+    // No need to purge rpcHandlerTasks since they are not short-lived tasks that
+    // come and go like the non-RPC task status held in the circular buffer.
     for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
          it.hasNext();) {
       TaskAndWeakRefPair pair = it.next();
@@ -116,13 +120,21 @@ public class TaskMonitor {
    * @return A complete list of MonitoredTasks.
    */
   public synchronized List<MonitoredTask> getTasks() {
     purgeExpiredTasks();
-    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
-    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
-         it.hasNext();) {
+    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size()
+        + rpcHandlerTasks.size());
+
+    for (Iterator<TaskAndWeakRefPair> it = rpcHandlerTasks.iterator(); it.hasNext();) {
+      TaskAndWeakRefPair pair = it.next();
+      MonitoredTask t = pair.get();
+      ret.add(t.clone());
+    }
+
+    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
       TaskAndWeakRefPair pair = it.next();
       MonitoredTask t = pair.get();
       ret.add(t.clone());
     }
+
     return ret;
   }
diff --git a/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 98de024..5d43631 100644
--- a/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -101,6 +101,27 @@ public class TestTaskMonitor {
     assertEquals("task 10", tm.getTasks().get(0).getDescription());
   }
 
+  @Test
+  public void testRPCHandlerStatusNotRemovedByOverflowNonRpcTask() throws Exception {
+    TaskMonitor tm = new TaskMonitor();
+    for (int i = 0; i < 10; i++) {
+      tm.createRPCStatus("RPC handler " + i);
+    }
+    for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+      tm.createStatus("task " + i);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals("RPC handler " + i, tm.getTasks().get(i).getDescription());
+    }
+
+    // Make sure it was limited correctly
+    assertEquals(TaskMonitor.MAX_TASKS + 10, tm.getTasks().size());
+    // Make sure we culled the earlier non-RPC tasks, not later
+    // (i.e. tasks 0 through 9 should have been deleted)
+    assertEquals("task 10", tm.getTasks().get(0 + 10).getDescription());
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =