diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 1e0ee79..267bfcf 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1837,8 +1837,9 @@ public class ProcedureExecutor { @Override public void run() { long lastUpdate = EnvironmentEdgeManager.currentTime(); - try { - while (isRunning() && keepAlive(lastUpdate)) { + IdLock.Entry lockEntry = null; + while (isRunning() && keepAlive(lastUpdate)) { + try { Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (proc == null) { continue; @@ -1846,31 +1847,33 @@ public class ProcedureExecutor { this.activeProcedure = proc; int activeCount = activeExecutorCount.incrementAndGet(); int runningCount = store.setRunningProcedureCount(activeCount); - LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), - runningCount, activeCount); + LOG.trace("Execute pid={} runningCount={}, activeCount={}", + proc.getProcId(), runningCount, activeCount); executionStartTime.set(EnvironmentEdgeManager.currentTime()); - IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); - try { - executeProcedure(proc); - } catch (AssertionError e) { - LOG.info("ASSERT pid=" + proc.getProcId(), e); - throw e; - } finally { + lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); + executeProcedure(proc); + } catch (Throwable t) { + LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); + } finally { + if (lockEntry != null) { procExecutionLock.releaseLockEntry(lockEntry); - activeCount = activeExecutorCount.decrementAndGet(); - runningCount = store.setRunningProcedureCount(activeCount); - LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), - runningCount, activeCount); - this.activeProcedure = null; + lockEntry = null; + } + if (activeProcedure != null) { + int activeCount = activeExecutorCount.decrementAndGet(); + int runningCount = store.setRunningProcedureCount(activeCount); + LOG.trace("Halt pid={} runningCount={}, activeCount={}", + activeProcedure.getProcId(), runningCount, activeCount); + // only update lastUpdate if a procedure is truly executed + // otherwise the KeepAliveThread won't exit lastUpdate = EnvironmentEdgeManager.currentTime(); - executionStartTime.set(Long.MAX_VALUE); } + executionStartTime.set(Long.MAX_VALUE); + this.activeProcedure = null; } - } catch (Throwable t) { - LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); - } finally { - LOG.trace("Worker terminated."); } + LOG.debug("Worker terminated: " + this.getName() + " isRunning: " + isRunning() + + " isAlive" + isAlive()); workerThreads.remove(this); }