diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java index cb091ce..2b49113 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java @@ -259,7 +259,7 @@ implements TableProcedureInterface { @Override protected boolean acquireLock(final MasterProcedureEnv env) { - return env.getProcedureQueue().waitRegions( + return !env.getProcedureQueue().waitRegions( this, getTableName(), regionsToMerge[0], regionsToMerge[1]); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 84ecf22..51bdf52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -133,7 +133,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // a group for all the non-table/non-server procedures or try to find a key for your // non-table/non-server procedures and implement something similar to the TableRunQueue. throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet"); + "RQs for non-table/non-server procedures are not implemented yet: " + proc); } if (notify) { schedWaitCond.signal(); @@ -291,7 +291,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { TableProcedureInterface iProcTable = (TableProcedureInterface)proc; boolean tableDeleted; if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); + IOException procEx = proc.getException().unwrapRemoteException(); if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { // create failed because the table already exist tableDeleted = !(procEx instanceof TableExistsException); @@ -341,16 +341,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // ============================================================================ // Event Helpers // ============================================================================ - public boolean waitEvent(ProcedureEvent event, Procedure procedure) { + /** + * Suspend the procedure if the event is not ready yet. + * @param event the event to wait on + * @param procedure the procedure waiting on the event + * @return true if the procedure has to wait for the event to be ready, false otherwise. + */ + public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) { return waitEvent(event, procedure, false); } - public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + /** + * Suspend the procedure if the event is not ready yet. + * @param event the event to wait on + * @param procedure the procedure waiting on the event + * @param suspendQueue true if the entire queue of the procedure should be suspended + * @return true if the procedure has to wait for the event to be ready, false otherwise. + */ + public boolean waitEvent(final ProcedureEvent event, final Procedure procedure, + final boolean suspendQueue) { return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); } - private boolean waitEvent(ProcedureEvent event, boolean lockEvent, - Procedure procedure, boolean suspendQueue) { + private boolean waitEvent(final ProcedureEvent event, final boolean lockEvent, + final Procedure procedure, final boolean suspendQueue) { synchronized (event) { if (event.isReady()) { if (lockEvent) { @@ -371,13 +385,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // a group for all the non-table/non-server procedures or try to find a key for your // non-table/non-server procedures and implement something similar to the TableRunQueue. throw new UnsupportedOperationException( - "RQs for non-table/non-server procedures are not implemented yet"); + "RQs for non-table/non-server procedures are not implemented yet: " + procedure); } } return true; } - private void waitTableEvent(ProcedureEvent event, Procedure procedure) { + private void waitTableEvent(final ProcedureEvent event, final Procedure procedure) { final TableName tableName = getTableName(procedure); final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -398,7 +412,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private void waitServerEvent(ProcedureEvent event, Procedure procedure) { + private void waitServerEvent(final ProcedureEvent event, final Procedure procedure) { final ServerName serverName = getServerName(procedure); final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -420,39 +434,36 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - public void suspend(ProcedureEvent event) { - final boolean isDebugEnabled = LOG.isDebugEnabled(); + /** + * Mark the event has not ready. + * procedures calling waitEvent() will be suspended. + * @param event the event to mark as suspended/not ready + */ + public void suspend(final ProcedureEvent event) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); synchronized (event) { event.setReady(false); - if (isDebugEnabled) { - LOG.debug("Suspend event " + event); + if (isTraceEnabled) { + LOG.trace("Suspend event " + event); } } } - public void wake(ProcedureEvent event) { - final boolean isDebugEnabled = LOG.isDebugEnabled(); + /** + * Wake every procedure waiting for the specified event + * @param event the event to wait + */ + public void wake(final ProcedureEvent event) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); synchronized (event) { event.setReady(true); - if (isDebugEnabled) { - LOG.debug("Wake event " + event); + if (isTraceEnabled) { + LOG.trace("Wake event " + event); } schedLock.lock(); try { - while (event.hasWaitingTables()) { - Queue queue = event.popWaitingTable(); - addToRunQueue(tableRunQueue, queue); - } - // TODO: This will change once we have the new AM - while (event.hasWaitingServers()) { - Queue queue = event.popWaitingServer(); - addToRunQueue(serverRunQueue, queue); - } - - while (event.hasWaitingProcedures()) { - wakeProcedure(event.popWaitingProcedure(false)); - } + popEventWaitingObjects(event); if (queueSize > 1) { schedWaitCond.signalAll(); @@ -465,12 +476,58 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) { + /** + * Wake every procedure waiting for the specified events + * @param events the list of events to wake + * @param count the number of events in the array to wake + */ + public void wakeBatch(final ProcedureEvent[] events, final int count) { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + schedLock.lock(); + try { + for (int i = 0; i < count; ++i) { + final ProcedureEvent event = events[i]; + synchronized (event) { + event.setReady(true); + if (isTraceEnabled) { + LOG.trace("Wake event " + event); + } + popEventWaitingObjects(event); + } + } + + if (queueSize > 1) { + schedWaitCond.signalAll(); + } else if (queueSize > 0) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + + private void popEventWaitingObjects(final ProcedureEvent event) { + while (event.hasWaitingTables()) { + Queue queue = event.popWaitingTable(); + addToRunQueue(tableRunQueue, queue); + } + // TODO: This will change once we have the new AM + while (event.hasWaitingServers()) { + Queue queue = event.popWaitingServer(); + addToRunQueue(serverRunQueue, queue); + } + + while (event.hasWaitingProcedures()) { + wakeProcedure(event.popWaitingProcedure(false)); + } + } + + private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) { procedure.suspend(); event.suspendProcedure(procedure); } - private void wakeProcedure(Procedure procedure) { + private void wakeProcedure(final Procedure procedure) { procedure.resume(); doAdd(procedure, /* addFront= */ true, /* notify= */false); } @@ -478,7 +535,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private static abstract class BaseProcedureEvent { private ArrayDeque waitingProcedures = null; - protected void suspendProcedure(Procedure proc) { + protected void suspendProcedure(final Procedure proc) { if (waitingProcedures == null) { waitingProcedures = new ArrayDeque(); } @@ -489,7 +546,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return waitingProcedures != null; } - protected Procedure popWaitingProcedure(boolean popFront) { + protected Procedure popWaitingProcedure(final boolean popFront) { // it will be nice to use IterableList on a procedure and avoid allocations... Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); if (waitingProcedures.isEmpty()) { @@ -506,6 +563,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private Queue waitingTables = null; private boolean ready = false; + public ProcedureEvent() { + this(null); + } + public ProcedureEvent(String description) { this.description = description; } @@ -548,9 +609,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return node; } + protected String getDescription() { + if (description == null) { + throw new UnsupportedOperationException(); + } + return description; + } + @Override public String toString() { - return String.format("ProcedureEvent(%s)", description); + return String.format("%s(%s)", getClass().getSimpleName(), getDescription()); } } @@ -692,9 +760,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return exclusiveLockProcIdOwner == procId; } - public boolean tryExclusiveLock(long procIdOwner) { + public boolean tryExclusiveLock(final long procIdOwner) { assert procIdOwner != Long.MIN_VALUE; - if (hasExclusiveLock()) return false; + if (hasExclusiveLock() && !isLockOwner(procIdOwner)) return false; exclusiveLockProcIdOwner = procIdOwner; return true; } @@ -1077,10 +1145,23 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // ============================================================================ // Region Locking Helpers // ============================================================================ + /** + * Suspend the procedure if the specified region is already locked. + * @param procedure the procedure trying to acquire the lock on the region + * @param regionInfo the region we are trying to lock + * @return true if the procedure has to wait for the regions to be available + */ public boolean waitRegion(final Procedure procedure, final HRegionInfo regionInfo) { return waitRegions(procedure, regionInfo.getTable(), regionInfo); } + /** + * Suspend the procedure if the specified set of regions are already locked. + * @param procedure the procedure trying to acquire the lock on the regions + * @param table the table name of the regions we are trying to lock + * @param regionInfo the list of regions we are trying to lock + * @return true if the procedure has to wait for the regions to be available + */ public boolean waitRegions(final Procedure procedure, final TableName table, final HRegionInfo... regionInfo) { Arrays.sort(regionInfo); @@ -1092,7 +1173,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } else { // acquire the table shared-lock queue = tryAcquireTableQueueSharedLock(procedure, table); - if (queue == null) return false; + if (queue == null) return true; } // acquire region xlocks or wait @@ -1101,6 +1182,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { synchronized (queue) { for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); + assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + event[i] = queue.getRegionEvent(regionInfo[i]); if (!event[i].tryExclusiveLock(procedure.getProcId())) { suspendProcedure(event[i], procedure); @@ -1116,13 +1199,23 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!hasLock && !procedure.hasParent()) { releaseTableSharedLock(procedure, table); } - return hasLock; + return !hasLock; } + /** + * Wake the procedures waiting for the specified region + * @param procedure the procedure that was holding the region + * @param regionInfo the region the procedure was holding + */ public void wakeRegion(final Procedure procedure, final HRegionInfo regionInfo) { wakeRegions(procedure, regionInfo.getTable(), regionInfo); } + /** + * Wake the procedures waiting for the specified regions + * @param procedure the procedure that was holding the regions + * @param regionInfo the list of regions the procedure was holding + */ public void wakeRegions(final Procedure procedure,final TableName table, final HRegionInfo... regionInfo) { Arrays.sort(regionInfo); @@ -1132,8 +1225,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { int numProcs = 0; final Procedure[] nextProcs = new Procedure[regionInfo.length]; synchronized (queue) { + HRegionInfo prevRegion = null; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); + assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + RegionEvent event = queue.getRegionEvent(regionInfo[i]); event.releaseExclusiveLock(); if (event.hasWaitingProcedures()) { @@ -1365,9 +1461,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return exclusiveLockProcIdOwner == procId; } - public synchronized boolean tryExclusiveLock(long procIdOwner) { + public synchronized boolean tryExclusiveLock(final long procIdOwner) { assert procIdOwner != Long.MIN_VALUE; - if (isLocked()) return false; + if (isLocked() && !isLockOwner(procIdOwner)) return false; exclusiveLockProcIdOwner = procIdOwner; return true; } @@ -1415,6 +1511,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } public void add(final Procedure proc, final boolean addToFront) { + assert !runnables.contains(proc) : "proc already in the queue " + proc; if (addToFront) { addFront(proc); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 3de6d36..501f8db7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -417,24 +417,24 @@ public class TestMasterProcedureScheduler { // Fetch the 2nd item and the the lock on regionA and regionB Procedure mergeProc = queue.poll(); assertEquals(2, mergeProc.getProcId()); - assertEquals(true, queue.waitRegions(mergeProc, tableName, regionA, regionB)); + assertEquals(false, queue.waitRegions(mergeProc, tableName, regionA, regionB)); // Fetch the 3rd item and the try to lock region A which will fail // because already locked. this procedure will go in waiting. // (this stuff will be explicit until we get rid of the zk-lock) Procedure procA = queue.poll(); assertEquals(3, procA.getProcId()); - assertEquals(false, queue.waitRegions(procA, tableName, regionA)); + assertEquals(true, queue.waitRegions(procA, tableName, regionA)); // Fetch the 4th item, same story as the 3rd Procedure procB = queue.poll(); assertEquals(4, procB.getProcId()); - assertEquals(false, queue.waitRegions(procB, tableName, regionB)); + assertEquals(true, queue.waitRegions(procB, tableName, regionB)); // Fetch the 5th item, since it is a non-locked region we are able to execute it Procedure procC = queue.poll(); assertEquals(5, procC.getProcId()); - assertEquals(true, queue.waitRegions(procC, tableName, regionC)); + assertEquals(false, queue.waitRegions(procC, tableName, regionC)); // 3rd and 4th are in the region suspended queue assertEquals(null, queue.poll(0)); @@ -445,12 +445,12 @@ public class TestMasterProcedureScheduler { // Fetch the 3rd item, now the lock on the region is available procA = queue.poll(); assertEquals(3, procA.getProcId()); - assertEquals(true, queue.waitRegions(procA, tableName, regionA)); + assertEquals(false, queue.waitRegions(procA, tableName, regionA)); // Fetch the 4th item, now the lock on the region is available procB = queue.poll(); assertEquals(4, procB.getProcId()); - assertEquals(true, queue.waitRegions(procB, tableName, regionB)); + assertEquals(false, queue.waitRegions(procB, tableName, regionB)); // release the locks on the regions queue.wakeRegions(procA, tableName, regionA); @@ -499,7 +499,7 @@ public class TestMasterProcedureScheduler { for (int i = 0; i < subProcs.length; ++i) { TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0); assertEquals(subProcs[i].getProcId(), regionProc.getProcId()); - assertEquals(true, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo())); + assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo())); } // nothing else in the queue @@ -534,7 +534,7 @@ public class TestMasterProcedureScheduler { // Suspend // TODO: If we want to keep the zk-lock we need to retain the lock on suspend ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent"); - queue.waitEvent(event, proc, true); + assertEquals(true, queue.waitEvent(event, proc, true)); queue.releaseTableExclusiveLock(proc, tableName); assertEquals(null, queue.poll(0)); @@ -566,7 +566,7 @@ public class TestMasterProcedureScheduler { // suspend ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent"); - queue.waitEvent(event, proc); + assertEquals(true, queue.waitEvent(event, proc)); proc = queue.poll(); assertEquals(2, proc.getProcId());