diff --git hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 0856aa2..2999869 100644
--- hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -29,13 +29,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -234,7 +237,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private Configuration conf;
   private ThreadGroup threadGroup;
-  private CopyOnWriteArrayList<WorkerThread> workerThreads;
+  private ThreadPoolExecutor workerThreadPool;
   private TimeoutExecutorThread timeoutExecutor;
   private int corePoolSize;
 
@@ -493,10 +496,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     // Create the workers
     workerId.set(0);
-    workerThreads = new CopyOnWriteArrayList<>();
-    for (int i = 0; i < corePoolSize; ++i) {
-      workerThreads.add(new WorkerThread(threadGroup));
-    }
+    workerThreadPool = createThreadPoolExecutor();
 
     long st, et;
 
@@ -522,19 +522,29 @@ public class ProcedureExecutor<TEnvironment> {
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
     // Start the executors. Here we must have the lastProcId set.
-    LOG.debug("Start workers " + workerThreads.size());
+    LOG.debug("Start workers " + workerThreadPool.getCorePoolSize());
+    workerThreadPool.prestartAllCoreThreads();
     timeoutExecutor.start();
-    for (WorkerThread worker: workerThreads) {
-      worker.start();
-    }
 
     // Internal chores
-    timeoutExecutor.add(new WorkerMonitor());
+    timeoutExecutor.add(new ScheduledPoller());
 
     // Add completed cleaner chore
     addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
   }
 
+  /**
+   * Creates the pool that runs procedures. Runnable procedures are kept in the scheduler,
+   * not in the pool: the work queue holds at most one pending task and, once it is full,
+   * the pool starts an additional thread instead of queueing.
+   */
+  private ThreadPoolExecutor createThreadPoolExecutor() {
+    final BlockingQueue<Runnable> executorQueue = new ArrayBlockingQueue<>(1);
+    // Threads above corePoolSize retire after keepAliveTime (msec) of idleness.
+    return new SchedulerPollingThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
+        keepAliveTime, TimeUnit.MILLISECONDS, executorQueue);
+  }
+
   public void stop() {
     if (!running.getAndSet(false)) {
       return;
@@ -553,22 +563,18 @@ public class ProcedureExecutor<TEnvironment> {
     timeoutExecutor.awaitTermination();
     timeoutExecutor = null;
 
-    // stop the worker threads
-    for (WorkerThread worker: workerThreads) {
-      worker.awaitTermination();
-    }
-    workerThreads = null;
-
-    // Destroy the Thread Group for the executors
+    // Stop the worker pool: no new tasks are accepted, running procedures are awaited.
     try {
-      threadGroup.destroy();
-    } catch (IllegalThreadStateException e) {
-      LOG.error("Thread group " + threadGroup + " contains running threads");
-      threadGroup.list();
+      workerThreadPool.shutdown();
+      workerThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.warn("Termination of thread pool executor interrupted.", e);
     } finally {
-      threadGroup = null;
+      workerThreadPool = null;
     }
 
     // reset the in-memory state for testing
     completed.clear();
     rollbackStack.clear();
@@ -595,7 +601,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @return the current number of worker threads.
    */
   public int getWorkerThreadCount() {
-    return workerThreads.size();
+    return workerThreadPool.getPoolSize();
   }
 
   /**
@@ -1551,56 +1557,56 @@ public class ProcedureExecutor<TEnvironment> {
     sendProcedureFinishedNotification(proc.getProcId());
   }
 
-  // ==========================================================================
-  //  Worker Thread
-  // ==========================================================================
-  private final class WorkerThread extends StoppableThread {
-    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
-
-    public WorkerThread(final ThreadGroup group) {
-      super(group, "ProcedureExecutorWorker-" + workerId.incrementAndGet());
-    }
-
-    @Override
-    public void sendStopSignal() {
-      scheduler.signalAll();
-    }
-
-    @Override
-    public void run() {
-      final boolean traceEnabled = LOG.isTraceEnabled();
-      long lastUpdate = EnvironmentEdgeManager.currentTime();
-      while (isRunning() && keepAlive(lastUpdate)) {
-        final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
-        if (procedure == null) continue;
-
-        store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
-        executionStartTime.set(EnvironmentEdgeManager.currentTime());
-        try {
-          if (traceEnabled) {
-            LOG.trace("Trying to start the execution of " + procedure);
-          }
-          executeProcedure(procedure);
-        } finally {
-          store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
-          lastUpdate = EnvironmentEdgeManager.currentTime();
-          executionStartTime.set(Long.MAX_VALUE);
-        }
-      }
-      LOG.debug("Worker thread terminated " + this);
-      workerThreads.remove(this);
-    }
-
-    /**
-     * @return the time since the current procedure is running
-     */
-    public long getCurrentRunTime() {
-      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
-    }
-
-    private boolean keepAlive(final long lastUpdate) {
-      if (workerThreads.size() <= corePoolSize) return true;
-      return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime;
-    }
+  /**
+   * Worker pool whose threads ask the scheduler for the next procedure as soon as they
+   * finish a task.
+   */
+  private final class SchedulerPollingThreadPoolExecutor extends ThreadPoolExecutor {
+    public SchedulerPollingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    /** Overrides the execution hook to poll the scheduler for more work. */
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+      super.afterExecute(r, t);
+      // Stop pulling work once the executor is stopping: the pool would reject the task
+      // and the procedure already taken from the scheduler would be lost.
+      if (!running.get() || isShutdown()) {
+        return;
+      }
+      final Procedure procedure = scheduler.poll();
+      if (procedure != null) {
+        // execute(), not submit(): submit() wraps the task in a Future that nobody reads,
+        // which would silently swallow anything thrown by the procedure runner.
+        execute(new ProcedureRunner(procedure));
+      }
+    }
+  }
+
+  /**
+   * Runs one procedure on a pool thread and keeps the store informed of how many
+   * procedures are currently running.
+   */
+  private final class ProcedureRunner implements Runnable {
+    private final Procedure procedure;
+
+    public ProcedureRunner(Procedure procedure) {
+      this.procedure = procedure;
+    }
+
+    @Override
+    public void run() {
+      store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Trying to start the execution of " + procedure);
+        }
+        executeProcedure(procedure);
+      } finally {
+        store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
+      }
+    }
   }
 
@@ -1765,76 +1771,61 @@ public class ProcedureExecutor<TEnvironment> {
   // will have the tracker saying everything is in the last log.
   // ----------------------------------------------------------------------------
 
-  private final class WorkerMonitor extends InlineChore {
+  /**
+   * Periodic chore that moves runnable procedures from the scheduler to the worker pool.
+   * Workers only poll the scheduler right after finishing a task, so this chore is what
+   * gets procedures to workers that are idle.
+   */
+  private final class ScheduledPoller extends InlineChore {
     public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
         "hbase.procedure.worker.monitor.interval.msec";
     private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
 
-    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
-        "hbase.procedure.worker.stuck.threshold.msec";
-    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
-
-    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
-        "hbase.procedure.worker.add.stuck.percentage";
-    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
-
-    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
     private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
-    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
 
-    public WorkerMonitor() {
+    public ScheduledPoller() {
       refreshConfig();
     }
 
     @Override
     public void run() {
-      final int stuckCount = checkForStuckWorkers();
-      checkThreadCount(stuckCount);
-
       // refresh interval (poor man dynamic conf update)
       refreshConfig();
-    }
-
-    private int checkForStuckWorkers() {
-      // check if any of the worker is stuck
-      int stuckCount = 0;
-      for (WorkerThread worker: workerThreads) {
-        if (worker.getCurrentRunTime() < stuckThreshold) {
-          continue;
-        }
-
-        // WARN the worker is stuck
-        stuckCount++;
-        LOG.warn("Worker stuck " + worker +
-            " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
-      }
-      return stuckCount;
-    }
-
-    private void checkThreadCount(final int stuckCount) {
-      // nothing to do if there are no runnable tasks
-      if (stuckCount < 1 || !scheduler.hasRunnables()) return;
-
-      // add a new thread if the worker stuck percentage exceed the threshold limit
-      // and every handler is active.
-      final float stuckPerc = ((float)stuckCount) / workerThreads.size();
-      if (stuckPerc >= addWorkerStuckPercentage &&
-          activeExecutorCount.get() == workerThreads.size()) {
-        final WorkerThread worker = new WorkerThread(threadGroup);
-        workerThreads.add(worker);
-        worker.start();
-        LOG.debug("Added new worker thread " + worker);
-      }
+
+      // Hand one procedure to every idle core worker; stop as soon as the scheduler
+      // has nothing runnable.
+      final int idleWorkers =
+          workerThreadPool.getCorePoolSize() - workerThreadPool.getActiveCount();
+      for (int i = 0; i < idleWorkers; i++) {
+        if (!dispatchNext()) {
+          return;
+        }
+      }
+
+      // One extra poll per run: if a procedure is still waiting, the pool queues it
+      // or starts an additional thread.
+      dispatchNext();
+    }
+
+    /**
+     * Polls the scheduler once and hands the procedure, if any, to the worker pool.
+     * NOTE(review): presumably this runs on the timeout executor thread; confirm that
+     * scheduler.poll() returns promptly when nothing is runnable.
+     * @return true if a procedure was dispatched
+     */
+    private boolean dispatchNext() {
+      final Procedure procedure = scheduler.poll();
+      if (procedure == null) {
+        return false;
+      }
+      workerThreadPool.execute(new ProcedureRunner(procedure));
+      return true;
     }
 
     private void refreshConfig() {
-      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
-          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
       timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
         DEFAULT_WORKER_MONITOR_INTERVAL);
-      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
-          DEFAULT_WORKER_STUCK_THRESHOLD);
     }
 
     @Override
     public int getTimeoutInterval() {