From c7edfb2e10d51c6b6a216f5673e89b4e382b97b4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 9 Jul 2018 22:03:03 +0800 Subject: [PATCH] HBASE-20847 The parent procedure of RegionTransitionProcedure may not have the table lock --- .../procedure2/AbstractProcedureScheduler.java | 20 +++--- .../hadoop/hbase/procedure2/LockAndQueue.java | 83 +++++++++++++++++----- .../apache/hadoop/hbase/procedure2/LockStatus.java | 4 +- .../master/procedure/MasterProcedureScheduler.java | 55 +++++++------- .../hbase/master/procedure/SchemaLocking.java | 9 +-- 5 files changed, 103 insertions(+), 68 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index fbfa5b2..c036163 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -253,22 +253,16 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * Wakes up given waiting procedures by pushing them back into scheduler queues. * @return size of given {@code waitQueue}. */ - protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { - int count = waitQueue.size(); - // wakeProcedure adds to the front of queue, so we start from last in the - // waitQueue' queue, so that the procedure which was added first goes in the front for - // the scheduler queue. - addFront(waitQueue.descendingIterator()); - waitQueue.clear(); - return count; + protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { + return lockAndQueue.wakeWaitingProcedures(this); } - protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { - waitQueue.addLast(proc); + protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { + lockAndQueue.addLast(proc); } protected void wakeProcedure(final Procedure procedure) { - if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure); + LOG.trace("Wake {}", procedure); push(procedure, /* addFront= */ true, /* notify= */false); } @@ -285,7 +279,9 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void wakePollIfNeeded(final int waitingCount) { - if (waitingCount <= 0) return; + if (waitingCount <= 0) { + return; + } if (waitingCount == 1) { schedWaitCond.signal(); } else { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java index 427c1fc..274940b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.yetus.audience.InterfaceAudience; /** @@ -45,7 +47,8 @@ import org.apache.yetus.audience.InterfaceAudience; * We do not use ReentrantReadWriteLock directly because of its high memory overhead. */ @InterfaceAudience.Private -public class LockAndQueue extends ProcedureDeque implements LockStatus { +public class LockAndQueue implements LockStatus { + private final ProcedureDeque queue = new ProcedureDeque(); private Procedure exclusiveLockOwnerProcedure = null; private int sharedLock = 0; @@ -69,12 +72,14 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { } @Override - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + public boolean hasParentLock(Procedure proc) { + // TODO: need to check all the ancestors + return proc.hasParent() && + (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); } @Override - public boolean hasLockAccess(final Procedure proc) { + public boolean hasLockAccess(Procedure proc) { return isLockOwner(proc.getProcId()) || hasParentLock(proc); } @@ -101,37 +106,81 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { // try/release Shared/Exclusive lock // ====================================================================== - public boolean trySharedLock() { - if (hasExclusiveLock()) return false; + /** + * @return whether we have succesfully acquired the shared lock. + */ + public boolean trySharedLock(Procedure proc) { + if (hasExclusiveLock()) { + // If the parent proc or we have already held the exclusive lock, then we return true here as + // exclusive lock is more powerful then shared lock. + return hasLockAccess(proc); + } sharedLock++; return true; } + /** + * @return whether we should wake the procedures waiting on the lock here. + */ public boolean releaseSharedLock() { + if (hasExclusiveLock()) { + // This usually means we acquire shared lock while we or our parent have held the exclusive + // lock, as we do not increase the sharedLock number so here we do not decrease it either. And + // also, since there are still an exclusive lock, we do not need to wake any procedures. + return false; + } return --sharedLock == 0; } - public boolean tryExclusiveLock(final Procedure proc) { - if (isLocked()) return hasLockAccess(proc); + public boolean tryExclusiveLock(Procedure proc) { + if (isLocked()) { + return hasLockAccess(proc); + } exclusiveLockOwnerProcedure = proc; return true; } /** - * @return True if we released a lock. + * @return whether we should wake the procedures waiting on the lock here. */ - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockOwnerProcedure = null; - return true; + public boolean releaseExclusiveLock(Procedure proc) { + if (!isLockOwner(proc.getProcId())) { + // We are not the lock owner, it is probably inherited from the parent procedures. + return false; } - return false; + exclusiveLockOwnerProcedure = null; + return true; + } + + public boolean isWaitingQueueEmpty() { + return queue.isEmpty(); + } + + public Procedure removeFirst() { + return queue.removeFirst(); + } + + public void addLast(Procedure proc) { + queue.addLast(proc); + } + + public int wakeWaitingProcedures(ProcedureScheduler scheduler) { + int count = queue.size(); + // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so + // that the procedure which was added first goes in the front for the scheduler queue. + scheduler.addFront(queue.descendingIterator()); + queue.clear(); + return count; + } + + @SuppressWarnings("rawtypes") + public Stream filterWaitingQueue(Predicate predicate) { + return queue.stream().filter(predicate); } @Override public String toString() { - return "exclusiveLockOwner=" + (hasExclusiveLock()? getExclusiveLockProcIdOwner(): "NONE") + - ", sharedLockCount=" + getSharedLockCount() + - ", waitingProcCount=" + size(); + return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") + + ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java index b0c8d29..031b8bb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -33,9 +33,9 @@ public interface LockStatus { boolean isLockOwner(long procId); - boolean hasParentLock(final Procedure proc); + boolean hasParentLock(Procedure proc); - boolean hasLockAccess(final Procedure proc); + boolean hasLockAccess(Procedure proc); Procedure getExclusiveLockOwnerProcedure(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index efb8f9a..ecf72e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -531,7 +531,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final String namespace = table.getNamespaceAsString(); final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); final LockAndQueue tableLock = locking.getTableLock(table); - if (!namespaceLock.trySharedLock()) { + if (!namespaceLock.trySharedLock(procedure)) { waitProcedure(namespaceLock, procedure); logLockedResource(LockedResourceType.NAMESPACE, namespace); return true; @@ -560,9 +560,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - - if (!tableLock.hasParentLock(procedure)) { - tableLock.releaseExclusiveLock(procedure); + if (tableLock.releaseExclusiveLock(procedure)) { waitingCount += wakeWaitingProcedures(tableLock); } if (namespaceLock.releaseSharedLock()) { @@ -591,12 +589,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue tableLock = locking.getTableLock(table); - if (!namespaceLock.trySharedLock()) { + if (!namespaceLock.trySharedLock(procedure)) { waitProcedure(namespaceLock, procedure); return null; } - if (!tableLock.trySharedLock()) { + if (!tableLock.trySharedLock(procedure)) { namespaceLock.releaseSharedLock(); waitProcedure(tableLock, procedure); return null; @@ -690,11 +688,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); try { - // If there is parent procedure, it would have already taken xlock, so no need to take - // shared lock here. Otherwise, take shared lock. - if (!procedure.hasParent() - && waitTableQueueSharedLock(procedure, table) == null) { - return true; + assert table != null; + if (waitTableSharedLock(procedure, table)) { + return true; } // acquire region xlocks or wait @@ -702,7 +698,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { LOG.info("{} checking lock on {}", procedure, regionInfo[i].getEncodedName()); - assert table != null; assert regionInfo[i] != null; assert regionInfo[i].getTable() != null; assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure; @@ -719,7 +714,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - if (!hasLock && !procedure.hasParent()) { + if (!hasLock) { wakeTableSharedLock(procedure, table); } return !hasLock; @@ -748,14 +743,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { int numProcs = 0; - final Procedure[] nextProcs = new Procedure[regionInfo.length]; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); if (regionLock.releaseExclusiveLock(procedure)) { - if (!regionLock.isEmpty()) { + if (!regionLock.isWaitingQueueEmpty()) { // release one procedure at the time since regions has an xlock nextProcs[numProcs++] = regionLock.removeFirst(); } else { @@ -769,11 +764,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { wakeProcedure(nextProcs[i]); } wakePollIfNeeded(numProcs); - if (!procedure.hasParent()) { - // release the table shared-lock. - // (if we have a parent, it is holding an xlock so we didn't take the shared-lock) - wakeTableSharedLock(procedure, table); - } + // release the table shared-lock. + wakeTableSharedLock(procedure, table); } finally { schedUnlock(); } @@ -789,15 +781,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public boolean waitNamespaceExclusiveLock(Procedure procedure, String namespace) { schedLock(); try { final LockAndQueue systemNamespaceTableLock = - locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); - if (!systemNamespaceTableLock.trySharedLock()) { + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + if (!systemNamespaceTableLock.trySharedLock(procedure)) { waitProcedure(systemNamespaceTableLock, procedure); logLockedResource(LockedResourceType.TABLE, - TableName.NAMESPACE_TABLE_NAME.getNameAsString()); + TableName.NAMESPACE_TABLE_NAME.getNameAsString()); return true; } @@ -826,13 +818,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); final LockAndQueue systemNamespaceTableLock = locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); - namespaceLock.releaseExclusiveLock(procedure); int waitingCount = 0; + if (namespaceLock.releaseExclusiveLock(procedure)) { + waitingCount += wakeWaitingProcedures(namespaceLock); + } if (systemNamespaceTableLock.releaseSharedLock()) { addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME)); waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); } - waitingCount += wakeWaitingProcedures(namespaceLock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -878,6 +871,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); + // Only SCP will acquire/release server lock so do not need to check the return value here. lock.releaseExclusiveLock(procedure); // We do not need to create a new queue so just pass null, as in tests we may pass procedures // other than ServerProcedureInterface @@ -925,10 +919,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { final LockAndQueue lock = locking.getPeerLock(peerId); - lock.releaseExclusiveLock(procedure); - addToRunQueue(peerRunQueue, getPeerQueue(peerId)); - int waitingCount = wakeWaitingProcedures(lock); - wakePollIfNeeded(waitingCount); + if (lock.releaseExclusiveLock(procedure)) { + addToRunQueue(peerRunQueue, getPeerQueue(peerId)); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } } finally { schedUnlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 69266ee..afd9185 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -125,13 +125,8 @@ class SchemaLocking { List> waitingProcedures = new ArrayList<>(); - for (Procedure procedure : queue) { - if (!(procedure instanceof LockProcedure)) { - continue; - } - - waitingProcedures.add(procedure); - } + queue.filterWaitingQueue(p -> p instanceof LockProcedure) + .forEachOrdered(waitingProcedures::add); return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures); -- 2.7.4