diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b401871..2ffae8e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -164,6 +164,23 @@ public abstract class Procedure implements Comparable { } /** + * Used to keep the procedure lock even when the procedure is yielding or suspended. + * @return true if the procedure should hold on to the lock until completionCleanup() + */ + protected boolean holdLock(final TEnvironment env) { + return false; + } + + /** + * This is used in conjunction with holdLock(). If holdLock() is true + * the procedure executor will not call acquireLock() if hasLock() is true. + * @return true if the procedure has the lock, false otherwise. + */ + protected boolean hasLock(final TEnvironment env) { + return false; + } + + /** * Called when the procedure is loaded for replay. * The procedure implementor may use this method to perform some quick * operation before replay. @@ -174,6 +191,14 @@ public abstract class Procedure implements Comparable { } /** + * Called when the procedure is ready to be added to the queue after + * the loading/replay operation. + */ + protected void afterReplay(final TEnvironment env) { + // no-op + } + + /** * Called when the procedure is marked as completed (success or rollback). * The procedure implementor may use this method to cleanup in-memory states. * This operation will not be retried on failure. @@ -332,6 +357,10 @@ public abstract class Procedure implements Comparable { return nonceKey; } + public synchronized boolean isInitializing() { + return state == ProcedureState.INITIALIZING; + } + /** * @return true if the procedure has failed. * true may mean failed but not yet rolledback or failed and rolledback. 
@@ -594,6 +623,10 @@ public abstract class Procedure implements Comparable { return childrenLatch > 0; } + protected synchronized int getChildrenLatch() { + return childrenLatch; + } + /** * Called by the RootProcedureState on procedure execution. * Each procedure store its stack-index positions. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 5042329..a6955c2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -438,6 +438,7 @@ public class ProcedureExecutor { // some procedure may be started way before this stuff. for (int i = runnableList.size() - 1; i >= 0; --i) { Procedure proc = runnableList.get(i); + proc.afterReplay(getEnvironment()); if (!proc.hasParent()) { sendProcedureLoadedNotification(proc.getProcId()); } @@ -854,9 +855,9 @@ public class ProcedureExecutor { // Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - if (proc.acquireLock(getEnvironment())) { + if (acquireLock(proc)) { execProcedure(procStack, proc); - proc.releaseLock(getEnvironment()); + releaseLock(proc, false); } else { runnables.yield(proc); } @@ -876,12 +877,29 @@ public class ProcedureExecutor { // Finalize the procedure state if (proc.getProcId() == rootProcId) { procedureFinished(proc); + } else { + execCompletionCleanup(proc); } break; } } while (procStack.isFailed()); } + private boolean acquireLock(final Procedure proc) { + final TEnvironment env = getEnvironment(); + if (proc.holdLock(env) && proc.hasLock(env)) { + return true; + } + return proc.acquireLock(env); + } + + private void releaseLock(final Procedure proc, final boolean force) { + final TEnvironment env = getEnvironment(); + if (force || !proc.holdLock(env)) { + proc.releaseLock(env); + } + } + 
private void timeoutLoop() { while (isRunning()) { Procedure proc = waitingTimeout.poll(); @@ -935,7 +953,7 @@ public class ProcedureExecutor { * finished to user, and the result will be the fatal exception. */ private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) { - Procedure rootProc = procedures.get(rootProcId); + final Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); if (exception == null) { exception = procStack.getException(); @@ -943,7 +961,7 @@ public class ProcedureExecutor { store.update(rootProc); } - List subprocStack = procStack.getSubproceduresStack(); + final List subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); @@ -951,7 +969,7 @@ public class ProcedureExecutor { while (stackTail --> 0) { final Procedure proc = subprocStack.get(stackTail); - if (!reuseLock && !proc.acquireLock(getEnvironment())) { + if (!reuseLock && !acquireLock(proc)) { // can't take a lock on the procedure, add the root-proc back on the // queue waiting for the lock availability return false; @@ -965,7 +983,7 @@ public class ProcedureExecutor { // we can avoid to lock/unlock each step reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback; if (!reuseLock) { - proc.releaseLock(getEnvironment()); + releaseLock(proc, false); } // allows to kill the executor before something is stored to the wal. 
@@ -980,6 +998,10 @@ public class ProcedureExecutor { if (proc.isYieldAfterExecutionStep(getEnvironment())) { return false; } + + if (proc != rootProc) { + execCompletionCleanup(proc); + } } // Finalize the procedure state @@ -1297,14 +1319,22 @@ public class ProcedureExecutor { return Procedure.getRootProcedureId(procedures, proc); } - private void procedureFinished(final Procedure proc) { - // call the procedure completion cleanup handler + private void execCompletionCleanup(final Procedure proc) { + final TEnvironment env = getEnvironment(); + if (proc.holdLock(env) && proc.hasLock(env)) { + releaseLock(proc, true); + } try { - proc.completionCleanup(getEnvironment()); + proc.completionCleanup(env); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); } + } + + private void procedureFinished(final Procedure proc) { + // call the procedure completion cleanup handler + execCompletionCleanup(proc); // update the executor internal state maps ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey()); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java index 456f83d..402ddfc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java @@ -65,14 +65,23 @@ public class RemoteProcedureException extends ProcedureException { return source; } - public IOException unwrapRemoteException() { - if (getCause() instanceof RemoteException) { - return ((RemoteException)getCause()).unwrapRemoteException(); + public Exception unwrapRemoteException() { + final Throwable cause = getCause(); + if (cause instanceof RemoteException) { + return 
((RemoteException)cause).unwrapRemoteException(); } - if (getCause() instanceof IOException) { - return (IOException)getCause(); + if (cause instanceof Exception) { + return (Exception)cause; } - return new IOException(getCause()); + return new Exception(cause); + } + + public IOException unwrapRemoteIOException() { + final Exception cause = unwrapRemoteException(); + if (cause instanceof IOException) { + return (IOException)cause; + } + return new IOException(cause); } @Override diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index 7eb6465..69b120a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -130,7 +130,9 @@ public abstract class StateMachineProcedure subProcList = new ArrayList(subProcedure.length); } for (int i = 0; i < subProcedure.length; ++i) { - subProcList.add(subProcedure[i]); + Procedure proc = subProcedure[i]; + if (!proc.hasOwner()) proc.setOwner(getOwner()); + subProcList.add(proc); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java new file mode 100644 index 0000000..8ac0fbd --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureSuspended { + private static final Log LOG = LogFactory.getLog(TestProcedureSuspended.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final Procedure NULL_PROC = null; + + private ProcedureExecutor procExecutor; + private ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + + procStore = new NoopProcedureStore(); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), 
procStore); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + } + + @Test(timeout=10000) + public void testSuspendWhileHoldingLocks() { + final AtomicBoolean lockA = new AtomicBoolean(false); + final AtomicBoolean lockB = new AtomicBoolean(false); + + final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true); + final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true); + final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true); + + procExecutor.submitProcedure(p1keyA); + procExecutor.submitProcedure(p2keyA); + procExecutor.submitProcedure(p3keyB); + + // first run: p1 and p3 are able to run, p2 is blocked by p1 + waitAndAssertTimestamp(p1keyA, 1, 1); + waitAndAssertTimestamp(p2keyA, 0, -1); + waitAndAssertTimestamp(p3keyB, 1, 2); + assertEquals(true, lockA.get()); + assertEquals(true, lockB.get()); + + // release p3 + p3keyB.setThrowSuspend(false); + procExecutor.getRunnableSet().addFront(p3keyB); + waitAndAssertTimestamp(p1keyA, 1, 1); + waitAndAssertTimestamp(p2keyA, 0, -1); + waitAndAssertTimestamp(p3keyB, 2, 3); + assertEquals(true, lockA.get()); + + // wait until p3 is fully completed + ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB); + assertEquals(false, lockB.get()); + + // rollback p1 and wait until it is fully completed + p1keyA.setTriggerRollback(true); + procExecutor.getRunnableSet().addFront(p1keyA); + ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA); + + // p2 should start and suspend + waitAndAssertTimestamp(p1keyA, 4, 60000); + waitAndAssertTimestamp(p2keyA, 1, 7); + waitAndAssertTimestamp(p3keyB, 2, 3); + assertEquals(true, lockA.get()); + + // wait until p2 is fully completed + p2keyA.setThrowSuspend(false); + procExecutor.getRunnableSet().addFront(p2keyA); + 
ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA); + waitAndAssertTimestamp(p1keyA, 4, 60000); + waitAndAssertTimestamp(p2keyA, 2, 8); + waitAndAssertTimestamp(p3keyB, 2, 3); + assertEquals(false, lockA.get()); + assertEquals(false, lockB.get()); + } + + @Test(timeout=10000) + public void testYieldWhileHoldingLocks() { + final AtomicBoolean lock = new AtomicBoolean(false); + + final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false); + final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false); + + procExecutor.submitProcedure(p1); + procExecutor.submitProcedure(p2); + + // try to execute a bunch of yield on p1, p2 should be blocked + while (p1.getTimestamps().size() < 100) Threads.sleep(10); + assertEquals(0, p2.getTimestamps().size()); + + // wait until p1 is completed + p1.setThrowYield(false); + ProcedureTestingUtility.waitProcedure(procExecutor, p1); + + // try to execute a bunch of yield on p2 + while (p2.getTimestamps().size() < 100) Threads.sleep(10); + assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1, + p2.getTimestamps().get(0).longValue()); + + // wait until p2 is completed + p2.setThrowYield(false); + ProcedureTestingUtility.waitProcedure(procExecutor, p2); + } + + private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) { + final ArrayList timestamps = proc.getTimestamps(); + while (timestamps.size() < size) Threads.sleep(10); + LOG.info(proc + " -> " + timestamps); + assertEquals(size, timestamps.size()); + if (size > 0) { + assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue()); + } + } + + public static class TestLockProcedure extends Procedure { + private final ArrayList timestamps = new ArrayList(); + private final String key; + + private boolean triggerRollback = false; + private boolean throwSuspend = false; + private boolean throwYield = false; + private AtomicBoolean lock = null; + private boolean hasLock = false; + + 
public TestLockProcedure(final AtomicBoolean lock, final String key, + final boolean throwYield, final boolean throwSuspend) { + this.lock = lock; + this.key = key; + this.throwYield = throwYield; + this.throwSuspend = throwSuspend; + } + + public void setThrowYield(final boolean throwYield) { + this.throwYield = throwYield; + } + + public void setThrowSuspend(final boolean throwSuspend) { + this.throwSuspend = throwSuspend; + } + + public void setTriggerRollback(final boolean triggerRollback) { + this.triggerRollback = triggerRollback; + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureYieldException, ProcedureSuspendedException { + LOG.info("EXECUTE " + this + " suspend " + (lock != null)); + timestamps.add(env.nextTimestamp()); + if (triggerRollback) { + setFailure(getClass().getSimpleName(), new Exception("injected failure")); + } else if (throwYield) { + throw new ProcedureYieldException(); + } else if (throwSuspend) { + throw new ProcedureSuspendedException(); + } + return null; + } + + @Override + protected void rollback(final TestProcEnv env) { + LOG.info("ROLLBACK " + this); + timestamps.add(env.nextTimestamp() * 10000); + } + + protected boolean acquireLock(final TestProcEnv env) { + if ((hasLock = lock.compareAndSet(false, true))) { + LOG.info("ACQUIRE LOCK " + this + " " + (hasLock)); + } + return hasLock; + } + + protected void releaseLock(final TestProcEnv env) { + LOG.info("RELEASE LOCK " + this + " " + hasLock); + lock.set(false); + hasLock = false; + } + + protected boolean holdLock(final TestProcEnv env) { + return true; + } + + protected boolean hasLock(final TestProcEnv env) { + return hasLock; + } + + public ArrayList getTimestamps() { + return timestamps; + } + + @Override + protected void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getName()); + builder.append("(" + key + ")"); + } + + @Override + protected boolean abort(TestProcEnv env) { return false; } + + @Override + 
protected void serializeStateData(final OutputStream stream) throws IOException { + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + } + } + + private static class TestProcEnv { + public final AtomicLong timestamp = new AtomicLong(0); + + public long nextTimestamp() { + return timestamp.incrementAndGet(); + } + } +} \ No newline at end of file diff --git a/hbase-procedure/src/test/resources/hbase-site.xml b/hbase-procedure/src/test/resources/hbase-site.xml index d350127..8e41bb6 100644 --- a/hbase-procedure/src/test/resources/hbase-site.xml +++ b/hbase-procedure/src/test/resources/hbase-site.xml @@ -32,4 +32,16 @@ procedure to have an owner + + hbase.procedure.cleaner.interval + 10000 + + + hbase.procedure.cleaner.evict.ttl + 21000 + + + hbase.procedure.cleaner.acked.evict.ttl + 7000 + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 51bdf52..07c8b98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -291,7 +291,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { TableProcedureInterface iProcTable = (TableProcedureInterface)proc; boolean tableDeleted; if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); + Exception procEx = proc.getException().unwrapRemoteException(); if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { // create failed because the table already exist tableDeleted = !(procEx instanceof TableExistsException); @@ -1616,4 +1616,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return Math.max(1, queue.getPriority() * quantum); // TODO } } -} \ No newline at 
end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index eaeb9ac..31077da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -82,7 +82,7 @@ public abstract class ProcedurePrepareLatch { protected void countDown(final Procedure proc) { if (proc.hasException()) { - exception = proc.getException().unwrapRemoteException(); + exception = proc.getException().unwrapRemoteIOException(); } latch.countDown(); }