From 51e554ab548cb7eead279c959a2cf861ef513010 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 18 Jul 2018 09:40:21 +0800 Subject: [PATCH] HBASE-20846 Restore procedure locks when master restarts --- .../procedure2/AbstractProcedureScheduler.java | 2 +- .../hadoop/hbase/procedure2/DelayedProcedure.java | 5 +- .../apache/hadoop/hbase/procedure2/Procedure.java | 118 ++++-- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 427 +++++++++++---------- .../hadoop/hbase/procedure2/ProcedureUtil.java | 7 + .../hbase/procedure2/RootProcedureState.java | 44 +-- .../hbase/procedure2/TimeoutExecutorThread.java | 26 +- .../hbase/procedure2/TestProcedureReplayOrder.java | 8 +- .../hbase/procedure2/TestProcedureSuspended.java | 6 - .../src/main/protobuf/Procedure.proto | 3 + .../hbase/master/ClusterSchemaServiceImpl.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 7 +- .../hbase/master/assignment/GCRegionProcedure.java | 5 - .../assignment/MergeTableRegionsProcedure.java | 12 +- .../assignment/RegionTransitionProcedure.java | 33 +- .../hadoop/hbase/master/locking/LockProcedure.java | 9 - .../AbstractStateMachineNamespaceProcedure.java | 6 +- .../AbstractStateMachineRegionProcedure.java | 9 - .../AbstractStateMachineTableProcedure.java | 8 +- .../master/procedure/CreateNamespaceProcedure.java | 35 +- .../master/procedure/CreateTableProcedure.java | 12 +- .../hbase/master/procedure/InitMetaProcedure.java | 7 +- .../master/procedure/MasterProcedureScheduler.java | 37 +- .../master/procedure/MasterProcedureUtil.java | 2 +- .../hadoop/hbase/master/procedure/PeerQueue.java | 14 - .../hbase/master/procedure/ProcedureSyncWait.java | 4 +- .../hadoop/hbase/master/procedure/Queue.java | 13 +- .../master/replication/AbstractPeerProcedure.java | 14 +- .../resources/hbase-webapps/master/procedures.jsp | 2 +- .../hbase/client/TestGetProcedureResult.java | 2 +- .../master/assignment/TestAssignmentManager.java | 3 +- .../procedure/TestMasterProcedureEvents.java | 2 +- .../hbase/master/procedure/TestProcedureAdmin.java | 2 +- .../hbase/procedure/TestFailedProcCleanup.java | 5 +- .../security/access/TestAccessController.java | 2 +- 35 files changed, 479 insertions(+), 416 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index c036163..5645f89 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -163,8 +163,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { return null; } } - final Procedure pollResult = dequeue(); + pollCalls++; nullPollCalls += (pollResult == null) ? 1 : 0; return pollResult; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java index a9f3e7d..3fc9750 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java @@ -24,8 +24,9 @@ import org.apache.yetus.audience.InterfaceAudience; * Vessel that carries a Procedure and a timeout. */ @InterfaceAudience.Private -class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp> { - public DelayedProcedure(Procedure procedure) { +class DelayedProcedure + extends DelayedUtil.DelayedContainerWithTimestamp> { + public DelayedProcedure(Procedure procedure) { super(procedure, procedure.getTimeoutTimestamp()); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 545bedf..42bb35e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -22,22 +22,23 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.metrics.Counter; import org.apache.hadoop.hbase.metrics.Histogram; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + /** * Base Procedure class responsible for Procedure Metadata; * e.g. state, submittedTime, lastUpdate, stack-indexes, etc. @@ -76,7 +77,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * called multiple times, so again the code must be idempotent. * *

Procedure can be made respect a locking regime. It has acquire/release methods as - * well as an {@link #hasLock(Object)}. The lock implementation is up to the implementor. + * well as an {@link #hasLock()}. The lock implementation is up to the implementor. * If an entity needs to be locked for the life of a procedure -- not just the calls to * execute -- then implementations should say so with the {@link #holdLock(Object)} * method. @@ -122,6 +123,10 @@ public abstract class Procedure implements Comparable implements Comparable + * This is because that when master restarts, we need to restore the lock state for all the + * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the + * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part + * of the initialization!), so we need to split the code into two steps, and when restore, we just + * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock. + * @return true means we need to wait until the environment has been initialized, otherwise true. + */ + protected boolean waitInitialized(TEnvironment env) { + return false; + } + + /** * The user should override this method if they need a lock on an Entity. * A lock can be anything, and it is up to the implementor. The Procedure * Framework will call this method just before it invokes {@link #execute(Object)}. @@ -218,9 +239,6 @@ public abstract class Procedure implements Comparable implements Comparable implements Comparable implements Comparable procedures, - Procedure proc) { + protected static Long getRootProcedureId(final Map> procedures, + Procedure proc) { while (proc.hasParent()) { proc = procedures.get(proc.getParentProcId()); - if (proc == null) return null; + if (proc == null) { + return null; + } } return proc.getProcId(); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index db7c118..1bd2061 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -108,16 +110,16 @@ public class ProcedureExecutor { void procedureFinished(long procId); } - private static class CompletedProcedureRetainer { - private final Procedure procedure; + private static final class CompletedProcedureRetainer { + private final Procedure procedure; private long clientAckTime; - public CompletedProcedureRetainer(Procedure procedure) { + public CompletedProcedureRetainer(Procedure procedure) { this.procedure = procedure; clientAckTime = -1; } - public Procedure getProcedure() { + public Procedure getProcedure() { return procedure; } @@ -172,13 +174,13 @@ public class ProcedureExecutor { private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size"; private static final int DEFAULT_BATCH_SIZE = 32; - private final Map completed; + private final Map> completed; private final Map nonceKeysToProcIdsMap; private final ProcedureStore store; private Configuration conf; - public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store, - final Map completedMap, + public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store, + final Map> completedMap, final Map nonceKeysToProcIdsMap) { // set the timeout interval that triggers the periodic-procedure super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); @@ -205,10 +207,11 @@ public class ProcedureExecutor { int batchCount = 0; final long now = EnvironmentEdgeManager.currentTime(); - final Iterator> it = completed.entrySet().iterator(); + final Iterator>> it = + completed.entrySet().iterator(); while (it.hasNext() && store.isRunning()) { - final Map.Entry entry = it.next(); - final CompletedProcedureRetainer retainer = entry.getValue(); + final Map.Entry> entry = it.next(); + final CompletedProcedureRetainer retainer = entry.getValue(); final Procedure proc = retainer.getProcedure(); // TODO: Select TTL based on Procedure type @@ -240,28 +243,32 @@ public class ProcedureExecutor { * Once a Root-Procedure completes (success or failure), the result will be added to this map. * The user of ProcedureExecutor should call getResult(procId) to get the result. */ - private final ConcurrentHashMap completed = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> completed = + new ConcurrentHashMap<>(); /** * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. * The RootProcedureState contains the execution stack of the Root-Procedure, * It is added to the map by submitProcedure() and removed on procedure completion. */ - private final ConcurrentHashMap rollbackStack = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> rollbackStack = + new ConcurrentHashMap<>(); /** * Helper map to lookup the live procedures by ID. * This map contains every procedure. root-procedures and subprocedures. */ - private final ConcurrentHashMap procedures = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> procedures = + new ConcurrentHashMap<>(); /** - * Helper map to lookup whether the procedure already issued from the same client. - * This map contains every root procedure. + * Helper map to lookup whether the procedure already issued from the same client. This map + * contains every root procedure. */ private final ConcurrentHashMap nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList listeners = + new CopyOnWriteArrayList<>(); private Configuration conf; @@ -287,7 +294,7 @@ public class ProcedureExecutor { * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery * (Should be ok). */ - private TimeoutExecutorThread timeoutExecutor; + private TimeoutExecutorThread timeoutExecutor; private int corePoolSize; private int maxPoolSize; @@ -357,27 +364,68 @@ public class ProcedureExecutor { }); } - private void loadProcedures(final ProcedureIterator procIter, - final boolean abortOnCorruption) throws IOException { - final boolean debugEnabled = LOG.isDebugEnabled(); + private void restoreLock(Procedure proc, Set restored) { + proc.restoreLock(getEnvironment()); + restored.add(proc.getProcId()); + } + + private void restoreLocks(Deque> stack, Set restored) { + while (!stack.isEmpty()) { + restoreLock(stack.pop(), restored); + } + } + + // Restore the locks for all the procedures. + // Notice that we need to restore the locks starting from the root proc, otherwise there will be + // problem that a sub procedure may hold the exclusive lock first and then we are stuck when + // calling the acquireLock method for the parent procedure. + // The algorithm is straight-forward: + // 1. Use a set to record the procedures which locks have already been restored. + // 2. Use a stack to store the hierarchy of the procedures + // 3. For all the procedure, we will first try to find its parent and push it into the stack, + // unless + // a. We have no parent, i.e, we are the root procedure + // b. The lock has already been restored(by checking the set introduced in #1) + // then we start to pop the stack and call acquireLock for each procedure. + // Notice that this should be done for all procedures, not only the ones in runnableList. + private void restoreLocks() { + Set restored = new HashSet<>(); + Deque> stack = new ArrayDeque<>(); + procedures.values().forEach(proc -> { + for (;;) { + if (restored.contains(proc.getProcId())) { + restoreLocks(stack, restored); + return; + } + if (!proc.hasParent()) { + restoreLock(proc, restored); + restoreLocks(stack, restored); + return; + } + stack.push(proc); + proc = procedures.get(proc.getParentProcId()); + } + }); + } + private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) + throws IOException { // 1. Build the rollback stack int runnablesCount = 0; + int failedCount = 0; while (procIter.hasNext()) { boolean finished = procIter.isNextFinished(); - Procedure proc = procIter.next(); + Procedure proc = procIter.next(); NonceKey nonceKey = proc.getNonceKey(); long procId = proc.getProcId(); if (finished) { - completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc)); - if (debugEnabled) { - LOG.debug("Completed " + proc); - } + completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); + LOG.debug("Completed {}", proc); } else { if (!proc.hasParent()) { assert !proc.isFinished() : "unexpected finished procedure"; - rollbackStack.put(proc.getProcId(), new RootProcedureState()); + rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); } // add the procedure to the map @@ -386,6 +434,8 @@ public class ProcedureExecutor { if (proc.getState() == ProcedureState.RUNNABLE) { runnablesCount++; + } else if (proc.getState() == ProcedureState.FAILED) { + failedCount++; } } @@ -396,8 +446,9 @@ public class ProcedureExecutor { } // 2. Initialize the stacks - final ArrayList runnableList = new ArrayList(runnablesCount); - HashSet waitingSet = null; + List> runnableList = new ArrayList<>(runnablesCount); + List> failedList = new ArrayList<>(failedCount); + Set> waitingSet = null; procIter.reset(); while (procIter.hasNext()) { if (procIter.isNextFinished()) { @@ -405,12 +456,10 @@ public class ProcedureExecutor { continue; } - Procedure proc = procIter.next(); + Procedure proc = procIter.next(); assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; - if (debugEnabled) { - LOG.debug(String.format("Loading %s", proc)); - } + LOG.debug("Loading {}", proc); Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { @@ -420,14 +469,14 @@ public class ProcedureExecutor { } if (proc.hasParent()) { - Procedure parent = procedures.get(proc.getParentProcId()); + Procedure parent = procedures.get(proc.getParentProcId()); // corrupted procedures are handled later at step 3 if (parent != null && !proc.isFinished()) { parent.incChildrenLatch(); } } - RootProcedureState procStack = rollbackStack.get(rootProcId); + RootProcedureState procStack = rollbackStack.get(rootProcId); procStack.loadStack(proc); proc.setRootProcId(rootProcId); @@ -447,8 +496,7 @@ public class ProcedureExecutor { waitingSet.add(proc); break; case FAILED: - // add the proc to the scheduler to perform the rollback - scheduler.addBack(proc); + failedList.add(proc); break; case ROLLEDBACK: case INITIALIZING: @@ -462,13 +510,14 @@ public class ProcedureExecutor { // 3. Validate the stacks int corruptedCount = 0; - Iterator> itStack = rollbackStack.entrySet().iterator(); + Iterator>> itStack = + rollbackStack.entrySet().iterator(); while (itStack.hasNext()) { - Map.Entry entry = itStack.next(); - RootProcedureState procStack = entry.getValue(); + Map.Entry> entry = itStack.next(); + RootProcedureState procStack = entry.getValue(); if (procStack.isValid()) continue; - for (Procedure proc: procStack.getSubproceduresStack()) { + for (Procedure proc : procStack.getSubproceduresStack()) { LOG.error("Corrupted " + proc); procedures.remove(proc.getProcId()); runnableList.remove(proc); @@ -484,30 +533,22 @@ public class ProcedureExecutor { // 4. Push the procedures to the timeout executor if (waitingSet != null && !waitingSet.isEmpty()) { - for (Procedure proc: waitingSet) { + for (Procedure proc: waitingSet) { proc.afterReplay(getEnvironment()); timeoutExecutor.add(proc); } } - - // 5. Push the procedure to the scheduler - if (!runnableList.isEmpty()) { - // TODO: See ProcedureWALFormatReader#hasFastStartSupport - // some procedure may be started way before this stuff. - for (int i = runnableList.size() - 1; i >= 0; --i) { - Procedure proc = runnableList.get(i); - proc.afterReplay(getEnvironment()); - if (!proc.hasParent()) { - sendProcedureLoadedNotification(proc.getProcId()); - } - if (proc.wasExecuted()) { - scheduler.addFront(proc); - } else { - // if it was not in execution, it can wait. - scheduler.addBack(proc); - } + // 5. restore locks + restoreLocks(); + // 6. Push the procedure to the scheduler + failedList.forEach(scheduler::addBack); + runnableList.forEach(p -> { + p.afterReplay(getEnvironment()); + if (!p.hasParent()) { + sendProcedureLoadedNotification(p.getProcId()); } - } + scheduler.addBack(p); + }); } /** @@ -529,7 +570,7 @@ public class ProcedureExecutor { corePoolSize, maxPoolSize); this.threadGroup = new ThreadGroup("PEWorkerGroup"); - this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup); + this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); // Create the workers workerId.set(0); @@ -581,7 +622,7 @@ public class ProcedureExecutor { timeoutExecutor.add(new WorkerMonitor()); // Add completed cleaner chore - addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); + addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap)); } public void stop() { @@ -686,7 +727,7 @@ public class ProcedureExecutor { * Add a chore procedure to the executor * @param chore the chore to add */ - public void addChore(final ProcedureInMemoryChore chore) { + public void addChore(ProcedureInMemoryChore chore) { chore.setState(ProcedureState.WAITING_TIMEOUT); timeoutExecutor.add(chore); } @@ -696,7 +737,7 @@ public class ProcedureExecutor { * @param chore the chore to remove * @return whether the chore is removed, or it will be removed later */ - public boolean removeChore(final ProcedureInMemoryChore chore) { + public boolean removeChore(ProcedureInMemoryChore chore) { chore.setState(ProcedureState.SUCCESS); return timeoutExecutor.remove(chore); } @@ -830,17 +871,21 @@ public class ProcedureExecutor { * @param procOwner name of the owner of the procedure, used to inform the user * @param exception the failure to report to the user */ - public void setFailureResultForNonce(final NonceKey nonceKey, final String procName, - final User procOwner, final IOException exception) { - if (nonceKey == null) return; + public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, + IOException exception) { + if (nonceKey == null) { + return; + } - final Long procId = nonceKeysToProcIdsMap.get(nonceKey); - if (procId == null || completed.containsKey(procId)) return; + Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null || completed.containsKey(procId)) { + return; + } - Procedure proc = new FailedProcedure(procId.longValue(), - procName, procOwner, nonceKey, exception); + Procedure proc = + new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); - completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc)); + completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); } // ========================================================================== @@ -851,7 +896,7 @@ public class ProcedureExecutor { * @param proc the new procedure to execute. * @return the procedure id, that can be used to monitor the operation */ - public long submitProcedure(final Procedure proc) { + public long submitProcedure(Procedure proc) { return submitProcedure(proc, null); } @@ -863,7 +908,7 @@ public class ProcedureExecutor { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", justification = "FindBugs is blind to the check-for-null") - public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { + public long submitProcedure(Procedure proc, NonceKey nonceKey) { Preconditions.checkArgument(lastProcId.get() >= 0); prepareProcedure(proc); @@ -883,9 +928,7 @@ public class ProcedureExecutor { // Commit the transaction store.insert(proc, null); - if (LOG.isDebugEnabled()) { - LOG.debug("Stored " + proc); - } + LOG.debug("Stored {}", proc); // Add the procedure to the executor return pushProcedure(proc); @@ -896,7 +939,7 @@ public class ProcedureExecutor { * @param procs the new procedures to execute. */ // TODO: Do we need to take nonces here? - public void submitProcedures(final Procedure[] procs) { + public void submitProcedures(Procedure[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); if (procs == null || procs.length <= 0) { return; @@ -919,7 +962,7 @@ public class ProcedureExecutor { } } - private Procedure prepareProcedure(final Procedure proc) { + private Procedure prepareProcedure(Procedure proc) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { @@ -928,14 +971,14 @@ public class ProcedureExecutor { return proc; } - private long pushProcedure(final Procedure proc) { + private long pushProcedure(Procedure proc) { final long currentProcId = proc.getProcId(); // Update metrics on start of a procedure proc.updateMetricsOnSubmit(getEnvironment()); // Create the rollback stack for the procedure - RootProcedureState stack = new RootProcedureState(); + RootProcedureState stack = new RootProcedureState<>(); rollbackStack.put(currentProcId, stack); // Submit the new subprocedures @@ -952,7 +995,7 @@ public class ProcedureExecutor { * @param procId the procedure to abort * @return true if the procedure exists and has received the abort, otherwise false. */ - public boolean abort(final long procId) { + public boolean abort(long procId) { return abort(procId, true); } @@ -963,8 +1006,8 @@ public class ProcedureExecutor { * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? * @return true if the procedure exists and has received the abort, otherwise false. */ - public boolean abort(final long procId, final boolean mayInterruptIfRunning) { - final Procedure proc = procedures.get(procId); + public boolean abort(long procId, boolean mayInterruptIfRunning) { + Procedure proc = procedures.get(procId); if (proc != null) { if (!mayInterruptIfRunning && proc.wasExecuted()) { return false; @@ -977,20 +1020,20 @@ public class ProcedureExecutor { // ========================================================================== // Executor query helpers // ========================================================================== - public Procedure getProcedure(final long procId) { + public Procedure getProcedure(final long procId) { return procedures.get(procId); } - public T getProcedure(final Class clazz, final long procId) { - final Procedure proc = getProcedure(procId); + public > T getProcedure(Class clazz, long procId) { + Procedure proc = getProcedure(procId); if (clazz.isInstance(proc)) { - return (T)proc; + return clazz.cast(proc); } return null; } - public Procedure getResult(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public Procedure getResult(long procId) { + CompletedProcedureRetainer retainer = completed.get(procId); if (retainer == null) { return null; } else { @@ -1014,8 +1057,8 @@ public class ProcedureExecutor { * @param procId the ID of the procedure to check * @return true if the procedure execution is started, otherwise false. */ - public boolean isStarted(final long procId) { - final Procedure proc = procedures.get(procId); + public boolean isStarted(long procId) { + Procedure proc = procedures.get(procId); if (proc == null) { return completed.get(procId) != null; } @@ -1026,13 +1069,11 @@ public class ProcedureExecutor { * Mark the specified completed procedure, as ready to remove. * @param procId the ID of the procedure to remove */ - public void removeResult(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public void removeResult(long procId) { + CompletedProcedureRetainer retainer = completed.get(procId); if (retainer == null) { assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; - if (LOG.isDebugEnabled()) { - LOG.debug("pid=" + procId + " already removed by the cleaner."); - } + LOG.debug("pid={} already removed by the cleaner.", procId); return; } @@ -1040,8 +1081,8 @@ public class ProcedureExecutor { retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); } - public Procedure getResultOrProcedure(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public Procedure getResultOrProcedure(long procId) { + CompletedProcedureRetainer retainer = completed.get(procId); if (retainer == null) { return procedures.get(procId); } else { @@ -1056,15 +1097,16 @@ public class ProcedureExecutor { * @return true if the user is the owner of the procedure, * false otherwise or the owner is unknown. */ - public boolean isProcedureOwner(final long procId, final User user) { - if (user == null) return false; - - final Procedure runningProc = procedures.get(procId); + public boolean isProcedureOwner(long procId, User user) { + if (user == null) { + return false; + } + final Procedure runningProc = procedures.get(procId); if (runningProc != null) { return runningProc.getOwner().equals(user.getShortName()); } - final CompletedProcedureRetainer retainer = completed.get(procId); + final CompletedProcedureRetainer retainer = completed.get(procId); if (retainer != null) { return retainer.getProcedure().getOwner().equals(user.getShortName()); } @@ -1078,19 +1120,17 @@ public class ProcedureExecutor { * Get procedures. * @return the procedures in a list */ - public List> getProcedures() { - final List> procedureLists = new ArrayList<>(procedures.size() + completed.size()); - for (Procedure procedure : procedures.values()) { - procedureLists.add(procedure); - } + public List> getProcedures() { + List> procedureList = + new ArrayList<>(procedures.size() + completed.size()); + procedureList.addAll(procedures.values()); // Note: The procedure could show up twice in the list with different state, as // it could complete after we walk through procedures list and insert into // procedureList - it is ok, as we will use the information in the Procedure // to figure it out; to prevent this would increase the complexity of the logic. - for (CompletedProcedureRetainer retainer: completed.values()) { - procedureLists.add(retainer.getProcedure()); - } - return procedureLists; + completed.values().stream().map(CompletedProcedureRetainer::getProcedure) + .forEach(procedureList::add); + return procedureList; } // ========================================================================== @@ -1169,14 +1209,14 @@ public class ProcedureExecutor { return procedures.keySet(); } - Long getRootProcedureId(Procedure proc) { + Long getRootProcedureId(Procedure proc) { return Procedure.getRootProcedureId(procedures, proc); } // ========================================================================== // Executions // ========================================================================== - private void executeProcedure(final Procedure proc) { + private void executeProcedure(Procedure proc) { final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback @@ -1185,7 +1225,7 @@ public class ProcedureExecutor { return; } - final RootProcedureState procStack = rollbackStack.get(rootProcId); + RootProcedureState procStack = rollbackStack.get(rootProcId); if (procStack == null) { LOG.warn("RootProcedureState is null for " + proc.getProcId()); return; @@ -1197,7 +1237,7 @@ public class ProcedureExecutor { // we have the 'rollback-lock' we can start rollingback switch (executeRollback(rootProcId, procStack)) { case LOCK_ACQUIRED: - break; + break; case LOCK_YIELD_WAIT: procStack.unsetRollback(); scheduler.yield(proc); @@ -1239,7 +1279,6 @@ public class ProcedureExecutor { switch (lockState) { case LOCK_ACQUIRED: execProcedure(procStack, proc); - releaseLock(proc, false); break; case LOCK_YIELD_WAIT: LOG.info(lockState + " " + proc); @@ -1254,12 +1293,6 @@ public class ProcedureExecutor { } procStack.release(proc); - // allows to kill the executor before something is stored to the wal. - // useful to test the procedure recovery. - if (testing != null && !isRunning()) { - break; - } - if (proc.isSuccess()) { // update metrics on finishing the procedure proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); @@ -1275,33 +1308,31 @@ public class ProcedureExecutor { } while (procStack.isFailed()); } - private LockState acquireLock(final Procedure proc) { - final TEnvironment env = getEnvironment(); - // hasLock() is used in conjunction with holdLock(). - // This allows us to not rewrite or carry around the hasLock() flag - // for every procedure. the hasLock() have meaning only if holdLock() is true. - if (proc.holdLock(env) && proc.hasLock(env)) { + private LockState acquireLock(Procedure proc) { + TEnvironment env = getEnvironment(); + // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if + // hasLock is true. + if (proc.hasLock()) { return LockState.LOCK_ACQUIRED; } - return proc.doAcquireLock(env); + return proc.doAcquireLock(env, store); } - private void releaseLock(final Procedure proc, final boolean force) { - final TEnvironment env = getEnvironment(); + private void releaseLock(Procedure proc, boolean force) { + TEnvironment env = getEnvironment(); // For how the framework works, we know that we will always have the lock // when we call releaseLock(), so we can avoid calling proc.hasLock() - if (force || !proc.holdLock(env)) { - proc.doReleaseLock(env); + if (force || !proc.holdLock(env) || proc.isFinished()) { + proc.doReleaseLock(env, store); } } /** - * Execute the rollback of the full procedure stack. - * Once the procedure is rolledback, the root-procedure will be visible as - * finished to user, and the result will be the fatal exception. + * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the + * root-procedure will be visible as finished to user, and the result will be the fatal exception. */ - private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { - final Procedure rootProc = procedures.get(rootProcId); + private LockState executeRollback(long rootProcId, RootProcedureState procStack) { + Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are // rolling back because the subprocedure does. Clarify. @@ -1311,13 +1342,13 @@ public class ProcedureExecutor { store.update(rootProc); } - final List subprocStack = procStack.getSubproceduresStack(); + List> subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); boolean reuseLock = false; while (stackTail --> 0) { - final Procedure proc = subprocStack.get(stackTail); + Procedure proc = subprocStack.get(stackTail); LockState lockState; if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) { @@ -1334,7 +1365,7 @@ public class ProcedureExecutor { // (e.g. StateMachineProcedure reuse the same instance) // we can avoid to lock/unlock each step reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback; - if (!reuseLock) { + if (!reuseLock && proc.hasLock()) { releaseLock(proc, false); } @@ -1368,13 +1399,11 @@ public class ProcedureExecutor { * It updates the store with the new state (stack index) * or will remove completly the procedure in case it is a child. */ - private LockState executeRollback(final Procedure proc) { + private LockState executeRollback(Procedure proc) { try { proc.doRollback(getEnvironment()); } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Roll back attempt failed for " + proc, e); - } + LOG.debug("Roll back attempt failed for {}", proc, e); return LockState.LOCK_YIELD_WAIT; } catch (InterruptedException e) { handleInterruptedException(proc, e); @@ -1387,9 +1416,10 @@ public class ProcedureExecutor { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. if (testing != null && testing.shouldKillBeforeStoreUpdate()) { - LOG.debug("TESTING: Kill before store update"); + String msg = "TESTING: Kill before store update"; + LOG.debug(msg); stop(); - return LockState.LOCK_YIELD_WAIT; + throw new RuntimeException(msg); } if (proc.removeStackIndex()) { @@ -1416,6 +1446,11 @@ public class ProcedureExecutor { return LockState.LOCK_ACQUIRED; } + private void yieldProcedure(Procedure proc) { + releaseLock(proc, false); + scheduler.yield(proc); + } + /** * Executes procedure *

    @@ -1445,10 +1480,10 @@ public class ProcedureExecutor { * *
*/ - private void execProcedure(final RootProcedureState procStack, - final Procedure procedure) { + private void execProcedure(RootProcedureState procStack, + Procedure procedure) { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, - procedure.toString()); + procedure.toString()); // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. // The exception is caught below and then we hurry to the exit without disturbing state. The @@ -1475,22 +1510,16 @@ public class ProcedureExecutor { subprocs = null; } } catch (ProcedureSuspendedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Suspend " + procedure); - } + LOG.trace("Suspend {}", procedure); suspended = true; } catch (ProcedureYieldException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Yield " + procedure + ": " + e.getMessage(), e); - } - scheduler.yield(procedure); + LOG.trace("Yield {}", procedure, e); + yieldProcedure(procedure); return; } catch (InterruptedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e); - } + LOG.trace("Yield interrupt {}", procedure, e); handleInterruptedException(procedure, e); - scheduler.yield(procedure); + yieldProcedure(procedure); return; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... @@ -1506,9 +1535,7 @@ public class ProcedureExecutor { // i.e. we go around this loop again rather than go back out on the scheduler queue. subprocs = null; reExecute = true; - if (LOG.isTraceEnabled()) { - LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId()); - } + LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); } else { // Yield the current procedure, and make the subprocedure runnable // subprocs may come back 'null'. @@ -1519,9 +1546,7 @@ public class ProcedureExecutor { collect(Collectors.toList()).toString())); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - if (LOG.isTraceEnabled()) { - LOG.trace("Added to timeoutExecutor " + procedure); - } + LOG.trace("Added to timeoutExecutor {}", procedure); timeoutExecutor.add(procedure); } else if (!suspended) { // No subtask, so we are done @@ -1535,9 +1560,10 @@ public class ProcedureExecutor { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) { - LOG.debug("TESTING: Kill before store update: " + procedure); + String msg = "TESTING: Kill before store update: " + procedure; + LOG.debug(msg); stop(); - return; + throw new RuntimeException(msg); } // TODO: The code here doesn't check if store is running before persisting to the store as @@ -1551,11 +1577,13 @@ public class ProcedureExecutor { updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting - if (!store.isRunning()) return; + if (!store.isRunning()) { + return; + } // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { - scheduler.yield(procedure); + yieldProcedure(procedure); return; } @@ -1566,6 +1594,11 @@ public class ProcedureExecutor { submitChildrenProcedures(subprocs); } + // we need to log the release lock operation before waking up the parent procedure, as there + // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all + // the sub procedures from store and cause problems... + releaseLock(procedure, false); + // if the procedure is complete and has a parent, count down the children latch. // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. if (!suspended && procedure.isFinished() && procedure.hasParent()) { @@ -1573,12 +1606,12 @@ public class ProcedureExecutor { } } - private Procedure[] initializeChildren(final RootProcedureState procStack, - final Procedure procedure, final Procedure[] subprocs) { + private Procedure[] initializeChildren(RootProcedureState procStack, + Procedure procedure, Procedure[] subprocs) { assert subprocs != null : "expected subprocedures"; final long rootProcId = getRootProcedureId(procedure); for (int i = 0; i < subprocs.length; ++i) { - final Procedure subproc = subprocs[i]; + Procedure subproc = subprocs[i]; if (subproc == null) { String msg = "subproc[" + i + "] is null, aborting the procedure"; procedure.setFailure(new RemoteProcedureException(msg, @@ -1609,9 +1642,9 @@ public class ProcedureExecutor { return subprocs; } - private void submitChildrenProcedures(final Procedure[] subprocs) { + private void submitChildrenProcedures(Procedure[] subprocs) { for (int i = 0; i < subprocs.length; ++i) { - final Procedure subproc = subprocs[i]; + Procedure subproc = subprocs[i]; subproc.updateMetricsOnSubmit(getEnvironment()); assert !procedures.containsKey(subproc.getProcId()); procedures.put(subproc.getProcId(), subproc); @@ -1619,8 +1652,9 @@ public class ProcedureExecutor { } } - private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) { - final Procedure parent = procedures.get(procedure.getParentProcId()); + private void countDownChildren(RootProcedureState procStack, + Procedure procedure) { + Procedure parent = procedures.get(procedure.getParentProcId()); if (parent == null) { assert procStack.isRollingback(); return; @@ -1637,17 +1671,15 @@ public class ProcedureExecutor { } } - private void updateStoreOnExec(final RootProcedureState procStack, - final Procedure procedure, final Procedure[] subprocs) { + private void updateStoreOnExec(RootProcedureState procStack, + Procedure procedure, Procedure[] subprocs) { if (subprocs != null && !procedure.isFailed()) { if (LOG.isTraceEnabled()) { LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); } store.insert(procedure, subprocs); } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Store update " + procedure); - } + LOG.trace("Store update {}", procedure); if (procedure.isFinished() && !procedure.hasParent()) { // remove child procedures final long[] childProcIds = procStack.getSubprocedureIds(); @@ -1665,11 +1697,8 @@ public class ProcedureExecutor { } } - private void handleInterruptedException(final Procedure proc, final InterruptedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e); - } - + private void handleInterruptedException(Procedure proc, InterruptedException e) { + LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); // NOTE: We don't call Thread.currentThread().interrupt() // because otherwise all the subsequent calls e.g. Thread.sleep() will throw // the InterruptedException. If the master is going down, we will be notified @@ -1677,9 +1706,13 @@ public class ProcedureExecutor { // (The interrupted procedure will be retried on the next run) } - private void execCompletionCleanup(final Procedure proc) { + private void execCompletionCleanup(Procedure proc) { final TEnvironment env = getEnvironment(); - if (proc.holdLock(env) && proc.hasLock(env)) { + if (proc.hasLock()) { + LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + + " is finished, even if the holdLock is true, arrive here means we have some holes where" + + " we do not release the lock. And the releaseLock below may fail since the procedure may" + + " have already been deleted from the procedure store."); releaseLock(proc, true); } try { @@ -1690,11 +1723,11 @@ public class ProcedureExecutor { } } - private void procedureFinished(final Procedure proc) { + private void procedureFinished(Procedure proc) { // call the procedure completion cleanup handler execCompletionCleanup(proc); - CompletedProcedureRetainer retainer = new CompletedProcedureRetainer(proc); + CompletedProcedureRetainer retainer = new CompletedProcedureRetainer<>(proc); // update the executor internal state maps if (!proc.shouldWaitClientAck(getEnvironment())) { @@ -1710,14 +1743,14 @@ public class ProcedureExecutor { scheduler.completionCleanup(proc); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e); + LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); } // Notify the listeners sendProcedureFinishedNotification(proc.getProcId()); } - RootProcedureState getProcStack(long rootProcId) { + RootProcedureState getProcStack(long rootProcId) { return rollbackStack.get(rootProcId); } @@ -1726,7 +1759,7 @@ public class ProcedureExecutor { // ========================================================================== private class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); - private volatile Procedure activeProcedure; + private volatile Procedure activeProcedure; public WorkerThread(ThreadGroup group) { this(group, "PEWorker-"); @@ -1747,7 +1780,7 @@ public class ProcedureExecutor { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { - Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (proc == null) { continue; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index c42dfc4..1215008 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -202,6 +202,9 @@ public final class ProcedureUtil { builder.setNonce(proc.getNonceKey().getNonce()); } + if (proc.hasLock()) { + builder.setLocked(true); + } return builder.build(); } @@ -255,6 +258,10 @@ public final class ProcedureUtil { proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); } + if (proto.getLocked()) { + proc.lockedWhenLoading(); + } + ProcedureStateSerializer serializer = null; if (proto.getStateMessageCount() > 0) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java index 46185ea..2fc0030 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java @@ -22,11 +22,9 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; /** @@ -42,8 +40,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu */ @InterfaceAudience.Private @InterfaceStability.Evolving -class RootProcedureState { - private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class); +class RootProcedureState { private enum State { RUNNING, // The Procedure is running or ready to run @@ -51,8 +48,8 @@ class RootProcedureState { ROLLINGBACK, // The Procedure failed and the execution was rolledback } - private Set subprocs = null; - private ArrayList subprocStack = null; + private Set> subprocs = null; + private ArrayList> subprocStack = null; private State state = State.RUNNING; private int running = 0; @@ -91,22 +88,19 @@ class RootProcedureState { } protected synchronized long[] getSubprocedureIds() { - if (subprocs == null) return null; - int index = 0; - final long[] subIds = new long[subprocs.size()]; - for (Procedure proc: subprocs) { - subIds[index++] = proc.getProcId(); + if (subprocs == null) { + return null; } - return subIds; + return subprocs.stream().mapToLong(Procedure::getProcId).toArray(); } - protected synchronized List getSubproceduresStack() { + protected synchronized List> getSubproceduresStack() { return subprocStack; } protected synchronized RemoteProcedureException getException() { if (subprocStack != null) { - for (Procedure proc: subprocStack) { + for (Procedure proc: subprocStack) { if (proc.hasException()) { return proc.getException(); } @@ -118,8 +112,10 @@ class RootProcedureState { /** * Called by the ProcedureExecutor to mark the procedure step as running. */ - protected synchronized boolean acquire(final Procedure proc) { - if (state != State.RUNNING) return false; + protected synchronized boolean acquire(Procedure proc) { + if (state != State.RUNNING) { + return false; + } running++; return true; @@ -128,7 +124,7 @@ class RootProcedureState { /** * Called by the ProcedureExecutor to mark the procedure step as finished. */ - protected synchronized void release(final Procedure proc) { + protected synchronized void release(Procedure proc) { running--; } @@ -142,7 +138,7 @@ class RootProcedureState { * Called by the ProcedureExecutor after the procedure step is completed, * to add the step to the rollback list (or procedure stack) */ - protected synchronized void addRollbackStep(final Procedure proc) { + protected synchronized void addRollbackStep(Procedure proc) { if (proc.isFailed()) { state = State.FAILED; } @@ -153,8 +149,10 @@ class RootProcedureState { subprocStack.add(proc); } - protected synchronized void addSubProcedure(final Procedure proc) { - if (!proc.hasParent()) return; + protected synchronized void addSubProcedure(Procedure proc) { + if (!proc.hasParent()) { + return; + } if (subprocs == null) { subprocs = new HashSet<>(); } @@ -168,7 +166,7 @@ class RootProcedureState { * to the store only the Procedure we executed, and nothing else. * on load we recreate the full stack by aggregating each procedure stack-positions. */ - protected synchronized void loadStack(final Procedure proc) { + protected synchronized void loadStack(Procedure proc) { addSubProcedure(proc); int[] stackIndexes = proc.getStackIndexes(); if (stackIndexes != null) { @@ -196,7 +194,7 @@ class RootProcedureState { */ protected synchronized boolean isValid() { if (subprocStack != null) { - for (Procedure proc: subprocStack) { + for (Procedure proc : subprocStack) { if (proc == null) { return false; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index e5e3230..49c9dfc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -31,15 +31,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * @see InlineChore */ @InterfaceAudience.Private -class TimeoutExecutorThread extends StoppableThread { +class TimeoutExecutorThread extends StoppableThread { private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class); - private final ProcedureExecutor executor; + private final ProcedureExecutor executor; private final DelayQueue queue = new DelayQueue<>(); - public TimeoutExecutorThread(ProcedureExecutor executor, ThreadGroup group) { + public TimeoutExecutorThread(ProcedureExecutor executor, ThreadGroup group) { super(group, "ProcExecTimeout"); setDaemon(true); this.executor = executor; @@ -65,7 +65,7 @@ class TimeoutExecutorThread extends StoppableThread { if (task instanceof InlineChore) { execInlineChore((InlineChore) task); } else if (task instanceof DelayedProcedure) { - execDelayedProcedure((DelayedProcedure) task); + execDelayedProcedure((DelayedProcedure) task); } else { LOG.error("CODE-BUG unknown timeout task type {}", task); } @@ -77,15 +77,15 @@ class TimeoutExecutorThread extends StoppableThread { queue.add(chore); } - public void add(Procedure procedure) { + public void add(Procedure procedure) { assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), procedure.getTimeoutTimestamp()); - queue.add(new DelayedProcedure(procedure)); + queue.add(new DelayedProcedure<>(procedure)); } - public boolean remove(Procedure procedure) { - return queue.remove(new DelayedProcedure(procedure)); + public boolean remove(Procedure procedure) { + return queue.remove(new DelayedProcedure<>(procedure)); } private void execInlineChore(InlineChore chore) { @@ -93,13 +93,13 @@ class TimeoutExecutorThread extends StoppableThread { add(chore); } - private void execDelayedProcedure(DelayedProcedure delayed) { + private void execDelayedProcedure(DelayedProcedure delayed) { // TODO: treat this as a normal procedure, add it to the scheduler and // let one of the workers handle it. // Today we consider ProcedureInMemoryChore as InlineChores - Procedure procedure = delayed.getObject(); + Procedure procedure = delayed.getObject(); if (procedure instanceof ProcedureInMemoryChore) { - executeInMemoryChore((ProcedureInMemoryChore) procedure); + executeInMemoryChore((ProcedureInMemoryChore) procedure); // if the procedure is in a waiting state again, put it back in the queue procedure.updateTimestamp(); if (procedure.isWaiting()) { @@ -111,7 +111,7 @@ class TimeoutExecutorThread extends StoppableThread { } } - private void executeInMemoryChore(ProcedureInMemoryChore chore) { + private void executeInMemoryChore(ProcedureInMemoryChore chore) { if (!chore.isWaiting()) { return; } @@ -126,7 +126,7 @@ class TimeoutExecutorThread extends StoppableThread { } } - private void executeTimedoutProcedure(Procedure proc) { + private void executeTimedoutProcedure(Procedure proc) { // The procedure received a timeout. if the procedure itself does not handle it, // call abort() and add the procedure back in the queue for rollback. if (proc.setTimeoutFailure(executor.getEnvironment())) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 319ddb2..2bbd53d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -42,7 +43,12 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; -@Category({MasterTests.class, LargeTests.class}) +/** + * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so + * we should use lock to obtain the correct order. Ignored. + */ +@Ignore +@Category({ MasterTests.class, LargeTests.class }) public class TestProcedureReplayOrder { @ClassRule diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index a9e919c..c1c9187 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -227,7 +227,6 @@ public class TestProcedureSuspended { protected void releaseLock(final TestProcEnv env) { LOG.info("RELEASE LOCK " + this + " " + hasLock); lock.set(false); - hasLock = false; } @Override @@ -235,11 +234,6 @@ public class TestProcedureSuspended { return true; } - @Override - protected boolean hasLock(final TestProcEnv env) { - return hasLock; - } - public ArrayList getTimestamps() { return timestamps; } diff --git a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto index 2c5f1aa..b4a3107 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto @@ -63,6 +63,9 @@ message Procedure { // Nonce to prevent same procedure submit by multiple times optional uint64 nonce_group = 13 [default = 0]; optional uint64 nonce = 14 [default = 0]; + + // whether the procedure has held the lock + optional bool locked = 16 [default = false]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java index 7ad5b56..5af7614 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java @@ -81,8 +81,8 @@ class ClusterSchemaServiceImpl extends AbstractService implements ClusterSchemaS return this.tableNamespaceManager; } - private long submitProcedure(final Procedure procedure, final NonceKey nonceKey) - throws ServiceNotRunningException { + private long submitProcedure(final Procedure procedure, + final NonceKey nonceKey) throws ServiceNotRunningException { checkIsRunning(); ProcedureExecutor pe = this.masterServices.getMasterProcedureExecutor(); return pe.submitProcedure(procedure, nonceKey); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f1bec35..b7148d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -923,7 +923,7 @@ public class HMaster extends HRegionServer implements MasterServices { InitMetaProcedure initMetaProc = null; if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO) .isOffline()) { - Optional> optProc = procedureExecutor.getProcedures().stream() + Optional> optProc = procedureExecutor.getProcedures().stream() .filter(p -> p instanceof InitMetaProcedure).findAny(); if (optProc.isPresent()) { initMetaProc = (InitMetaProcedure) optProc.get(); @@ -3202,7 +3202,8 @@ public class HMaster extends HRegionServer implements MasterServices { cpHost.preGetProcedures(); } - final List> procList = this.procedureExecutor.getProcedures(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + List> procList = (List) this.procedureExecutor.getProcedures(); if (cpHost != null) { cpHost.postGetProcedures(procList); @@ -3717,7 +3718,7 @@ public class HMaster extends HRegionServer implements MasterServices { HashMap>> replicationLoadSourceMap = new HashMap<>(peerList.size()); peerList.stream() - .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList())); + .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>())); for (ServerName serverName : serverNames) { List replicationLoadSources = getServerManager().getLoad(serverName).getReplicationLoadSourceList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java index bbb27e1..0b6e45b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/GCRegionProcedure.java @@ -148,9 +148,4 @@ public class GCRegionProcedure extends AbstractStateMachineRegionProcedure { private static final Logger LOG = LoggerFactory.getLogger(MergeTableRegionsProcedure.class); private Boolean traceEnabled; - private volatile boolean lock = false; private ServerName regionLocation; private RegionInfo[] regionsToMerge; private RegionInfo mergedRegion; @@ -414,24 +413,20 @@ public class MergeTableRegionsProcedure @Override protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; if (env.getProcedureScheduler().waitRegions(this, getTableName(), mergedRegion, regionsToMerge[0], regionsToMerge[1])) { try { LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks()); } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + // Ignore, just for logging } return LockState.LOCK_EVENT_WAIT; } - this.lock = true; return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - this.lock = false; env.getProcedureScheduler().wakeRegions(this, getTableName(), mergedRegion, regionsToMerge[0], regionsToMerge[1]); } @@ -442,11 +437,6 @@ public class MergeTableRegionsProcedure } @Override - protected boolean hasLock(MasterProcedureEnv env) { - return this.lock; - } - - @Override public TableName getTableName() { return mergedRegion.getTable(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index c3b2458..4054778 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -34,14 +34,16 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + /** * Base class for the Assign and Unassign Procedure. * @@ -114,8 +116,6 @@ public abstract class RegionTransitionProcedure */ private int attempt; - private volatile boolean lock = false; - // Required by the Procedure framework to create the procedure on replay public RegionTransitionProcedure() {} @@ -419,15 +419,17 @@ public abstract class RegionTransitionProcedure } @Override - protected LockState acquireLock(final MasterProcedureEnv env) { + protected boolean waitInitialized(MasterProcedureEnv env) { // Unless we are assigning meta, wait for meta to be available and loaded. - if (!isMeta()) { - AssignmentManager am = env.getAssignmentManager(); - if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) { - return LockState.LOCK_EVENT_WAIT; - } + if (isMeta()) { + return false; } + AssignmentManager am = env.getAssignmentManager(); + return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo); + } + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { // TODO: Revisit this and move it to the executor if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) { try { @@ -438,14 +440,12 @@ public abstract class RegionTransitionProcedure } return LockState.LOCK_EVENT_WAIT; } - this.lock = true; return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { env.getProcedureScheduler().wakeRegion(this, getRegionInfo()); - lock = false; } @Override @@ -454,11 +454,6 @@ public abstract class RegionTransitionProcedure } @Override - protected boolean hasLock(final MasterProcedureEnv env) { - return lock; - } - - @Override protected boolean shouldWaitClientAck(MasterProcedureEnv env) { // The operation is triggered internally on the server // the client does not know about this procedure. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index b4c55f4..3a87bbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -76,8 +76,6 @@ public final class LockProcedure extends Procedure private String description; // True when recovery of master lock from WALs private boolean recoveredMasterLock; - // this is for internal working - private boolean hasLock; private final ProcedureEvent event = new ProcedureEvent<>(this); // True if this proc acquired relevant locks. This value is for client checks. @@ -306,7 +304,6 @@ public final class LockProcedure extends Procedure protected LockState acquireLock(final MasterProcedureEnv env) { boolean ret = lock.acquireLock(env); locked.set(ret); - hasLock = ret; if (ret) { if (LOG.isDebugEnabled()) { LOG.debug("LOCKED " + toString()); @@ -321,7 +318,6 @@ public final class LockProcedure extends Procedure @Override protected void releaseLock(final MasterProcedureEnv env) { lock.releaseLock(env); - hasLock = false; } /** @@ -423,11 +419,6 @@ public final class LockProcedure extends Procedure return true; } - @Override - public boolean hasLock(final MasterProcedureEnv env) { - return hasLock; - } - /////////////////////// // LOCK IMPLEMENTATIONS /////////////////////// diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java index 574706a..341d116 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java @@ -66,8 +66,12 @@ public abstract class AbstractStateMachineNamespaceProcedure } @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) { return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java index e711ca0..3b5e3b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; public abstract class AbstractStateMachineRegionProcedure extends AbstractStateMachineTableProcedure { private RegionInfo hri; - private volatile boolean lock = false; public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env, final RegionInfo hri) { @@ -100,25 +99,17 @@ public abstract class AbstractStateMachineRegionProcedure @Override protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) { return LockState.LOCK_EVENT_WAIT; } - this.lock = true; return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(final MasterProcedureEnv env) { - this.lock = false; env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion()); } - @Override - protected boolean hasLock(final MasterProcedureEnv env) { - return this.lock; - } - protected void setFailure(Throwable cause) { super.setFailure(getClass().getSimpleName(), cause); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 1af2445..50a0149 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -89,10 +89,12 @@ public abstract class AbstractStateMachineTableProcedure } @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) { - return LockState.LOCK_EVENT_WAIT; - } if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java index c63f420..2f56e83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java @@ -131,7 +131,7 @@ public class CreateNamespaceProcedure @Override protected CreateNamespaceState getState(final int stateId) { - return CreateNamespaceState.valueOf(stateId); + return CreateNamespaceState.forNumber(stateId); } @Override @@ -171,15 +171,18 @@ public class CreateNamespaceProcedure } @Override - protected LockState acquireLock(final MasterProcedureEnv env) { - if (!env.getMasterServices().isInitialized()) { - // Namespace manager might not be ready if master is not fully initialized, - // return false to reject user namespace creation; return true for default - // and system namespace creation (this is part of master initialization). - if (!isBootstrapNamespace() && env.waitInitialized(this)) { - return LockState.LOCK_EVENT_WAIT; - } + protected boolean waitInitialized(MasterProcedureEnv env) { + // Namespace manager might not be ready if master is not fully initialized, + // return false to reject user namespace creation; return true for default + // and system namespace creation (this is part of master initialization). + if (isBootstrapNamespace()) { + return false; } + return env.waitInitialized(this); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) { return LockState.LOCK_EVENT_WAIT; } @@ -263,20 +266,6 @@ public class CreateNamespaceProcedure } } - /** - * remove quota for the namespace if exists - * @param env MasterProcedureEnv - * @throws IOException - **/ - private void rollbackSetNamespaceQuota(final MasterProcedureEnv env) throws IOException { - try { - DeleteNamespaceProcedure.removeNamespaceQuota(env, nsDescriptor.getName()); - } catch (Exception e) { - // Ignore exception - LOG.debug("Rollback of setNamespaceQuota throws exception: " + e); - } - } - private static TableNamespaceManager getTableNamespaceManager(final MasterProcedureEnv env) { return env.getMasterServices().getClusterSchema().getTableNamespaceManager(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index acee1af..faad3dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -220,10 +220,16 @@ public class CreateTableProcedure } @Override - protected LockState acquireLock(final MasterProcedureEnv env) { - if (!getTableName().isSystemTable() && env.waitInitialized(this)) { - return LockState.LOCK_EVENT_WAIT; + protected boolean waitInitialized(MasterProcedureEnv env) { + if (getTableName().isSystemTable()) { + // Creating system table is part of the initialization, so do not wait here. + return false; } + return super.waitInitialized(env); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java index 4736d65..d984632 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java @@ -63,8 +63,13 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure> void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { - if (!queue.getLockStatus().hasExclusiveLock() || - queue.getLockStatus().isLockOwner(proc.getProcId())) { - // if the queue was not remove for an xlock execution - // or the proc is the lock owner, put the queue back into execution + if (!queue.getLockStatus().hasExclusiveLock()) { + // if the queue was not remove for an xlock execution,put the queue back into execution queue.add(proc, addFront); addToRunQueue(fairq, queue); - } else if (queue.getLockStatus().hasParentLock(proc)) { - // always add it to front as its parent has the xlock - // usually the addFront is true if we arrive here as we will call addFront for adding sub - // proc, but sometimes we may retry on the proc which means we will arrive here through yield, - // so it is possible the addFront here is false. + } else if (queue.getLockStatus().hasLockAccess(proc)) { + // always add it to front as the have the lock access. queue.add(proc, true); - // our (proc) parent has the xlock, - // so the queue is not in the fairq (run-queue) - // add it back to let the child run (inherit the lock) addToRunQueue(fairq, queue); } else { queue.add(proc, addFront); @@ -386,9 +378,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { if (proc != null) { priority = MasterProcedureUtil.getServerPriority(proc); } else { - LOG.warn("Usually this should not happen as proc can only be null when calling from " + - "wait/wake lock, which means at least we should have one procedure in the queue which " + - "wants to acquire the lock or just released the lock."); priority = 1; } node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); @@ -848,9 +837,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue lock = locking.getServerLock(serverName); if (lock.tryExclusiveLock(procedure)) { - // We do not need to create a new queue so just pass null, as in tests we may pass - // procedures other than ServerProcedureInterface - removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null)); + // In tests we may pass procedures other than ServerProcedureInterface, just pass null if + // so. + removeFromRunQueue(serverRunQueue, + getServerQueue(serverName, + procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure + : null)); return false; } waitProcedure(lock, procedure); @@ -873,9 +865,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue lock = locking.getServerLock(serverName); // Only SCP will acquire/release server lock so do not need to check the return value here. lock.releaseExclusiveLock(procedure); - // We do not need to create a new queue so just pass null, as in tests we may pass procedures - // other than ServerProcedureInterface - addToRunQueue(serverRunQueue, getServerQueue(serverName, null)); + // In tests we may pass procedures other than ServerProcedureInterface, just pass null if + // so. + addToRunQueue(serverRunQueue, + getServerQueue(serverName, + procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure + : null)); int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index 587cc82..58263d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -99,7 +99,7 @@ public final class MasterProcedureUtil { protected abstract void run() throws IOException; protected abstract String getDescription(); - protected long submitProcedure(final Procedure proc) { + protected long submitProcedure(final Procedure proc) { assert procId == null : "submitProcedure() was already called, running procId=" + procId; procId = getProcedureExecutor().submitProcedure(proc, nonceKey); return procId; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java index 86d8e43..0e80e2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java @@ -30,20 +30,6 @@ class PeerQueue extends Queue { } @Override - public boolean isAvailable() { - if (isEmpty()) { - return false; - } - if (getLockStatus().hasExclusiveLock()) { - // if we have an exclusive lock already taken - // only child of the lock owner can be executed - Procedure nextProc = peek(); - return nextProc != null && getLockStatus().hasLockAccess(nextProc); - } - return true; - } - - @Override public boolean requireExclusiveLock(Procedure proc) { return requirePeerExclusiveLock((PeerProcedureInterface) proc); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java index df0875e..328ac00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java @@ -106,7 +106,7 @@ public final class ProcedureSyncWait { } public static Future submitProcedure(final ProcedureExecutor procExec, - final Procedure proc) { + final Procedure proc) { if (proc.isInitializing()) { procExec.submitProcedure(proc); } @@ -114,7 +114,7 @@ public final class ProcedureSyncWait { } public static byte[] submitAndWaitProcedure(ProcedureExecutor procExec, - final Procedure proc) throws IOException { + final Procedure proc) throws IOException { if (proc.isInitializing()) { procExec.submitProcedure(proc); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java index f7bea2a..43e66d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java @@ -63,7 +63,18 @@ abstract class Queue> extends AvlLinkedNode nextProc = peek(); + return nextProc != null && getLockStatus().hasLockAccess(nextProc); + } + return true; } // ====================================================================== diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index 458e073..e133a65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -36,8 +36,6 @@ public abstract class AbstractPeerProcedure protected String peerId; - private volatile boolean locked; - // used to keep compatible with old client where we can only returns after updateStorage. protected ProcedurePrepareLatch latch; @@ -59,17 +57,20 @@ public abstract class AbstractPeerProcedure } @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override protected LockState acquireLock(MasterProcedureEnv env) { if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) { return LockState.LOCK_EVENT_WAIT; } - locked = true; return LockState.LOCK_ACQUIRED; } @Override protected void releaseLock(MasterProcedureEnv env) { - locked = false; env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId); } @@ -79,11 +80,6 @@ public abstract class AbstractPeerProcedure } @Override - protected boolean hasLock(MasterProcedureEnv env) { - return locked; - } - - @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { super.serializeStateData(serializer); serializer.serialize(PeerProcedureStateData.newBuilder().setPeerId(peerId).build()); diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp index 4e546cd..f617237 100644 --- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp @@ -46,7 +46,7 @@ long millisFromLastRoll = walStore.getMillisFromLastRoll(); ArrayList procedureWALFiles = walStore.getActiveLogs(); Set corruptedWALFiles = walStore.getCorruptedLogs(); - List> procedures = procExecutor.getProcedures(); + List> procedures = procExecutor.getProcedures(); Collections.sort(procedures, new Comparator() { @Override public int compare(Procedure lhs, Procedure rhs) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java index 8b1584f..4186594 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestGetProcedureResult.java @@ -124,7 +124,7 @@ public class TestGetProcedureResult { @Test public void testRace() throws Exception { - ProcedureExecutor executor = + ProcedureExecutor executor = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); DummyProcedure p = new DummyProcedure(); long procId = executor.submitProcedure(p); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 08ecb81..443bbab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -434,7 +435,7 @@ public class TestAssignmentManager { am.wakeMetaLoadedEvent(); } - private Future submitProcedure(final Procedure proc) { + private Future submitProcedure(final Procedure proc) { return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index 9a0e2f6..a56e842 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -111,7 +111,7 @@ public class TestMasterProcedureEvents { } private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event, - final Procedure proc) throws Exception { + final Procedure proc) throws Exception { final ProcedureExecutor procExec = master.getMasterProcedureExecutor(); final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java index c003379..02f0257 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureAdmin.java @@ -207,7 +207,7 @@ public class TestProcedureAdmin { // Wait for one step to complete ProcedureTestingUtility.waitProcedure(procExec, procId); - List> procedures = procExec.getProcedures(); + List> procedures = procExec.getProcedures(); assertTrue(procedures.size() >= 1); boolean found = false; for (Procedure proc: procedures) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java index 3e21951..1402bbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestFailedProcCleanup.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -88,7 +89,7 @@ public class TestFailedProcCleanup { LOG.debug("Ignoring exception: ", e); Thread.sleep(evictionDelay * 3); } - List> procedureInfos = + List> procedureInfos = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures(); for (Procedure procedureInfo : procedureInfos) { if (procedureInfo.getProcName().equals("CreateTableProcedure") @@ -109,7 +110,7 @@ public class TestFailedProcCleanup { LOG.debug("Ignoring exception: ", e); Thread.sleep(evictionDelay * 3); } - List> procedureInfos = + List> procedureInfos = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures(); for (Procedure procedureInfo : procedureInfos) { if (procedureInfo.getProcName().equals("CreateTableProcedure") diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index a0b5d9d..163c2ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -587,7 +587,7 @@ public class TestAccessController extends SecureTestUtil { Procedure proc = new TestTableDDLProcedure(procExec.getEnvironment(), tableName); proc.setOwner(USER_OWNER); procExec.submitProcedure(proc); - final List> procList = procExec.getProcedures(); + final List> procList = procExec.getProcedures(); AccessTestAction getProceduresAction = new AccessTestAction() { @Override -- 2.7.4