diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b9145e7..0773a64 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -164,6 +164,23 @@ public abstract class Procedure implements Comparable { } /** + * Used to keep the procedure lock even when the procedure is yielding or suspended. + * @return true if the procedure should hold on to the lock until completionCleanup() + */ + protected boolean holdLock(final TEnvironment env) { + return false; + } + + /** + * This is used in conjunction with holdLock(). If holdLock() is true + * the procedure executor will not call acquireLock() if hasLock() is true. + * @return true if the procedure has the lock, false otherwise. + */ + protected boolean hasLock(final TEnvironment env) { + return false; + } + + /** * Called when the procedure is loaded for replay. * The procedure implementor may use this method to perform some quick * operation before replay. @@ -174,6 +191,14 @@ public abstract class Procedure implements Comparable { } /** + * Called when the procedure is ready to be added to the queue after + * the loading/replay operation. + */ + protected void afterReplay(final TEnvironment env) { + // no-op + } + + /** * Called when the procedure is marked as completed (success or rollback). * The procedure implementor may use this method to cleanup in-memory states. * This operation will not be retried on failure. @@ -339,6 +364,10 @@ public abstract class Procedure implements Comparable { return state == ProcedureState.RUNNABLE; } + public synchronized boolean isInitializing() { + return state == ProcedureState.INITIALIZING; + } + /** * @return true if the procedure has failed. * true may mean failed but not yet rolledback or failed and rolledback. 
@@ -601,6 +630,10 @@ public abstract class Procedure implements Comparable { return childrenLatch > 0; } + protected synchronized int getChildrenLatch() { + return childrenLatch; + } + /** * Called by the RootProcedureState on procedure execution. * Each procedure store its stack-index positions. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 5066fb4..1cfd7d2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -438,6 +438,7 @@ public class ProcedureExecutor { // some procedure may be started way before this stuff. for (int i = runnableList.size() - 1; i >= 0; --i) { Procedure proc = runnableList.get(i); + proc.afterReplay(getEnvironment()); if (!proc.hasParent()) { sendProcedureLoadedNotification(proc.getProcId()); } @@ -857,9 +858,9 @@ public class ProcedureExecutor { // Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - if (proc.acquireLock(getEnvironment())) { + if (acquireLock(proc)) { execProcedure(procStack, proc); - proc.releaseLock(getEnvironment()); + releaseLock(proc, false); } else { runnables.yield(proc); } @@ -879,12 +880,29 @@ public class ProcedureExecutor { // Finalize the procedure state if (proc.getProcId() == rootProcId) { procedureFinished(proc); + } else { + execCompletionCleanup(proc); } break; } } while (procStack.isFailed()); } + private boolean acquireLock(final Procedure proc) { + final TEnvironment env = getEnvironment(); + if (proc.holdLock(env) && proc.hasLock(env)) { + return true; + } + return proc.acquireLock(env); + } + + private void releaseLock(final Procedure proc, final boolean force) { + final TEnvironment env = getEnvironment(); + if (force || !proc.holdLock(env)) { + proc.releaseLock(env); + } + } + 
private void timeoutLoop() { while (isRunning()) { Procedure proc = waitingTimeout.poll(); @@ -940,7 +958,7 @@ public class ProcedureExecutor { * finished to user, and the result will be the fatal exception. */ private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) { - Procedure rootProc = procedures.get(rootProcId); + final Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); if (exception == null) { exception = procStack.getException(); @@ -948,7 +966,7 @@ public class ProcedureExecutor { store.update(rootProc); } - List subprocStack = procStack.getSubproceduresStack(); + final List subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); @@ -956,7 +974,7 @@ public class ProcedureExecutor { while (stackTail --> 0) { final Procedure proc = subprocStack.get(stackTail); - if (!reuseLock && !proc.acquireLock(getEnvironment())) { + if (!reuseLock && !acquireLock(proc)) { // can't take a lock on the procedure, add the root-proc back on the // queue waiting for the lock availability return false; @@ -970,7 +988,7 @@ public class ProcedureExecutor { // we can avoid to lock/unlock each step reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback; if (!reuseLock) { - proc.releaseLock(getEnvironment()); + releaseLock(proc, false); } // allows to kill the executor before something is stored to the wal. 
@@ -985,6 +1003,10 @@ public class ProcedureExecutor { if (proc.isYieldAfterExecutionStep(getEnvironment())) { return false; } + + if (proc != rootProc) { + execCompletionCleanup(proc); + } } // Finalize the procedure state @@ -1302,14 +1324,22 @@ public class ProcedureExecutor { return Procedure.getRootProcedureId(procedures, proc); } - private void procedureFinished(final Procedure proc) { - // call the procedure completion cleanup handler + private void execCompletionCleanup(final Procedure proc) { + final TEnvironment env = getEnvironment(); + if (proc.holdLock(env) && proc.hasLock(env)) { + releaseLock(proc, true); + } try { - proc.completionCleanup(getEnvironment()); + proc.completionCleanup(env); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); } + } + + private void procedureFinished(final Procedure proc) { + // call the procedure completion cleanup handler + execCompletionCleanup(proc); // update the executor internal state maps ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey()); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java index 456f83d..402ddfc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java @@ -65,14 +65,23 @@ public class RemoteProcedureException extends ProcedureException { return source; } - public IOException unwrapRemoteException() { - if (getCause() instanceof RemoteException) { - return ((RemoteException)getCause()).unwrapRemoteException(); + public Exception unwrapRemoteException() { + final Throwable cause = getCause(); + if (cause instanceof RemoteException) { + return 
((RemoteException)cause).unwrapRemoteException(); } - if (getCause() instanceof IOException) { - return (IOException)getCause(); + if (cause instanceof Exception) { + return (Exception)cause; } - return new IOException(getCause()); + return new Exception(cause); + } + + public IOException unwrapRemoteIOException() { + final Exception cause = unwrapRemoteException(); + if (cause instanceof IOException) { + return (IOException)cause; + } + return new IOException(cause); } @Override diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index 7eb6465..69b120a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -130,7 +130,9 @@ public abstract class StateMachineProcedure subProcList = new ArrayList(subProcedure.length); } for (int i = 0; i < subProcedure.length; ++i) { - subProcList.add(subProcedure[i]); + Procedure proc = subProcedure[i]; + if (!proc.hasOwner()) proc.setOwner(getOwner()); + subProcList.add(proc); } } diff --git a/hbase-procedure/src/test/resources/hbase-site.xml b/hbase-procedure/src/test/resources/hbase-site.xml index d350127..8e41bb6 100644 --- a/hbase-procedure/src/test/resources/hbase-site.xml +++ b/hbase-procedure/src/test/resources/hbase-site.xml @@ -32,4 +32,16 @@ procedure to have an owner + + hbase.procedure.cleaner.interval + 10000 + + + hbase.procedure.cleaner.evict.ttl + 21000 + + + hbase.procedure.cleaner.acked.evict.ttl + 7000 + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 3a215d5..548fb00 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -297,7 +297,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { TableProcedureInterface iProcTable = (TableProcedureInterface)proc; boolean tableDeleted; if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); + Exception procEx = proc.getException().unwrapRemoteException(); if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { // create failed because the table already exist tableDeleted = !(procEx instanceof TableExistsException); @@ -1628,4 +1628,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return Math.max(1, queue.getPriority() * quantum); // TODO } } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index eaeb9ac..eb23960 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -82,7 +82,7 @@ public abstract class ProcedurePrepareLatch { protected void countDown(final Procedure proc) { if (proc.hasException()) { - exception = proc.getException().unwrapRemoteException(); + exception = proc.getException().unwrapRemoteIOException(); } latch.countDown(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java index cf8fdd4..0e10293 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java @@ -83,7 +83,8 @@ public final class ProcedureSyncWait { if (result.isFailed()) { // If the procedure fails, we should always have an exception captured. Throw it. throw RemoteProcedureException.fromProto( - result.getForeignExceptionMessage().getForeignExchangeMessage()).unwrapRemoteException(); + result.getForeignExceptionMessage().getForeignExchangeMessage()) + .unwrapRemoteIOException(); } return result.getResult(); } else {