diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 8b343d5..00b49aa 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -78,6 +78,9 @@ public abstract class Procedure implements Comparable { private int childrenLatch = 0; private long lastUpdate; + // TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks + private boolean suspended = false; + private RemoteProcedureException exception = null; private byte[] result = null; @@ -271,7 +274,7 @@ public abstract class Procedure implements Comparable { } public long getParentProcId() { - return parentProcId; + return parentProcId.longValue(); } public NonceKey getNonceKey() { @@ -323,6 +326,23 @@ public abstract class Procedure implements Comparable { return false; } + /** + * @return true if the procedure is in a suspended state, + * waiting for the resources required to execute the procedure will became available. + */ + public synchronized boolean isSuspended() { + return suspended; + } + + public synchronized void suspend() { + suspended = true; + } + + public synchronized void resume() { + assert isSuspended() : this + " expected suspended state, got " + state; + suspended = false; + } + public synchronized RemoteProcedureException getException() { return exception; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 24b5790..7b35e25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.util.Arrays; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -106,6 +107,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private void doAdd(final Procedure proc, final boolean addFront) { + doAdd(proc, addFront, true); + } + + private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) { schedLock.lock(); try { if (isTableProcedure(proc)) { @@ -120,7 +125,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { throw new UnsupportedOperationException( "RQs for non-table/non-server procedures are not implemented yet"); } - schedWaitCond.signal(); + if (notify) { + schedWaitCond.signal(); + } } finally { schedLock.unlock(); } @@ -128,12 +135,28 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private > void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { + if (proc.isSuspended()) return; + queue.add(proc, addFront); + if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + // the queue is not suspended or removed from the fairq (run-queue) + // because someone has an xlock on it. + // so, if the queue is not-linked we should add it if (queue.size() == 1 && !IterableList.isLinked(queue)) { fairq.add(queue); } queueSize++; + } else if (proc.hasParent() && queue.isLockOwner(proc.getParentProcId())) { + assert addFront : "expected to add a child in the front"; + assert !queue.isSuspended() : "unexpected suspended state for the queue"; + // our (proc) parent has the xlock, + // so the queue is not in the fairq (run-queue) + // add it back to let the child run (inherit the lock) + if (!IterableList.isLinked(queue)) { + fairq.add(queue); + } + queueSize++; } } @@ -188,7 +211,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { this.queueSize--; if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { removeFromRunQueue(fairq, rq); + } else if (pollResult.hasParent() && rq.isLockOwner(pollResult.getParentProcId())) { + // if the rq is in the fairq because of runnable child + // check if the next procedure is still a child. + // if not, remove the rq from the fairq and go back to the xlock state + Procedure nextProc = rq.peek(); + if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) { + removeFromRunQueue(fairq, rq); + } } + return pollResult; } @@ -303,14 +335,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); + } + + private boolean waitEvent(ProcedureEvent event, boolean lockEvent, + Procedure procedure, boolean suspendQueue) { synchronized (event) { if (event.isReady()) { + if (lockEvent) { + event.setReady(false); + } return false; } - // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue - if (!suspendQueue) suspendQueue = true; - if (isTableProcedure(procedure)) { waitTableEvent(event, procedure, suspendQueue); } else if (isServerProcedure(procedure)) { @@ -336,7 +373,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { TableQueue queue = getTableQueue(tableName); if (queue.isSuspended()) return; - // TODO: if !suspendQueue + if (!suspendQueue) { + suspendProcedure(event, procedure); + return; + } if (isDebugEnabled) { LOG.debug("Suspend table queue " + tableName); @@ -359,7 +399,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { ServerQueue queue = getServerQueue(serverName); if (queue.isSuspended()) return; - // TODO: if !suspendQueue + if (!suspendQueue) { + suspendProcedure(event, procedure); + return; + } if (isDebugEnabled) { LOG.debug("Suspend server queue " + serverName); @@ -402,6 +445,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { addToRunQueue(serverRunQueue, queue); } + while (event.hasWaitingProcedures()) { + wakeProcedure(event.popWaitingProcedure(false)); + } + if (queueSize > 1) { schedWaitCond.signalAll(); } else if (queueSize > 0) { @@ -413,7 +460,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - public static class ProcedureEvent { + private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) { + procedure.suspend(); + event.suspendProcedure(procedure); + } + + private void wakeProcedure(Procedure procedure) { + procedure.resume(); + doAdd(procedure, /* addFront= */ true, /* notify= */false); + } + + private static abstract class BaseProcedureEvent { + private ArrayDeque waitingProcedures = null; + + protected void suspendProcedure(Procedure proc) { + if (waitingProcedures == null) { + waitingProcedures = new ArrayDeque(); + } + waitingProcedures.addLast(proc); + } + + protected boolean hasWaitingProcedures() { + return waitingProcedures != null; + } + + protected Procedure popWaitingProcedure(boolean popFront) { + // it will be nice to use IterableList on a procedure and avoid allocations... + Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); + if (waitingProcedures.isEmpty()) { + waitingProcedures = null; + } + return proc; + } + } + + public static class ProcedureEvent extends BaseProcedureEvent { private final String description; private Queue waitingServers = null; @@ -588,9 +669,38 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } + private static class RegionEvent extends BaseProcedureEvent { + private final HRegionInfo regionInfo; + private boolean xlock = false; + + public RegionEvent(HRegionInfo regionInfo) { + this.regionInfo = regionInfo; + } + + public boolean tryExclusiveLock() { + boolean gotLock = !xlock; + xlock = true; + return gotLock; + } + + private void releaseExclusiveLock() { + xlock = false; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + @Override + public String toString() { + return String.format("region %s event", regionInfo.getRegionNameAsString()); + } + } + public static class TableQueue extends QueueImpl { private final NamespaceQueue namespaceQueue; + private HashMap regionEventMap; private TableLock tableLock = null; public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { @@ -604,7 +714,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { @Override public synchronized boolean isAvailable() { - return super.isAvailable() && !namespaceQueue.hasExclusiveLock(); + // if there are no items in the queue, or the namespace is locked. + // we can't execute operation on this table + if (isEmpty() || namespaceQueue.hasExclusiveLock()) { + return false; + } + + if (hasExclusiveLock()) { + // if we have an exclusive lock already taken + // only child of the lock owner can be executed + Procedure availProc = peek(); + return availProc != null && availProc.hasParent() && + isLockOwner(availProc.getParentProcId()); + } + + // no xlock + return true; + } + + public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) { + if (regionEventMap == null) { + regionEventMap = new HashMap(); + } + RegionEvent event = regionEventMap.get(regionInfo); + if (event == null) { + event = new RegionEvent(regionInfo); + regionEventMap.put(regionInfo, event); + } + return event; + } + + public synchronized void removeRegionEvent(final RegionEvent event) { + regionEventMap.remove(event.getRegionInfo()); + if (regionEventMap.isEmpty()) { + regionEventMap = null; + } } // TODO: We can abort pending/in-progress operation if the new call is @@ -633,6 +777,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); case READ: return false; + // region operations are using the shared-lock on the table + // and then they will grab an xlock on the region. + case SPLIT: + case MERGE: + case ASSIGN: + case UNASSIGN: + return false; default: break; } @@ -886,6 +1037,90 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } // ============================================================================ + // Region Locking Helpers + // ============================================================================ + public boolean waitRegions(final Procedure procedure, final TableName table, + final HRegionInfo... regionInfo) { + Arrays.sort(regionInfo); + + final TableQueue queue; + if (procedure.hasParent()) { + // the assumption is that the parent procedure have already the table xlock + queue = getTableQueueWithLock(table); + } else { + // acquire the table shared-lock + queue = tryAcquireTableQueueSharedLock(procedure, table); + if (queue == null) return false; + } + + // acquire region xlocks or wait + boolean hasLock = true; + final RegionEvent[] event = new RegionEvent[regionInfo.length]; + synchronized (queue) { + for (int i = 0; i < regionInfo.length; ++i) { + event[i] = queue.getRegionEvent(regionInfo[i]); + if (!event[i].tryExclusiveLock()) { + suspendProcedure(event[i], procedure); + hasLock = false; + while (i-- > 0) { + event[i].releaseExclusiveLock(); + } + break; + } + } + } + + if (!hasLock && !procedure.hasParent()) { + releaseTableSharedLock(procedure, table); + } + return hasLock; + } + + public void wakeRegions(final Procedure procedure,final TableName table, + final HRegionInfo... regionInfo) { + Arrays.sort(regionInfo); + + final TableQueue queue = getTableQueueWithLock(table); + + int numProcs = 0; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; + synchronized (queue) { + for (int i = 0; i < regionInfo.length; ++i) { + RegionEvent event = queue.getRegionEvent(regionInfo[i]); + event.releaseExclusiveLock(); + if (event.hasWaitingProcedures()) { + // release one procedure at the time since regions has an xlock + nextProcs[numProcs++] = event.popWaitingProcedure(true); + } else { + queue.removeRegionEvent(event); + } + } + } + + // awake procedures if any + schedLock.lock(); + try { + for (int i = numProcs - 1; i >= 0; --i) { + wakeProcedure(nextProcs[i]); + } + + if (numProcs > 1) { + schedWaitCond.signalAll(); + } else if (numProcs > 0) { + schedWaitCond.signal(); + } + + if (!procedure.hasParent()) { + // release the table shared-lock. + // (if we have a parent, it is holding an xlock so we didn't take the shared-lock) + releaseTableSharedLock(procedure, table); + } + } finally { + schedLock.unlock(); + } + } + + // ============================================================================ // Namespace Locking Helpers // ============================================================================ /** @@ -1079,6 +1314,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return sharedLock == 1; } + public synchronized boolean isLockOwner(long procId) { + return exclusiveLockProcIdOwner == procId; + } + public synchronized boolean tryExclusiveLock(long procIdOwner) { assert procIdOwner != Long.MIN_VALUE; if (isLocked()) return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index cc088f3..deaf406 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; public interface TableProcedureInterface { public enum TableOperationType { CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, + SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */ }; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index bb5fe6a..37954e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.procedure; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -34,12 +33,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -66,7 +67,7 @@ public class TestMasterProcedureScheduler { @After public void tearDown() throws IOException { - assertEquals(0, queue.size()); + assertEquals("proc-queue expected to be empty", 0, queue.size()); } @Test @@ -352,6 +353,139 @@ public class TestMasterProcedureScheduler { assertEquals(4, procId); } + @Test + public void testVerifyRegionLocks() throws Exception { + final TableName tableName = TableName.valueOf("testtb"); + final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")); + final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")); + final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + queue.addBack(new TestRegionProcedure(2, tableName, + TableProcedureInterface.TableOperationType.MERGE, regionA, regionB)); + queue.addBack(new TestRegionProcedure(3, tableName, + TableProcedureInterface.TableOperationType.SPLIT, regionA)); + queue.addBack(new TestRegionProcedure(4, tableName, + TableProcedureInterface.TableOperationType.SPLIT, regionB)); + queue.addBack(new TestRegionProcedure(5, tableName, + TableProcedureInterface.TableOperationType.UNASSIGN, regionC)); + + // Fetch the 1st item and take the write lock + Procedure proc = queue.poll(); + assertEquals(1, proc.getProcId()); + assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + + // everything is locked by the table operation + assertEquals(null, queue.poll(0)); + + // release the table lock + queue.releaseTableExclusiveLock(proc, tableName); + + // Fetch the 2nd item and the the lock on regionA and regionB + Procedure mergeProc = queue.poll(); + assertEquals(2, mergeProc.getProcId()); + assertEquals(true, queue.waitRegions(mergeProc, tableName, regionA, regionB)); + + // Fetch the 3rd item and the try to lock region A which will fail + // because already locked. this procedure will go in waiting. + // (this stuff will be explicit until we get rid of the zk-lock) + Procedure procA = queue.poll(); + assertEquals(3, procA.getProcId()); + assertEquals(false, queue.waitRegions(procA, tableName, regionA)); + + // Fetch the 4th item, same story as the 3rd + Procedure procB = queue.poll(); + assertEquals(4, procB.getProcId()); + assertEquals(false, queue.waitRegions(procB, tableName, regionB)); + + // Fetch the 5th item, since it is a non-locked region we are able to execute it + Procedure procC = queue.poll(); + assertEquals(5, procC.getProcId()); + assertEquals(true, queue.waitRegions(procC, tableName, regionC)); + + // 3rd and 4th are in the region suspended queue + assertEquals(null, queue.poll(0)); + + // Release region A-B from merge operation (procId=2) + queue.wakeRegions(mergeProc, tableName, regionA, regionB); + + // Fetch the 3rd item, now the lock on the region is available + procA = queue.poll(); + assertEquals(3, procA.getProcId()); + assertEquals(true, queue.waitRegions(procA, tableName, regionA)); + + // Fetch the 4th item, now the lock on the region is available + procB = queue.poll(); + assertEquals(4, procB.getProcId()); + assertEquals(true, queue.waitRegions(procB, tableName, regionB)); + + // release the locks on the regions + queue.wakeRegions(procA, tableName, regionA); + queue.wakeRegions(procB, tableName, regionB); + queue.wakeRegions(procC, tableName, regionC); + } + + @Test + public void testVerifySubProcRegionLocks() throws Exception { + final TableName tableName = TableName.valueOf("testVerifySubProcRegionLocks"); + final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")); + final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")); + final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.ENABLE)); + + // Fetch the 1st item from the queue, "the root procedure" and take the table lock + Procedure rootProc = queue.poll(); + assertEquals(1, rootProc.getProcId()); + assertEquals(true, queue.tryAcquireTableExclusiveLock(rootProc, tableName)); + assertEquals(null, queue.poll(0)); + + // Execute the 1st step of the root-proc. + // we should get 3 sub-proc back, one for each region. + // (this step is done by the executor/rootProc, we are simulating it) + Procedure[] subProcs = new Procedure[] { + new TestRegionProcedure(1, 2, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionA), + new TestRegionProcedure(1, 3, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionB), + new TestRegionProcedure(1, 4, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionC), + }; + + // at this point the rootProc is going in a waiting state + // and the sub-procedures will be added in the queue. + // (this step is done by the executor, we are simulating it) + for (int i = subProcs.length - 1; i >= 0; --i) { + queue.addFront(subProcs[i]); + } + assertEquals(subProcs.length, queue.size()); + + // we should be able to fetch and execute all the sub-procs, + // since they are operating on different regions + for (int i = 0; i < subProcs.length; ++i) { + TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0); + assertEquals(subProcs[i].getProcId(), regionProc.getProcId()); + assertEquals(true, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo())); + } + + // nothing else in the queue + assertEquals(null, queue.poll(0)); + + // release all the region locks + for (int i = 0; i < subProcs.length; ++i) { + TestRegionProcedure regionProc = (TestRegionProcedure)subProcs[i]; + queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo()); + } + + // nothing else in the queue + assertEquals(null, queue.poll(0)); + + // release the table lock (for the root procedure) + queue.releaseTableExclusiveLock(rootProc, tableName); + } + /** * Verify that "write" operations for a single table are serialized, * but different tables can be executed in parallel. @@ -528,6 +662,32 @@ public class TestMasterProcedureScheduler { } } + public static class TestRegionProcedure extends TestTableProcedure { + private final HRegionInfo[] regionInfo; + + public TestRegionProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType, + HRegionInfo... regionInfo) { + this(-1, procId, tableName, opType, regionInfo); + } + + public TestRegionProcedure(long parentProcId, long procId, TableName tableName, + TableOperationType opType, HRegionInfo... regionInfo) { + super(procId, tableName, opType); + this.regionInfo = regionInfo; + if (parentProcId > 0) { + setParentProcId(parentProcId); + } + } + + public HRegionInfo[] getRegionInfo() { + return regionInfo; + } + } + public static class TestNamespaceProcedure extends TestProcedure implements TableProcedureInterface { private final TableOperationType opType;