From 4a32a3ca19f67a58e652153c84d64aa4dde04d0c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 25 Oct 2018 21:46:23 +0800 Subject: [PATCH] HBASE-21375 Revisit the lock and queue implementation in MasterProcedureScheduler --- .../hadoop/hbase/procedure2/LockAndQueue.java | 31 +- .../hadoop/hbase/procedure2/LockStatus.java | 41 ++- .../hadoop/hbase/procedure2/Procedure.java | 12 +- .../hbase/procedure2/ProcedureExecutor.java | 28 +- .../procedure2/ProcedureTestingUtility.java | 2 +- .../hbase/procedure2/TestLockAndQueue.java | 68 +++++ .../master/procedure/MasterProcedureEnv.java | 3 +- .../procedure/MasterProcedureScheduler.java | 85 +++--- .../hbase/master/procedure/SchemaLocking.java | 11 +- ...ocedureSchedulerPerformanceEvaluation.java | 2 +- .../TestMasterProcedureScheduler.java | 2 +- ...stMasterProcedureSchedulerConcurrency.java | 2 +- .../procedure/TestSchedulerQueueDeadLock.java | 276 ++++++++++++++++++ 13 files changed, 482 insertions(+), 81 deletions(-) create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java index 4365a2c195..21d95218a4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure2; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.yetus.audience.InterfaceAudience; @@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public class LockAndQueue implements LockStatus { + + private final Function> procedureRetriever; private final ProcedureDeque queue = new ProcedureDeque(); private Procedure exclusiveLockOwnerProcedure = null; private int sharedLock = 0; @@ -56,6 +59,10 @@ public class LockAndQueue implements LockStatus { // Lock Status // ====================================================================== + public LockAndQueue(Function> procedureRetriever) { + this.procedureRetriever = procedureRetriever; + } + @Override public boolean isLocked() { return hasExclusiveLock() || sharedLock > 0; @@ -72,16 +79,28 @@ public class LockAndQueue implements LockStatus { } @Override - public boolean hasParentLock(Procedure proc) { - // TODO: need to check all the ancestors. need to passed in the procedures - // to find the ancestors. - return proc.hasParent() && - (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + public boolean hasAncestorLock(Procedure proc) { + if (!proc.hasParent()) { + return false; + } + // fast path to test root procedure + if (isLockOwner(proc.getRootProcId())) { + return true; + } + for (Procedure p = proc;;) { + if (isLockOwner(p.getParentProcId())) { + return true; + } + p = procedureRetriever.apply(p.getParentProcId()); + if (p == null || !p.hasParent()) { + return false; + } + } } @Override public boolean hasLockAccess(Procedure proc) { - return isLockOwner(proc.getProcId()) || hasParentLock(proc); + return isLockOwner(proc.getProcId()) || hasAncestorLock(proc); } @Override diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java index 031b8bb54a..925ae5b26e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -21,25 +21,58 @@ package org.apache.hadoop.hbase.procedure2; import org.apache.yetus.audience.InterfaceAudience; /** - * Interface to get status of a Lock without getting access to acquire/release lock. - * Currently used in MasterProcedureScheduler where we want to give Queues access to lock's - * status for scheduling purposes, but not the ability to acquire/release it. + * Interface to get status of a Lock without getting access to acquire/release lock. Currently used + * in MasterProcedureScheduler where we want to give Queues access to lock's status for scheduling + * purposes, but not the ability to acquire/release it. */ @InterfaceAudience.Private public interface LockStatus { + + /** + * Return whether this lock has already been held, + *

+ * Notice that, holding the exclusive lock or shared lock are both considered as locked, i.e, this + * method usually equals to {@code hasExclusiveLock() || getSharedLockCount() > 0}. + */ boolean isLocked(); + /** + * Whether the exclusive lock has been held. + */ boolean hasExclusiveLock(); + /** + * Return true if the exclusive lock has been held by the given procedure. + */ boolean isLockOwner(long procId); - boolean hasParentLock(Procedure proc); + /** + * Return true if any ancestors of the give procedure hold the exclusive lock. + */ + boolean hasAncestorLock(Procedure proc); + /** + * Return true if the procedure itself holds the exclusive lock, or any ancestors of the give + * procedure hold the exclusive lock, i.e, this method usually equals to + * {@code isLockOwner(proc.getProcId() || hasAncestorLock(proc}. + */ boolean hasLockAccess(Procedure proc); + /** + * Get the procedure which holds the exclusive lock. + */ Procedure getExclusiveLockOwnerProcedure(); + /** + * Return the id of the procedure which holds the exclusive lock, if exists. Or a negative value + * which means no one holds the exclusive lock. + *

+ * Notice that, in HBase, we assume that the procedure id is positive, or at least non-negative. + */ long getExclusiveLockProcIdOwner(); + /** + * Get the number of procedures which hold the shared lock. + */ int getSharedLockCount(); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index a85ccb1292..ece4412338 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -711,12 +711,20 @@ public abstract class Procedure implements Comparable + * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in + * front of a queue. + */ public boolean isLockedWhenLoading() { return lockedWhenLoading; } @@ -990,7 +998,7 @@ public abstract class Procedure implements Comparable { proc.afterReplay(getEnvironment()); } }); + // 4. restore locks + restoreLocks(); - // 4. Push the procedures to the timeout executor + // 5. Push the procedures to the timeout executor waitingTimeoutList.forEach(proc -> { proc.afterReplay(getEnvironment()); timeoutExecutor.add(proc); }); - // 5. restore locks - restoreLocks(); + // 6. Push the procedure to the scheduler failedList.forEach(scheduler::addBack); runnableList.forEach(p -> { @@ -652,26 +653,7 @@ public class ProcedureExecutor { if (!p.hasParent()) { sendProcedureLoadedNotification(p.getProcId()); } - // If the procedure holds the lock, put the procedure in front - // If its parent holds the lock, put the procedure in front - // TODO. Is that possible that its ancestor holds the lock? - // For now, the deepest procedure hierarchy is: - // ModifyTableProcedure -> ReopenTableProcedure -> - // MoveTableProcedure -> Unassign/AssignProcedure - // But ModifyTableProcedure and ReopenTableProcedure won't hold the lock - // So, check parent lock is enough(a tricky case is resovled by HBASE-21384). - // If some one change or add new procedures making 'grandpa' procedure - // holds the lock, but parent procedure don't hold the lock, there will - // be a problem here. We have to check one procedure's ancestors. - // And we need to change LockAndQueue.hasParentLock(Procedure proc) method - // to check all ancestors too. - if (p.isLockedWhenLoading() || (p.hasParent() && procedures - .get(p.getParentProcId()).isLockedWhenLoading())) { - scheduler.addFront(p, false); - } else { - // if it was not, it can wait. - scheduler.addBack(p, false); - } + scheduler.addBack(p); }); // After all procedures put into the queue, signal the worker threads. // Otherwise, there is a race condition. See HBASE-21364. diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 95e032043f..5a7a664c00 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -403,7 +403,7 @@ public class ProcedureTestingUtility { public NoopProcedure() {} @Override - protected Procedure[] execute(TEnv env) + protected Procedure[] execute(TEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { return null; } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java new file mode 100644 index 0000000000..9b10bc64d4 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestLockAndQueue { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestLockAndQueue.class); + + @Test + public void testHasAncestorLock() { + Map> procMap = new HashMap<>(); + for (long i = 1; i <= 10; i++) { + NoopProcedure proc = new NoopProcedure<>(); + proc.setProcId(i); + if (i > 1) { + proc.setParentProcId(i - 1); + proc.setRootProcId(1); + } + procMap.put(i, proc); + } + LockAndQueue laq = new LockAndQueue(procMap::get); + for (long i = 1; i <= 10; i++) { + assertFalse(laq.hasAncestorLock(procMap.get(i))); + } + for (long i = 1; i <= 10; i++) { + NoopProcedure procHasLock = procMap.get(i); + laq.tryExclusiveLock(procHasLock); + for (long j = 1; j <= i; j++) { + assertFalse(laq.hasAncestorLock(procMap.get(j))); + } + for (long j = i + 1; j <= 10; j++) { + assertTrue(laq.hasAncestorLock(procMap.get(j))); + } + laq.releaseExclusiveLock(procHasLock); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index cd402344c9..4fcf7e0c64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -83,7 +83,8 @@ public class MasterProcedureEnv implements ConfigurationObserver { public MasterProcedureEnv(final MasterServices master, final RSProcedureDispatcher remoteDispatcher) { this.master = master; - this.procSched = new MasterProcedureScheduler(); + this.procSched = new MasterProcedureScheduler( + procId -> master.getMasterProcedureExecutor().getProcedure(procId)); this.remoteDispatcher = remoteDispatcher; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 5ee5f49684..6e15acb60f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.function.Function; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; @@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.LockAndQueue; +import org.apache.hadoop.hbase.procedure2.LockStatus; import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.LockedResourceType; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -113,11 +115,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private PeerQueue peerMap = null; private MetaQueue metaMap = null; - private final SchemaLocking locking = new SchemaLocking(); + private final SchemaLocking locking; + + public MasterProcedureScheduler(Function> procedureRetriever) { + locking = new SchemaLocking(procedureRetriever); + } @Override public void yield(final Procedure proc) { - push(proc, isTableProcedure(proc), true); + push(proc, false, true); } @Override @@ -141,18 +147,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private > void doAdd(final FairQueue fairq, - final Queue queue, final Procedure proc, final boolean addFront) { - if (!queue.getLockStatus().hasExclusiveLock()) { - // if the queue was not remove for an xlock execution,put the queue back into execution - queue.add(proc, addFront); - addToRunQueue(fairq, queue); - } else if (queue.getLockStatus().hasLockAccess(proc)) { - // always add it to front as the have the lock access. - queue.add(proc, true); + private > void doAdd(FairQueue fairq, Queue queue, + Procedure proc, boolean addFront) { + queue.add(proc, addFront); + // For the following conditions, we will put the queue back into execution + // 1. The procedure has already held the lock, or the lock has been restored when restarting, + // which means it can be executed immediately. + // 2. The exclusive lock for this queue has not been held. + // 3. The given procedure has the exclusive lock permission for this queue. + if (proc.hasLock() || proc.isLockedWhenLoading() || !queue.getLockStatus().hasExclusiveLock() || + queue.getLockStatus().hasLockAccess(proc)) { addToRunQueue(fairq, queue); - } else { - queue.add(proc, addFront); } } @@ -181,38 +186,40 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return pollResult; } - private > Procedure doPoll(final FairQueue fairq) { - final Queue rq = fairq.poll(); - if (rq == null || !rq.isAvailable()) { - return null; + private > boolean isLockReady(Procedure proc, Queue rq) { + LockStatus s = rq.getLockStatus(); + // if we have the lock access, we are ready + if (s.hasLockAccess(proc)) { + return true; } + boolean xlockReq = rq.requireExclusiveLock(proc); + // if we need to hold the xlock, then we need to make sure that no one holds any lock, including + // the shared lock, otherwise, we just need to make sure that no one holds the xlock + return xlockReq ? !s.isLocked() : !s.hasExclusiveLock(); + } - final Procedure pollResult = rq.peek(); - if (pollResult == null) { - return null; - } - final boolean xlockReq = rq.requireExclusiveLock(pollResult); - if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) { - // someone is already holding the lock (e.g. shared lock). avoid a yield - removeFromRunQueue(fairq, rq); + private > Procedure doPoll(final FairQueue fairq) { + Queue rq = fairq.poll(); + if (rq == null || !rq.isAvailable()) { return null; } - - rq.poll(); - if (rq.isEmpty() || xlockReq) { - removeFromRunQueue(fairq, rq); - } else if (rq.getLockStatus().hasParentLock(pollResult)) { - // if the rq is in the fairq because of runnable child - // check if the next procedure is still a child. - // if not, remove the rq from the fairq and go back to the xlock state - Procedure nextProc = rq.peek(); - if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult) - && nextProc.getRootProcId() != pollResult.getRootProcId()) { - removeFromRunQueue(fairq, rq); + // loop until we find out a procedure which is ready to run, or if we have checked all the + // procedures, then we give up and remove the queue from run queue. + for (int i = 0, n = rq.size(); i < n; i++) { + Procedure proc = rq.poll(); + if (isLockReady(proc, rq)) { + // the queue is empty, remove from run queue + if (rq.isEmpty()) { + removeFromRunQueue(fairq, rq); + } + return proc; } + // we are not ready to run, add back and try the next procedure + rq.add(proc, false); } - - return pollResult; + // no procedure is ready for execution, remove from run queue + removeFromRunQueue(fairq, rq); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index afd9185675..70e7c592ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -45,18 +45,25 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; */ @InterfaceAudience.Private class SchemaLocking { + + private final Function> procedureRetriever; private final Map serverLocks = new HashMap<>(); private final Map namespaceLocks = new HashMap<>(); private final Map tableLocks = new HashMap<>(); // Single map for all regions irrespective of tables. Key is encoded region name. private final Map regionLocks = new HashMap<>(); private final Map peerLocks = new HashMap<>(); - private final LockAndQueue metaLock = new LockAndQueue(); + private final LockAndQueue metaLock; + + public SchemaLocking(Function> procedureRetriever) { + this.procedureRetriever = procedureRetriever; + this.metaLock = new LockAndQueue(procedureRetriever); + } private LockAndQueue getLock(Map map, T key) { LockAndQueue lock = map.get(key); if (lock == null) { - lock = new LockAndQueue(); + lock = new LockAndQueue(procedureRetriever); map.put(key, lock); } return lock; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java index 767f30fe28..ae874d500e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java @@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase @Override protected int doWork() throws Exception { - procedureScheduler = new MasterProcedureScheduler(); + procedureScheduler = new MasterProcedureScheduler(pid -> null); procedureScheduler.start(); setupOperations(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 957c5839a0..b4aedb65d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -71,7 +71,7 @@ public class TestMasterProcedureScheduler { @Before public void setUp() throws IOException { - queue = new MasterProcedureScheduler(); + queue = new MasterProcedureScheduler(pid -> null); queue.start(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 1313cdba85..7a43f75573 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -54,7 +54,7 @@ public class TestMasterProcedureSchedulerConcurrency { @Before public void setUp() throws IOException { - queue = new MasterProcedureScheduler(); + queue = new MasterProcedureScheduler(pid -> null); queue.start(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java new file mode 100644 index 0000000000..5fc08b5414 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.concurrent.Semaphore; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestSchedulerQueueDeadLock { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSchedulerQueueDeadLock.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("deadlock"); + + private static final class TestEnv { + private final MasterProcedureScheduler scheduler; + + public TestEnv(MasterProcedureScheduler scheduler) { + this.scheduler = scheduler; + } + + public MasterProcedureScheduler getScheduler() { + return scheduler; + } + } + + public static class TableSharedProcedure extends NoopProcedure + implements TableProcedureInterface { + + private final Semaphore latch = new Semaphore(0); + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + latch.acquire(); + return null; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.READ; + } + } + + public static class TableExclusiveProcedure extends NoopProcedure + implements TableProcedureInterface { + + private final Semaphore latch = new Semaphore(0); + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + latch.acquire(); + return null; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableExclusiveLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.EDIT; + } + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + UTIL.cleanupTestDir(); + } + + private WALProcedureStore procStore; + + private ProcedureExecutor procExec; + + @Rule + public final TestName name = new TestName(); + + @Before + public void setUp() throws IOException { + UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000); + procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), + UTIL.getDataTestDir(name.getMethodName())); + procStore.start(1); + MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null); + procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore, + scheduler); + procExec.init(1, false); + } + + @After + public void tearDown() { + procExec.stop(); + procStore.stop(false); + } + + public static final class TableSharedProcedureWithId extends TableSharedProcedure { + + @Override + protected void setProcId(long procId) { + // this is a hack to make this procedure be loaded after the procedure below as we will sort + // the procedures by id when loading. + super.setProcId(2L); + } + } + + public static final class TableExclusiveProcedureWithId extends TableExclusiveProcedure { + + @Override + protected void setProcId(long procId) { + // this is a hack to make this procedure be loaded before the procedure above as we will + // sort the procedures by id when loading. + super.setProcId(1L); + } + } + + @Test + public void testTableProcedureDeadLockAfterRestarting() throws Exception { + // let the shared procedure run first, but let it have a greater procId so when loading it will + // be loaded at last. + long procId1 = procExec.submitProcedure(new TableSharedProcedureWithId()); + long procId2 = procExec.submitProcedure(new TableExclusiveProcedureWithId()); + procExec.startWorkers(); + UTIL.waitFor(10000, + () -> ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.hasQueuedThreads()); + + ProcedureTestingUtility.restart(procExec); + + ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.release(); + ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release(); + + UTIL.waitFor(10000, () -> procExec.isFinished(procId1)); + UTIL.waitFor(10000, () -> procExec.isFinished(procId2)); + } + + public static final class TableShardParentProcedure extends NoopProcedure + implements TableProcedureInterface { + + private boolean scheduled; + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (!scheduled) { + scheduled = true; + return new Procedure[] { new TableSharedProcedure() }; + } + return null; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.READ; + } + } + + @Test + public void testTableProcedureSubProcedureDeadLock() throws Exception { + // the shared procedure will also schedule a shared procedure, but after the exclusive procedure + long procId1 = procExec.submitProcedure(new TableShardParentProcedure()); + long procId2 = procExec.submitProcedure(new TableExclusiveProcedure()); + procExec.startWorkers(); + UTIL.waitFor(10000, + () -> procExec.getProcedures().stream().anyMatch(p -> p instanceof TableSharedProcedure)); + procExec.getProcedures().stream().filter(p -> p instanceof TableSharedProcedure) + .map(p -> (TableSharedProcedure) p).forEach(p -> p.latch.release()); + ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release(); + + UTIL.waitFor(10000, () -> procExec.isFinished(procId1)); + UTIL.waitFor(10000, () -> procExec.isFinished(procId2)); + } +} -- 2.17.1