diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 5645f89..7ab1329 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -86,6 +86,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } @Override + public void addFront(final Procedure procedure, boolean notify) { + push(procedure, true, notify); + } + + @Override public void addFront(Iterator procedureIterator) { schedLock(); try { @@ -109,6 +114,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { push(procedure, false, true); } + @Override + public void addBack(final Procedure procedure, boolean notify) { + push(procedure, false, notify); + } + protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { schedLock(); try { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index a1391a5..a271d8f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -720,6 +720,10 @@ public abstract class Procedure implements Comparable { if (!p.hasParent()) { sendProcedureLoadedNotification(p.getProcId()); } - scheduler.addBack(p); + // If the procedure holds the lock, put the procedure in front + if (p.isLockedWhenLoading()) { + scheduler.addFront(p, false); + } else { + // if it was not, it can wait. + scheduler.addBack(p, false); + } }); + // After all procedures put into the queue, signal the worker threads. + // Otherwise, there is a race condition. See HBASE-21364. + scheduler.signalAll(); } /** diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index e7e1cdb..adbb10b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -53,6 +53,12 @@ public interface ProcedureScheduler { void addFront(Procedure proc); /** + * Inserts the specified element at the front of this queue. + * @param proc the Procedure to add + */ + void addFront(Procedure proc, boolean notify); + + /** * Inserts all elements in the iterator at the front of this queue. */ void addFront(Iterator procedureIterator); @@ -64,6 +70,12 @@ public interface ProcedureScheduler { void addBack(Procedure proc); /** + * Inserts the specified element at the end of this queue. + * @param proc the Procedure to add + */ + void addBack(Procedure proc, boolean notify); + + /** * The procedure can't run at the moment. * add it back to the queue, giving priority to someone else. * @param proc the Procedure to add back to the list diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index cdd964b..46c5d44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -81,7 +81,9 @@ public class MasterProcedureEnv implements ConfigurationObserver { public MasterProcedureEnv(final MasterServices master, final RSProcedureDispatcher remoteDispatcher) { this.master = master; - this.procSched = new MasterProcedureScheduler(); + boolean verbose = master.getConfiguration() + .getBoolean("hbase.master.procedure.scheduler.verbose", false); + this.procSched = new MasterProcedureScheduler(verbose); this.remoteDispatcher = remoteDispatcher; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 51cbc3f..4d70084 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -111,6 +111,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private final SchemaLocking locking = new SchemaLocking(); + private boolean verbose = false; + + public MasterProcedureScheduler() { + + } + + public MasterProcedureScheduler(boolean verbose) { + this.verbose = verbose; + } + @Override public void yield(final Procedure proc) { push(proc, isTableProcedure(proc), true); @@ -140,11 +150,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { if (!queue.getLockStatus().hasExclusiveLock()) { // if the queue was not remove for an xlock execution,put the queue back into execution queue.add(proc, addFront); - addToRunQueue(fairq, queue); + addToRunQueue(fairq, queue, proc, "queue doesn't have exclusivelock"); } else if (queue.getLockStatus().hasLockAccess(proc)) { // always add it to front as the have the lock access. queue.add(proc, true); - addToRunQueue(fairq, queue); + addToRunQueue(fairq, queue, proc, "procedure has lock access"); } else { queue.add(proc, addFront); } @@ -175,6 +185,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private > Procedure doPoll(final FairQueue fairq) { final Queue rq = fairq.poll(); if (rq == null || !rq.isAvailable()) { + if (verbose && rq != null) { + LOG.debug("polled from {} but it is not available", rq); + } return null; } @@ -185,20 +198,30 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final boolean xlockReq = rq.requireExclusiveLock(pollResult); if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) { // someone is already holding the lock (e.g. shared lock). avoid a yield - removeFromRunQueue(fairq, rq); + removeFromRunQueue(fairq, rq, pollResult, + "exclusive lock not acquired: " + rq.getLockStatus()); return null; } rq.poll(); if (rq.isEmpty() || xlockReq) { - removeFromRunQueue(fairq, rq); + String why = ""; + if (rq.isEmpty()) { + why += "runqueue is empty "; + } + if (xlockReq) { + why += "xlock is required for next procedure"; + } + removeFromRunQueue(fairq, rq, pollResult, why); } else if (rq.getLockStatus().hasParentLock(pollResult)) { // if the rq is in the fairq because of runnable child // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state Procedure nextProc = rq.peek(); if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { - removeFromRunQueue(fairq, rq); + removeFromRunQueue(fairq, rq, pollResult, + " parent holds the lock and the next one is not the child of the parent, next procid=" + + nextProc.getProcId()); } } @@ -255,7 +278,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { while (treeMap != null) { Queue node = AvlTree.getFirst(treeMap); treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); - if (fairq != null) removeFromRunQueue(fairq, node); + if (fairq != null) { + removeFromRunQueue(fairq, node, null, "clear is called"); + } } } @@ -309,16 +334,25 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private static > void addToRunQueue(FairQueue fairq, Queue queue) { + private > void addToRunQueue( + FairQueue fairq, Queue queue, final Procedure proc, String why) { if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { fairq.add(queue); + if (verbose) { + LOG.debug("Adding {} to runqueue, triggered by {}, since {}", + queue.getKey(), proc, why); + } } } - private static > void removeFromRunQueue( - FairQueue fairq, Queue queue) { + private > void removeFromRunQueue( + FairQueue fairq, Queue queue, final Procedure proc, String why) { if (AvlIterableList.isLinked(queue)) { fairq.remove(queue); + if (verbose) { + LOG.debug("Remove {} from runqueue, triggered by {}, since {}", + queue.getKey(), proc, why); + } } } @@ -387,7 +421,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { LockAndQueue lock = locking.getServerLock(serverName); if (node.isEmpty() && lock.tryExclusiveLock(proc)) { - removeFromRunQueue(serverRunQueue, node); + removeFromRunQueue(serverRunQueue, node, proc, "tryCleanupServerQueue is called"); removeServerQueue(serverName); } } finally { @@ -472,7 +506,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); return true; } - removeFromRunQueue(tableRunQueue, getTableQueue(table)); + removeFromRunQueue(tableRunQueue, getTableQueue(table), procedure, + "need to wait TableExclusiveLock, lockstatus=" + tableLock); return false; } finally { schedUnlock(); @@ -496,7 +531,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { if (namespaceLock.releaseSharedLock()) { waitingCount += wakeWaitingProcedures(namespaceLock); } - addToRunQueue(tableRunQueue, getTableQueue(table)); + addToRunQueue(tableRunQueue, getTableQueue(table), procedure, + "table's exclusive lock is available"); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -548,7 +584,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; if (tableLock.releaseSharedLock()) { - addToRunQueue(tableRunQueue, getTableQueue(table)); + addToRunQueue(tableRunQueue, getTableQueue(table), procedure, + "table's shard lock is available"); waitingCount += wakeWaitingProcedures(tableLock); } if (namespaceLock.releaseSharedLock()) { @@ -756,7 +793,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { waitingCount += wakeWaitingProcedures(namespaceLock); } if (systemNamespaceTableLock.releaseSharedLock()) { - addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME)); + addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME), procedure, + "namespace exclusive lock is available"); waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); } wakePollIfNeeded(waitingCount); @@ -785,7 +823,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { removeFromRunQueue(serverRunQueue, getServerQueue(serverName, procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure - : null)); + : null), procedure, + "need to wait ServerExclusiveLock, ownerid=" + lock + .getExclusiveLockProcIdOwner()); return false; } waitProcedure(lock, procedure); @@ -813,7 +853,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { addToRunQueue(serverRunQueue, getServerQueue(serverName, procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure - : null)); + : null), procedure, "Server's exclusive lock is available"); int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { @@ -838,7 +878,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue lock = locking.getMetaLock(); if (lock.tryExclusiveLock(procedure)) { - removeFromRunQueue(metaRunQueue, getMetaQueue()); + removeFromRunQueue(metaRunQueue, getMetaQueue(), procedure, + "need to wait MetaExclusiveLock, lockStatus=" + lock); return false; } waitProcedure(lock, procedure); @@ -862,7 +903,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue lock = locking.getMetaLock(); lock.releaseExclusiveLock(procedure); - addToRunQueue(metaRunQueue, getMetaQueue()); + addToRunQueue(metaRunQueue, getMetaQueue(), procedure, + "meta's exclusive lock is available"); int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java index 43e66d0..4eb3390 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java @@ -118,9 +118,10 @@ abstract class Queue> extends AvlLinkedNode 0 ? peek() : "null")); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java new file mode 100644 index 0000000..206d33d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.DummyRegionProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + + +@Category({MasterTests.class, SmallTests.class}) +public class TestMasterProcedureSchedulerOnRestart { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule + .forClass(TestMasterProcedureSchedulerOnRestart.class); + + private static final Logger LOG = LoggerFactory + .getLogger(TestMasterProcedureSchedulerOnRestart.class); + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final TableName tablename = TableName.valueOf("test:TestProcedureScheduler"); + private static RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tablename).build(); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + + private static WALProcedureStore procStore; + + private static ProcedureExecutor procExecutor; + + private static HBaseCommonTestingUtility htu; + + private static MasterProcedureEnv masterProcedureEnv; + + + private static FileSystem fs; + private static Path testDir; + private static Path logDir; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + procExecutor = UTIL.getMiniHBaseCluster().getMaster() + .getMasterProcedureExecutor(); + } + + @Test + public void testScheduler() throws Exception { + // Add a region procedure, but stuck there + long regionProc = procExecutor.submitProcedure(new DummyRegionProcedure( + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor() + .getEnvironment(), regionInfo)); + WALProcedureStore walProcedureStore = (WALProcedureStore) procExecutor.getStore(); + // Roll the wal + walProcedureStore.rollWriterForTesting(); + Thread.sleep(500); + // Submit a table procedure + procExecutor.submitProcedure(new DummyTableProcedure( + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor() + .getEnvironment(), tablename)); + // Restart the procExecutor + ProcedureTestingUtility.restart(procExecutor); + while (procExecutor.getProcedure(regionProc) == null) { + Thread.sleep(500); + } + DummyRegionProcedure dummyRegionProcedure = (DummyRegionProcedure) procExecutor + .getProcedure(regionProc); + // Resume the region procedure + dummyRegionProcedure.resume(); + // The region procedure should finish normally + UTIL.waitFor(5000, () -> dummyRegionProcedure.isFinished()); + + } + + public static class DummyTableProcedure extends + AbstractStateMachineTableProcedure { + + private TableName tableName; + + public DummyTableProcedure() { + super(); + } + public DummyTableProcedure(final MasterProcedureEnv env, TableName tableName) { + super(null, null); + this.tableName = tableName; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.CREATE; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + DummyRegionTableState dummyRegionTableState) + throws ProcedureSuspendedException, ProcedureYieldException, + InterruptedException { + return null; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + DummyRegionTableState dummyRegionTableState) + throws IOException, InterruptedException { + + } + + @Override + protected DummyRegionTableState getState(int stateId) { + return DummyRegionTableState.STATE; + } + + @Override + protected int getStateId(DummyRegionTableState dummyRegionTableState) { + return 0; + } + + @Override + protected DummyRegionTableState getInitialState() { + return DummyRegionTableState.STATE; + } + + @Override + protected Procedure[] execute(final MasterProcedureEnv env) + throws ProcedureSuspendedException { + LOG.info("Finished execute"); + return null; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + super.serializeStateData(serializer); + serializer.serialize(ProtobufUtil.toProtoTableName(tableName)); + + + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + super.deserializeStateData(serializer); + tableName = ProtobufUtil + .toTableName(serializer.deserialize(HBaseProtos.TableName.class)); + + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + return super.acquireLock(env); + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + super.releaseLock(env); + } + } + + public enum DummyRegionTableState { + STATE + } + + +}