From 0563ab59404478f24c2d18432e709436946d5a10 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 25 Oct 2018 16:30:21 +0800 Subject: [PATCH] HBASE-21375 Revisit the lock and queue implementation in MasterProcedureScheduler --- .../hadoop/hbase/procedure2/Procedure.java | 12 +- .../hbase/procedure2/ProcedureExecutor.java | 7 +- .../procedure2/ProcedureTestingUtility.java | 2 +- .../procedure/MasterProcedureScheduler.java | 19 +- .../procedure/TestSchedulerQueueDeadLock.java | 269 ++++++++++++++++++ 5 files changed, 297 insertions(+), 12 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index a85ccb1292..ece4412338 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -711,12 +711,20 @@ public abstract class Procedure implements Comparable + * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in + * front of a queue. + */ public boolean isLockedWhenLoading() { return lockedWhenLoading; } @@ -990,7 +998,7 @@ public abstract class Procedure implements Comparable { proc.afterReplay(getEnvironment()); } }); + // 4. restore locks + restoreLocks(); - // 4. Push the procedures to the timeout executor + // 5. Push the procedures to the timeout executor waitingTimeoutList.forEach(proc -> { proc.afterReplay(getEnvironment()); timeoutExecutor.add(proc); }); - // 5. restore locks - restoreLocks(); + // 6. 
Push the procedure to the scheduler failedList.forEach(scheduler::addBack); runnableList.forEach(p -> { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 95e032043f..5a7a664c00 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -403,7 +403,7 @@ public class ProcedureTestingUtility { public NoopProcedure() {} @Override - protected Procedure[] execute(TEnv env) + protected Procedure[] execute(TEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 5ee5f49684..7d910c1f4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -117,7 +117,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override public void yield(final Procedure proc) { - push(proc, isTableProcedure(proc), true); + push(proc, false, true); } @Override @@ -141,14 +141,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private > void doAdd(final FairQueue fairq, - final Queue queue, final Procedure proc, final boolean addFront) { - if (!queue.getLockStatus().hasExclusiveLock()) { - // if the queue was not remove for an xlock execution,put the queue back into execution + private > void doAdd(final FairQueue fairq, final Queue queue, + final Procedure proc, final boolean addFront) { + if (proc.hasLock() 
|| proc.isLockedWhenLoading()) { + // the procedure already holds the lock, which means it can be executed immediately, then + // add it to front, and put the queue back into execution + // this could happen when restarting, as now we will restore the locks before putting them + // into the scheduler. + queue.add(proc, true); + addToRunQueue(fairq, queue); + } else if (!queue.getLockStatus().hasExclusiveLock()) { + // if the queue was not removed for an xlock execution, put the queue back into execution queue.add(proc, addFront); addToRunQueue(fairq, queue); } else if (queue.getLockStatus().hasLockAccess(proc)) { - // always add it to front as the have the lock access. + // always add it to front as we have the lock access. queue.add(proc, true); addToRunQueue(fairq, queue); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java new file mode 100644 index 0000000000..bc288dfda6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.concurrent.Semaphore; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.IdLock; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestSchedulerQueueDeadLock { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSchedulerQueueDeadLock.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("deadlock"); + + private static final class TestEnv { + private final MasterProcedureScheduler scheduler; + + public TestEnv(MasterProcedureScheduler scheduler) { + this.scheduler = scheduler; + } + + public MasterProcedureScheduler getScheduler() { + return scheduler; + } + } + + public static class TableSharedProcedure extends NoopProcedure + 
implements TableProcedureInterface { + + private final Semaphore latch = new Semaphore(0); + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + latch.acquire(); + return null; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.READ; + } + } + + public static class TableExclusiveProcedure extends NoopProcedure + implements TableProcedureInterface { + + private final Semaphore latch = new Semaphore(0); + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + latch.acquire(); + return null; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableExclusiveLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.EDIT; + } + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + UTIL.cleanupTestDir(); + } + + private WALProcedureStore procStore; + + private 
ProcedureExecutor procExec; + + @Rule + public final TestName name = new TestName(); + + @Before + public void setUp() throws IOException { + UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000); + procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), + UTIL.getDataTestDir(name.getMethodName())); + procStore.start(1); + MasterProcedureScheduler scheduler = new MasterProcedureScheduler(); + procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore, + scheduler); + procExec.init(1, false); + } + + @After + public void tearDown() { + procExec.stop(); + procStore.stop(false); + } + + public static final class TableSharedProcedureWithId extends TableSharedProcedure { + + @Override + protected void setProcId(long procId) { + // this is a hack to make this procedure be loaded after the procedure below as we will sort + // the procedures by id when loading. + super.setProcId(2L); + } + } + + public static final class TableExclusiveProcedureWithId extends TableExclusiveProcedure { + + @Override + protected void setProcId(long procId) { + // this is a hack to make this procedure be loaded before the procedure above as we will + // sort the procedures by id when loading. + super.setProcId(1L); + } + } + + @Test + public void testTableProcedureDeadLockAfterRestarting() throws Exception { + // let the shared procedure run first, but let it have a greater procId so when loading it will + // be loaded at last. 
+ long procId1 = procExec.submitProcedure(new TableSharedProcedureWithId()); + long procId2 = procExec.submitProcedure(new TableExclusiveProcedureWithId()); + procExec.startWorkers(); + UTIL.waitFor(10000, + () -> ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.hasQueuedThreads()); + + ProcedureTestingUtility.restart(procExec); + + ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.release(); + ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release(); + + UTIL.waitFor(10000, () -> procExec.isFinished(procId1)); + UTIL.waitFor(10000, () -> procExec.isFinished(procId2)); + } + + public static final class TableShardParentProcedure extends NoopProcedure + implements TableProcedureInterface { + + @Override + protected Procedure[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return new Procedure[] { new TableSharedProcedure() }; + } + + @Override + protected LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.READ; + } + } + + @Test + public void testTableProcedureSubProcedureDeadLock() throws Exception { + long procId1 = procExec.submitProcedure(new TableShardParentProcedure()); + long procId2 = procExec.submitProcedure(new TableExclusiveProcedure()); + procExec.startWorkers(); + UTIL.waitFor(10000, + () -> procExec.getProcedures().stream().anyMatch(p -> p instanceof TableSharedProcedure)); + procExec.getProcedures().stream().filter(p -> p instanceof 
TableSharedProcedure) + .map(p -> (TableSharedProcedure) p).forEach(p -> p.latch.release()); + + UTIL.waitFor(10000, () -> procExec.isFinished(procId1)); + UTIL.waitFor(10000, () -> procExec.isFinished(procId2)); + } +} -- 2.17.1