commit cbf4180f20ecd61dbd059088ac0d7002ff9eccc6 Author: Todd Lipcon Date: Mon Aug 29 19:36:43 2011 -0700 HBASE-4281. ExecutorService monitoring (v3) diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index 958de8d..ad60ba9 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -229,4 +229,21 @@ public abstract class EventHandler implements Runnable, Comparable { public synchronized void setListener(EventHandlerListener listener) { this.listener = listener; } -} \ No newline at end of file + + @Override + public String toString() { + return "Event #" + getSeqid() + + " of type " + eventType + + " (" + getInformativeName() + ")"; + } + + /** + * Event implementations should override this method to provide an + * informative name about what event they are handling. For example, + * event-specific information such as which region or server is + * being processed should be included if possible. 
+ */ + public String getInformativeName() { + return this.getClass().toString(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 06b0849..1216f94 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -19,12 +19,19 @@ */ package org.apache.hadoop.hbase.executor; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Writer; +import java.lang.management.ThreadInfo; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -33,7 +40,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener; import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -245,6 +255,14 @@ public class ExecutorService { return this.eventHandlerListeners.remove(type); } + public Map getAllExecutorStatuses() { + Map ret = Maps.newHashMap(); + for (Map.Entry e : executorMap.entrySet()) { + ret.put(e.getKey(), e.getValue().getStatus()); + } + return ret; + } + /** * Executor instance. 
*/ @@ -252,7 +270,7 @@ public class ExecutorService { // how long to retain excess threads final long keepAliveTimeInMillis = 1000; // the thread pool executor that services the requests - final ThreadPoolExecutor threadPoolExecutor; + final TrackingThreadPoolExecutor threadPoolExecutor; // work queue to use - unbounded queue final BlockingQueue q = new LinkedBlockingQueue(); private final String name; @@ -266,7 +284,7 @@ public class ExecutorService { this.name = name; this.eventHandlerListeners = eventHandlerListeners; // create the thread pool executor - this.threadPoolExecutor = new ThreadPoolExecutor(maxThreads, maxThreads, + this.threadPoolExecutor = new TrackingThreadPoolExecutor(maxThreads, maxThreads, keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q); // name the threads for this threadpool ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); @@ -292,5 +310,118 @@ public class ExecutorService { public String toString() { return getClass().getSimpleName() + "-" + id + "-" + name; } + + public ExecutorStatus getStatus() { + List queuedEvents = Lists.newArrayList(); + for (Runnable r : q) { + if (!(r instanceof EventHandler)) { + LOG.warn("Non-EventHandler " + r + " queued in " + name); + continue; + } + queuedEvents.add((EventHandler)r); + } + + List running = Lists.newArrayList(); + for (Map.Entry e : + threadPoolExecutor.getRunningTasks().entrySet()) { + Runnable r = e.getValue(); + if (!(r instanceof EventHandler)) { + LOG.warn("Non-EventHandler " + r + " running in " + name); + continue; + } + running.add(new RunningEventStatus(e.getKey(), (EventHandler)r)); + } + + return new ExecutorStatus(this, queuedEvents, running); + } + } + + /** + * A subclass of ThreadPoolExecutor that keeps track of the Runnables that + * are executing at any given point in time. 
+ */ + static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { + private ConcurrentMap running = Maps.newConcurrentMap(); + + public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + running.remove(Thread.currentThread()); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + Runnable oldPut = running.put(t, r); + assert oldPut == null : "inconsistency for thread " + t; + super.beforeExecute(t, r); + } + + public ConcurrentMap getRunningTasks() { + return running; + } + } + + /** + * A snapshot of the status of a particular executor. This includes + * the contents of the executor's pending queue, as well as the + * threads and events currently being processed. + * + * This is a consistent snapshot that is immutable once constructed. 
+ */ + public static class ExecutorStatus { + final Executor executor; + final List queuedEvents; + final List running; + + public ExecutorStatus(Executor executor, + List queuedEvents, + List running) { + this.executor = executor; + this.queuedEvents = queuedEvents; + this.running = running; + } + + public void dumpTo(Writer out, String indent) throws IOException { + out.write(indent + "Status for executor: " + executor + "\n"); + out.write(indent + "=======================================\n"); + out.write(indent + queuedEvents.size() + " events queued, " + + running.size() + " running\n"); + if (!queuedEvents.isEmpty()) { + out.write(indent + "Queued:\n"); + for (EventHandler e : queuedEvents) { + out.write(indent + " " + e + "\n"); + } + out.write("\n"); + } + if (!running.isEmpty()) { + out.write(indent + "Running:\n"); + for (RunningEventStatus stat : running) { + out.write(indent + " Running on thread '" + stat.threadInfo.getThreadName() + + "': " + stat.event + "\n"); + out.write(ThreadMonitoring.formatThreadInfo(stat.threadInfo, indent + " ")); + out.write("\n"); + } + } + out.flush(); + } + } + + /** + * The status of a particular event that is in the middle of being + * handled by an executor. 
+ */ + public static class RunningEventStatus { + final ThreadInfo threadInfo; + final EventHandler event; + + public RunningEventStatus(Thread t, EventHandler event) { + this.threadInfo = ThreadMonitoring.getThreadInfo(t); + this.event = event; + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index dd2d6f6..4754ff0 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -71,6 +71,15 @@ public class ServerShutdownHandler extends EventHandler { LOG.warn(this.serverName + " is NOT in deadservers; it should be!"); } } + + @Override + public String getInformativeName() { + if (serverName != null) { + return this.getClass().getSimpleName() + " for " + serverName; + } else { + return super.getInformativeName(); + } + } /** * Before assign the ROOT region, ensure it haven't diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java b/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java new file mode 100644 index 0000000..1b82610 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java @@ -0,0 +1,91 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +public abstract class ThreadMonitoring { + + private static final ThreadMXBean threadBean = + ManagementFactory.getThreadMXBean(); + private static final int STACK_DEPTH = 20; + + public static ThreadInfo getThreadInfo(Thread t) { + long tid = t.getId(); + return threadBean.getThreadInfo(tid, STACK_DEPTH); + } + + + public static String formatThreadInfo(ThreadInfo threadInfo, String indent) { + StringBuilder sb = new StringBuilder(); + appendThreadInfo(sb, threadInfo, indent); + return sb.toString(); + } + + /** + * Print all of the thread's information and stack traces. 
+ * @param sb the buffer that the formatted text is appended to + * @param info the thread snapshot to format; if {@code null}, only an "Inactive" line is appended + * @param indent prefix written at the start of every appended line + */ + public static void appendThreadInfo(StringBuilder sb, + ThreadInfo info, + String indent) { + boolean contention = threadBean.isThreadContentionMonitoringEnabled(); + + if (info == null) { + sb.append(indent).append("Inactive (perhaps exited while monitoring was done)\n"); + return; + } + String taskName = getTaskName(info.getThreadId(), info.getThreadName()); + sb.append(indent).append("Thread ").append(taskName).append(":\n"); + + Thread.State state = info.getThreadState(); + sb.append(indent).append(" State: ").append(state).append("\n"); + sb.append(indent).append(" Blocked count: ").append(info.getBlockedCount()).append("\n"); + sb.append(indent).append(" Waited count: ").append(info.getWaitedCount()).append("\n"); + if (contention) { + sb.append(indent).append(" Blocked time: " + info.getBlockedTime()).append("\n"); + sb.append(indent).append(" Waited time: " + info.getWaitedTime()).append("\n"); + } + if (state == Thread.State.WAITING) { + sb.append(indent).append(" Waiting on ").append(info.getLockName()).append("\n"); + } else if (state == Thread.State.BLOCKED) { + sb.append(indent).append(" Blocked on ").append(info.getLockName()).append("\n"); + sb.append(indent).append(" Blocked by ").append( + getTaskName(info.getLockOwnerId(), info.getLockOwnerName())).append("\n"); + } + sb.append(indent).append(" Stack:").append("\n"); + for (StackTraceElement frame: info.getStackTrace()) { + sb.append(indent).append(" ").append(frame.toString()).append("\n"); + } + } + + private static String getTaskName(long id, String name) { + if (name == null) { + return Long.toString(id); + } + return id + " (" + name + ")"; + } + + +} diff --git a/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index 88e5aaa..68d76f2 100644 --- a/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ 
b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -19,9 +19,12 @@ */ package org.apache.hadoop.hbase.executor; +import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService.Executor; +import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.junit.Test; import static org.mockito.Mockito.*; @@ -82,6 +86,12 @@ public class TestExecutorService { assertEquals(maxThreads, counter.get()); assertEquals(maxThreads, pool.getPoolSize()); + ExecutorStatus status = executor.getStatus(); + assertTrue(status.queuedEvents.isEmpty()); + assertEquals(5, status.running.size()); + checkStatusDump(status); + + // Now interrupt the running Executor synchronized (lock) { lock.set(false); @@ -116,6 +126,15 @@ public class TestExecutorService { assertEquals(maxThreads, pool.getPoolSize()); } + private void checkStatusDump(ExecutorStatus status) throws IOException { + StringWriter sw = new StringWriter(); + status.dumpTo(sw, ""); + String dump = sw.toString(); + LOG.info("Got status dump:\n" + dump); + + assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean")); + } + public static class TestEventHandler extends EventHandler { private AtomicBoolean lock; private AtomicInteger counter;