diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b401871..b9145e7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -333,6 +333,13 @@ public abstract class Procedure implements Comparable { } /** + * @return true if the procedure is in a RUNNABLE state. + */ + protected synchronized boolean isRunnable() { + return state == ProcedureState.RUNNABLE; + } + + /** * @return true if the procedure has failed. * true may mean failed but not yet rolledback or failed and rolledback. */ diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 5042329..75e4a91 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -609,6 +609,7 @@ public class ProcedureExecutor { * @param chore the chore to add */ public void addChore(final ProcedureInMemoryChore chore) { + chore.setState(ProcedureState.RUNNABLE); waitingTimeout.add(chore); } @@ -617,6 +618,7 @@ public class ProcedureExecutor { * @param chore the chore to remove */ public void removeChore(final ProcedureInMemoryChore chore) { + chore.setState(ProcedureState.FINISHED); waitingTimeout.remove(chore); } @@ -906,13 +908,15 @@ public class ProcedureExecutor { // instead of bringing the Chore class in, we reuse this timeout thread for // this special case. if (proc instanceof ProcedureInMemoryChore) { - try { - ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment()); - } catch (Throwable e) { - LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); + if (proc.isRunnable()) { + try { + ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment()); + } catch (Throwable e) { + LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); + } + proc.setStartTime(EnvironmentEdgeManager.currentTime()); + if (proc.isRunnable()) waitingTimeout.add(proc); } - proc.setStartTime(EnvironmentEdgeManager.currentTime()); - waitingTimeout.add(proc); continue; } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java index 32e3e8c..8bc8fa8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java @@ -76,15 +76,18 @@ public class TestProcedureInMemoryChore { CountDownLatch latch = new CountDownLatch(nCountDown); TestLatchChore chore = new TestLatchChore(timeoutMSec, latch); procExecutor.addChore(chore); + assertTrue(chore.isRunnable()); latch.await(); // remove the chore and verify it is no longer executed + assertTrue(chore.isRunnable()); procExecutor.removeChore(chore); latch = new CountDownLatch(nCountDown); chore.setLatch(latch); latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS); LOG.info("chore latch count=" + latch.getCount()); - assertTrue(latch.getCount() > 0); + assertFalse(chore.isRunnable()); + assertTrue("latchCount=" + latch.getCount(), latch.getCount() > 0); } public static class TestLatchChore extends ProcedureInMemoryChore {