diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b67d43b..25f594f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -891,4 +891,16 @@ public abstract class Procedure implements Comparable { return proc; } + + /** + * @param a the first procedure to be compared. + * @param b the second procedure to be compared. + * @return true if the two procedures have the same parent + */ + public static boolean haveSameParent(final Procedure a, final Procedure b) { + if (a.hasParent() && b.hasParent()) { + return a.getParentProcId() == b.getParentProcId(); + } + return false; + } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java index 65df692..64c41ee 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2; +import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -69,6 +71,7 @@ public interface ProcedureRunnableSet { * Returns the number of elements in this collection. * @return the number of elements in this collection. */ + @VisibleForTesting int size(); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 548fb00..26ecd94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.master.procedure; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; @@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; +import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; /** * ProcedureRunnableSet for the Master Procedures. @@ -78,7 +81,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private final FairQueue serverRunQueue = new FairQueue(); private final FairQueue tableRunQueue = new FairQueue(); - private int queueSize = 0; private final ServerQueue[] serverBuckets = new ServerQueue[128]; private NamespaceQueue namespaceMap = null; @@ -148,14 +150,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (proc.isSuspended()) return; queue.add(proc, addFront); - if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + if (!(queue.isSuspended() || + (queue.hasExclusiveLock() && !queue.isLockOwner(proc.getProcId())))) { // the queue is not suspended or removed from the fairq (run-queue) // because someone has an xlock on it. // so, if the queue is not-linked we should add it if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) { fairq.add(queue); } - queueSize++; } else if (queue.hasParentLock(proc)) { assert addFront : "expected to add a child in the front"; assert !queue.isSuspended() : "unexpected suspended state for the queue"; @@ -165,7 +167,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!AvlIterableList.isLinked(queue)) { fairq.add(queue); } - queueSize++; } } @@ -179,13 +180,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { Procedure pollResult = null; schedLock.lock(); try { - if (queueSize == 0) { + if (!hasRunnables()) { if (waitNsec < 0) { schedWaitCond.await(); } else { schedWaitCond.awaitNanos(waitNsec); } - if (queueSize == 0) { + if (!hasRunnables()) { return null; } } @@ -209,6 +210,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return pollResult; } + private boolean hasRunnables() { + return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); + } + private > Procedure doPoll(final FairQueue fairq) { final Queue rq = fairq.poll(); if (rq == null || !rq.isAvailable()) { @@ -218,13 +223,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { assert !rq.isSuspended() : "rq=" + rq + " is suspended"; final Procedure pollResult = rq.peek(); final boolean xlockReq = rq.requireExclusiveLock(pollResult); - if (xlockReq && rq.isLocked() && !rq.hasParentLock(pollResult)) { + if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) { // someone is already holding the lock (e.g. shared lock). avoid a yield return null; } rq.poll(); - this.queueSize--; if (rq.isEmpty() || xlockReq) { removeFromRunQueue(fairq, rq); } else if (rq.hasParentLock(pollResult)) { @@ -232,7 +236,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state Procedure nextProc = rq.peek(); - if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) { + if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { removeFromRunQueue(fairq, rq); } } @@ -255,7 +259,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); tableMap = null; - assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; + assert size() == 0 : "expected queue size to be 0, got " + size(); } finally { schedLock.unlock(); } @@ -271,6 +275,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } + private void wakePollIfNeeded(final int waitingCount) { + if (waitingCount > 1) { + schedWaitCond.signalAll(); + } else if (waitingCount > 0) { + schedWaitCond.signal(); + } + } + @Override public void signalAll() { schedLock.lock(); @@ -285,14 +297,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { public int size() { schedLock.lock(); try { - return queueSize; + int count = 0; + + // Server queues + final AvlTreeIterator serverIter = new AvlTreeIterator(); + for (int i = 0; i < serverBuckets.length; ++i) { + serverIter.seekFirst(serverBuckets[i]); + while (serverIter.hasNext()) { + count += serverIter.next().size(); + } + } + + // Table queues + final AvlTreeIterator tableIter = new AvlTreeIterator(tableMap); + while (tableIter.hasNext()) { + count += tableIter.next().size(); + } + return count; } finally { schedLock.unlock(); } } @Override - public void completionCleanup(Procedure proc) { + public void completionCleanup(final Procedure proc) { if (proc instanceof TableProcedureInterface) { TableProcedureInterface iProcTable = (TableProcedureInterface)proc; boolean tableDeleted; @@ -310,7 +338,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); } if (tableDeleted) { - markTableAsDeleted(iProcTable.getTableName()); + markTableAsDeleted(iProcTable.getTableName(), proc); return; } } else { @@ -323,14 +351,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (AvlIterableList.isLinked(queue)) return; if (!queue.isEmpty()) { fairq.add(queue); - queueSize += queue.size(); } } private > void removeFromRunQueue(FairQueue fairq, Queue queue) { if (!AvlIterableList.isLinked(queue)) return; fairq.remove(queue); - queueSize -= queue.size(); } // ============================================================================ @@ -470,13 +496,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { schedLock.lock(); try { - popEventWaitingObjects(event); - - if (queueSize > 1) { - schedWaitCond.signalAll(); - } else if (queueSize > 0) { - schedWaitCond.signal(); - } + final int waitingCount = popEventWaitingObjects(event); + wakePollIfNeeded(waitingCount); } finally { schedLock.unlock(); } @@ -493,6 +514,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { final boolean isTraceEnabled = LOG.isTraceEnabled(); schedLock.lock(); try { + int waitingCount = 0; for (int i = 0; i < count; ++i) { final ProcedureEvent event = events[i]; synchronized (event) { @@ -500,36 +522,36 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (isTraceEnabled) { LOG.trace("Wake event " + event); } - popEventWaitingObjects(event); + waitingCount += popEventWaitingObjects(event); } } - - if (queueSize > 1) { - schedWaitCond.signalAll(); - } else if (queueSize > 0) { - schedWaitCond.signal(); - } + wakePollIfNeeded(waitingCount); } finally { schedLock.unlock(); } } - private void popEventWaitingObjects(final ProcedureEvent event) { + private int popEventWaitingObjects(final ProcedureEvent event) { + int count = 0; while (event.hasWaitingTables()) { final Queue queue = event.popWaitingTable(); queue.setSuspended(false); addToRunQueue(tableRunQueue, queue); + count += queue.size(); } // TODO: This will change once we have the new AM while (event.hasWaitingServers()) { final Queue queue = event.popWaitingServer(); queue.setSuspended(false); addToRunQueue(serverRunQueue, queue); + count += queue.size(); } while (event.hasWaitingProcedures()) { wakeProcedure(event.popWaitingProcedure(false)); + count++; } + return count; } private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) { @@ -823,8 +845,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (hasExclusiveLock()) { // if we have an exclusive lock already taken // only child of the lock owner can be executed - Procedure availProc = peek(); - return availProc != null && hasParentLock(availProc); + final Procedure nextProc = peek(); + return nextProc != null && hasLockAccess(nextProc); } // no xlock @@ -1011,26 +1033,31 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { schedLock.lock(); TableQueue queue = getTableQueue(table); if (!queue.getNamespaceQueue().trySharedLock()) { + schedLock.unlock(); return false; } - if (!queue.tryExclusiveLock(procedure.getProcId())) { + if (!queue.tryExclusiveLock(procedure)) { queue.getNamespaceQueue().releaseSharedLock(); schedLock.unlock(); return false; } removeFromRunQueue(tableRunQueue, queue); + boolean hasParentLock = queue.hasParentLock(procedure); schedLock.unlock(); - // Zk lock is expensive... - boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); - if (!hasXLock) { - schedLock.lock(); - queue.releaseExclusiveLock(); - queue.getNamespaceQueue().releaseSharedLock(); - addToRunQueue(tableRunQueue, queue); - schedLock.unlock(); + boolean hasXLock = true; + if (!hasParentLock) { + // Zk lock is expensive... + hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); + if (!hasXLock) { + schedLock.lock(); + if (!hasParentLock) queue.releaseExclusiveLock(); + queue.getNamespaceQueue().releaseSharedLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } } return hasXLock; } @@ -1041,15 +1068,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * @param table the name of the table that has the exclusive lock */ public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) { - schedLock.lock(); - TableQueue queue = getTableQueue(table); - schedLock.unlock(); + final TableQueue queue = getTableQueueWithLock(table); + final boolean hasParentLock = queue.hasParentLock(procedure); - // Zk lock is expensive... - queue.releaseZkExclusiveLock(lockManager); + if (!hasParentLock) { + // Zk lock is expensive... + queue.releaseZkExclusiveLock(lockManager); + } schedLock.lock(); - queue.releaseExclusiveLock(); + if (!hasParentLock) queue.releaseExclusiveLock(); queue.getNamespaceQueue().releaseSharedLock(); addToRunQueue(tableRunQueue, queue); schedLock.unlock(); @@ -1116,17 +1144,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { * If there are new operations pending (e.g. a new create), * the remove will not be performed. * @param table the name of the table that should be marked as deleted + * @param procedure the procedure that is removing the table * @return true if deletion succeeded, false otherwise meaning that there are * other new operations pending for that table (e.g. a new create). */ - protected boolean markTableAsDeleted(final TableName table) { + @VisibleForTesting + protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { final ReentrantLock l = schedLock; l.lock(); try { TableQueue queue = getTableQueue(table); if (queue == null) return true; - if (queue.isEmpty() && queue.tryExclusiveLock(0)) { + if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) { // remove the table from the run-queue and the map if (AvlIterableList.isLinked(queue)) { tableRunQueue.remove(queue); @@ -1256,11 +1286,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { wakeProcedure(nextProcs[i]); } - if (numProcs > 1) { - schedWaitCond.signalAll(); - } else if (numProcs > 0) { - schedWaitCond.signal(); - } + wakePollIfNeeded(numProcs); if (!procedure.hasParent()) { // release the table shared-lock. @@ -1289,7 +1315,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (!tableQueue.trySharedLock()) return false; NamespaceQueue nsQueue = getNamespaceQueue(nsName); - boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId()); + boolean hasLock = nsQueue.tryExclusiveLock(procedure); if (!hasLock) { tableQueue.releaseSharedLock(); } @@ -1333,7 +1359,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { schedLock.lock(); try { ServerQueue queue = getServerQueue(serverName); - if (queue.tryExclusiveLock(procedure.getProcId())) { + if (queue.tryExclusiveLock(procedure)) { removeFromRunQueue(serverRunQueue, queue); return true; } @@ -1473,10 +1499,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return proc.hasParent() && isLockOwner(proc.getParentProcId()); } - public synchronized boolean tryExclusiveLock(final long procIdOwner) { - assert procIdOwner != Long.MIN_VALUE; - if (isLocked() && !isLockOwner(procIdOwner)) return false; - exclusiveLockProcIdOwner = procIdOwner; + public synchronized boolean hasLockAccess(final Procedure proc) { + return isLockOwner(proc.getProcId()) || hasParentLock(proc); + } + + public synchronized boolean tryExclusiveLock(final Procedure proc) { + if (isLocked()) return hasLockAccess(proc); + exclusiveLockProcIdOwner = proc.getProcId(); return true; } @@ -1564,6 +1593,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private Queue currentQueue = null; private Queue queueHead = null; private int currentQuantum = 0; + private int size = 0; public FairQueue() { this(1); @@ -1573,9 +1603,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { this.quantum = quantum; } + public boolean hasRunnables() { + return size > 0; + } + public void add(Queue queue) { queueHead = AvlIterableList.append(queueHead, queue); if (currentQueue == null) setNextQueue(queueHead); + size++; } public void remove(Queue queue) { @@ -1584,6 +1619,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (currentQueue == queue) { setNextQueue(queueHead != null ? nextQueue : null); } + size--; } public Queue poll() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 7feeec5..58eaf3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,8 +34,8 @@ import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -51,7 +48,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@Category({MasterTests.class, MediumTests.class}) +@Category({MasterTests.class, SmallTests.class}) public class TestMasterProcedureScheduler { private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class); @@ -70,60 +67,6 @@ public class TestMasterProcedureScheduler { queue.clear(); } - @Test - public void testConcurrentCreateDelete() throws Exception { - final MasterProcedureScheduler procQueue = queue; - final TableName table = TableName.valueOf("testtb"); - final AtomicBoolean running = new AtomicBoolean(true); - final AtomicBoolean failure = new AtomicBoolean(false); - Thread createThread = new Thread() { - @Override - public void run() { - try { - TestTableProcedure proc = new TestTableProcedure(1, table, - TableProcedureInterface.TableOperationType.CREATE); - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(proc, table)) { - procQueue.releaseTableExclusiveLock(proc, table); - } - } - } catch (Throwable e) { - LOG.error("create failed", e); - failure.set(true); - } - } - }; - - Thread deleteThread = new Thread() { - @Override - public void run() { - try { - TestTableProcedure proc = new TestTableProcedure(2, table, - TableProcedureInterface.TableOperationType.DELETE); - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(proc, table)) { - procQueue.releaseTableExclusiveLock(proc, table); - } - procQueue.markTableAsDeleted(table); - } - } catch (Throwable e) { - LOG.error("delete failed", e); - failure.set(true); - } - } - }; - - createThread.start(); - deleteThread.start(); - for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) { - Thread.sleep(100); - } - running.set(false); - createThread.join(); - deleteThread.join(); - assertEquals(false, failure.get()); - } - /** * Verify simple create/insert/fetch/delete of the table queue. */ @@ -159,9 +102,11 @@ public class TestMasterProcedureScheduler { assertEquals(0, queue.size()); for (int i = 1; i <= NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("test-%04d", i)); + final TableName tableName = TableName.valueOf(String.format("test-%04d", i)); + final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName, + TableProcedureInterface.TableOperationType.DELETE); // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); + assertTrue(queue.markTableAsDeleted(tableName, dummyProc)); } } @@ -173,11 +118,14 @@ public class TestMasterProcedureScheduler { public void testCreateDeleteTableOperationsWithWriteLock() throws Exception { TableName tableName = TableName.valueOf("testtb"); + final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName, + TableProcedureInterface.TableOperationType.DELETE); + queue.addBack(new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT)); // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); + assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); // fetch item and take a lock Procedure proc = queue.poll(); @@ -186,11 +134,11 @@ public class TestMasterProcedureScheduler { assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); // table can't be deleted because we have the lock assertEquals(0, queue.size()); - assertFalse(queue.markTableAsDeleted(tableName)); + assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); // release the xlock queue.releaseTableExclusiveLock(proc, tableName); // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); + assertTrue(queue.markTableAsDeleted(tableName, proc)); } /** @@ -202,13 +150,16 @@ public class TestMasterProcedureScheduler { final TableName tableName = TableName.valueOf("testtb"); final int nitems = 2; + final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName, + TableProcedureInterface.TableOperationType.DELETE); + for (int i = 1; i <= nitems; ++i) { queue.addBack(new TestTableProcedure(i, tableName, TableProcedureInterface.TableOperationType.READ)); } // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); + assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); Procedure[] procs = new Procedure[nitems]; for (int i = 0; i < nitems; ++i) { @@ -218,12 +169,12 @@ public class TestMasterProcedureScheduler { // take the rlock assertTrue(queue.tryAcquireTableSharedLock(proc, tableName)); // table can't be deleted because we have locks and/or items in the queue - assertFalse(queue.markTableAsDeleted(tableName)); + assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); } for (int i = 0; i < nitems; ++i) { // table can't be deleted because we have locks - assertFalse(queue.markTableAsDeleted(tableName)); + assertFalse(queue.markTableAsDeleted(tableName, dummyProc)); // release the rlock queue.releaseTableSharedLock(procs[i], tableName); } @@ -231,7 +182,7 @@ public class TestMasterProcedureScheduler { // there are no items and no lock in the queeu assertEquals(0, queue.size()); // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); + assertTrue(queue.markTableAsDeleted(tableName, dummyProc)); } /** @@ -299,7 +250,7 @@ public class TestMasterProcedureScheduler { // remove table queue assertEquals(0, queue.size()); - assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName)); + assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName, wrProc)); } @Test @@ -355,6 +306,32 @@ public class TestMasterProcedureScheduler { } @Test + public void testVerifyNamespaceXLock() throws Exception { + String nsName = "ns1"; + TableName tableName = TableName.valueOf(nsName, "testtb"); + queue.addBack(new TestNamespaceProcedure(1, nsName, + TableProcedureInterface.TableOperationType.CREATE)); + queue.addBack(new TestTableProcedure(2, tableName, + TableProcedureInterface.TableOperationType.READ)); + + // Fetch the ns item and take the xlock + Procedure proc = queue.poll(); + assertEquals(1, proc.getProcId()); + assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName)); + + // the table operation can't be executed because the ns is locked + assertEquals(null, queue.poll(0)); + + // release the ns lock + queue.releaseNamespaceExclusiveLock(proc, nsName); + + proc = queue.poll(); + assertEquals(2, proc.getProcId()); + assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + queue.releaseTableExclusiveLock(proc, tableName); + } + + @Test public void testSharedZkLock() throws Exception { final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); final String dir = TEST_UTIL.getDataTestDir("TestSharedZkLock").toString(); @@ -625,154 +602,80 @@ public class TestMasterProcedureScheduler { assertEquals(null, queue.poll(0)); } - /** - * Verify that "write" operations for a single table are serialized, - * but different tables can be executed in parallel. - */ - @Test(timeout=90000) - public void testConcurrentWriteOps() throws Exception { - final TestTableProcSet procSet = new TestTableProcSet(queue); + @Test + public void testParentXLockAndChildrenSharedLock() throws Exception { + final TableName tableName = TableName.valueOf("testParentXLockAndChildrenSharedLock"); + final HRegionInfo[] regions = new HRegionInfo[] { + new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")), + new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")), + new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")), + }; - final int NUM_ITEMS = 10; - final int NUM_TABLES = 4; - final AtomicInteger opsCount = new AtomicInteger(0); - for (int i = 0; i < NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); - for (int j = 1; j < NUM_ITEMS; ++j) { - procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - opsCount.incrementAndGet(); - } - } - assertEquals(opsCount.get(), queue.size()); - - final Thread[] threads = new Thread[NUM_TABLES * 2]; - final HashSet concurrentTables = new HashSet(); - final ArrayList failures = new ArrayList(); - final AtomicInteger concurrentCount = new AtomicInteger(0); - for (int i = 0; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override - public void run() { - while (opsCount.get() > 0) { - try { - Procedure proc = procSet.acquire(); - if (proc == null) { - queue.signalAll(); - if (opsCount.get() > 0) { - continue; - } - break; - } - - TableName tableId = procSet.getTableName(proc); - synchronized (concurrentTables) { - assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId)); - } - assertTrue(opsCount.decrementAndGet() >= 0); - try { - long procId = proc.getProcId(); - int concurrent = concurrentCount.incrementAndGet(); - assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, - concurrent >= 1 && concurrent <= NUM_TABLES); - LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - Thread.sleep(2000); - concurrent = concurrentCount.decrementAndGet(); - LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); - } finally { - synchronized (concurrentTables) { - assertTrue(concurrentTables.remove(tableId)); - } - procSet.release(proc); - } - } catch (Throwable e) { - LOG.error("Failed " + e.getMessage(), e); - synchronized (failures) { - failures.add(e.getMessage()); - } - } finally { - queue.signalAll(); - } - } - } - }; - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - assertTrue(failures.toString(), failures.isEmpty()); - assertEquals(0, opsCount.get()); - assertEquals(0, queue.size()); + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.CREATE)); - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName table = TableName.valueOf(String.format("testtb-%04d", i)); - assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table)); + // fetch and acquire first xlock proc + Procedure parentProc = queue.poll(); + assertEquals(1, parentProc.getProcId()); + assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName)); + + // add child procedure + for (int i = 0; i < regions.length; ++i) { + queue.addFront(new TestRegionProcedure(1, 1 + i, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regions[i])); } - } - public static class TestTableProcSet { - private final MasterProcedureScheduler queue; + // add another xlock procedure (no parent) + queue.addBack(new TestTableProcedure(100, tableName, + TableProcedureInterface.TableOperationType.EDIT)); - public TestTableProcSet(final MasterProcedureScheduler queue) { - this.queue = queue; + // fetch and execute child + for (int i = 0; i < regions.length; ++i) { + final int regionIdx = regions.length - i - 1; + Procedure childProc = queue.poll(); + LOG.debug("fetch children " + childProc); + assertEquals(1 + regionIdx, childProc.getProcId()); + assertEquals(false, queue.waitRegion(childProc, regions[regionIdx])); + queue.wakeRegion(childProc, regions[regionIdx]); } - public void addBack(Procedure proc) { - queue.addBack(proc); - } + // nothing available, until xlock release + assertEquals(null, queue.poll(0)); - public void addFront(Procedure proc) { - queue.addFront(proc); - } + // release xlock + queue.releaseTableExclusiveLock(parentProc, tableName); - public Procedure acquire() { - Procedure proc = null; - boolean avail = false; - while (!avail) { - proc = queue.poll(); - if (proc == null) break; - switch (getTableOperationType(proc)) { - case CREATE: - case DELETE: - case EDIT: - avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc)); - break; - case READ: - avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc)); - break; - default: - throw new UnsupportedOperationException(); - } - if (!avail) { - addFront(proc); - LOG.debug("yield procId=" + proc); - } - } - return proc; - } + // fetch the other xlock proc + Procedure proc = queue.poll(); + assertEquals(100, proc.getProcId()); + assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + queue.releaseTableExclusiveLock(proc, tableName); + } - public void release(Procedure proc) { - switch (getTableOperationType(proc)) { - case CREATE: - case DELETE: - case EDIT: - queue.releaseTableExclusiveLock(proc, getTableName(proc)); - break; - case READ: - queue.releaseTableSharedLock(proc, getTableName(proc)); - break; - } - } + @Test + public void testParentXLockAndChildrenXLock() throws Exception { + final TableName tableName = TableName.valueOf("testParentXLockAndChildrenXLock"); - public TableName getTableName(Procedure proc) { - return ((TableProcedureInterface)proc).getTableName(); - } + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); - public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { - return ((TableProcedureInterface)proc).getTableOperationType(); - } + // fetch and acquire first xlock proc + Procedure parentProc = queue.poll(); + assertEquals(1, parentProc.getProcId()); + assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName)); + + // add child procedure + queue.addFront(new TestTableProcedure(1, 2, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + + // fetch the other xlock proc + Procedure proc = queue.poll(); + assertEquals(2, proc.getProcId()); + assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + queue.releaseTableExclusiveLock(proc, tableName); + + // release xlock + queue.releaseTableExclusiveLock(parentProc, tableName); } public static class TestTableProcedure extends TestProcedure @@ -813,6 +716,19 @@ public class TestMasterProcedureScheduler { } } + public static class TestTableProcedureWithEvent extends TestTableProcedure { + private final ProcedureEvent event; + + public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) { + super(procId, tableName, opType); + event = new ProcedureEvent(tableName + " procId=" + procId); + } + + public ProcedureEvent getEvent() { + return event; + } + } + public static class TestRegionProcedure extends TestTableProcedure { private final HRegionInfo[] regionInfo; @@ -839,7 +755,7 @@ public class TestMasterProcedureScheduler { public void toStringClassDetails(final StringBuilder sb) { sb.append(getClass().getSimpleName()); sb.append(" (region="); - sb.append(getRegionInfo()); + sb.append(Arrays.toString(getRegionInfo())); sb.append(")"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java new file mode 100644 index 0000000..380067d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; +import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; +import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterProcedureSchedulerConcurrency { + private static final Log LOG = LogFactory.getLog(TestMasterProcedureSchedulerConcurrency.class); + + private MasterProcedureScheduler queue; + private Configuration conf; + + @Before + public void setUp() throws IOException { + conf = HBaseConfiguration.create(); + queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager()); + } + + @After + public void tearDown() throws IOException { + assertEquals("proc-queue expected to be empty", 0, queue.size()); + queue.clear(); + } + + @Test(timeout=60000) + public void testConcurrentCreateDelete() throws Exception { + final MasterProcedureScheduler procQueue = queue; + final TableName table = TableName.valueOf("testtb"); + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean failure = new AtomicBoolean(false); + Thread createThread = new Thread() { + @Override + public void run() { + try { + TestTableProcedure proc = new TestTableProcedure(1, table, + TableProcedureInterface.TableOperationType.CREATE); + while (running.get() && !failure.get()) { + if (procQueue.tryAcquireTableExclusiveLock(proc, table)) { + procQueue.releaseTableExclusiveLock(proc, table); + } + } + } catch (Throwable e) { + LOG.error("create failed", e); + failure.set(true); + } + } + }; + + Thread deleteThread = new Thread() { + @Override + public void run() { + try { + TestTableProcedure proc = new TestTableProcedure(2, table, + TableProcedureInterface.TableOperationType.DELETE); + while (running.get() && !failure.get()) { + if (procQueue.tryAcquireTableExclusiveLock(proc, table)) { + procQueue.releaseTableExclusiveLock(proc, table); + } + procQueue.markTableAsDeleted(table, proc); + } + } catch (Throwable e) { + LOG.error("delete failed", e); + failure.set(true); + } + } + }; + + createThread.start(); + deleteThread.start(); + for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) { + Thread.sleep(100); + } + running.set(false); + createThread.join(); + deleteThread.join(); + assertEquals(false, failure.get()); + } + + /** + * Verify that "write" operations for a single table are serialized, + * but different tables can be executed in parallel. + */ + @Test(timeout=60000) + public void testConcurrentWriteOps() throws Exception { + final TestTableProcSet procSet = new TestTableProcSet(queue); + + final int NUM_ITEMS = 10; + final int NUM_TABLES = 4; + final AtomicInteger opsCount = new AtomicInteger(0); + for (int i = 0; i < NUM_TABLES; ++i) { + TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); + for (int j = 1; j < NUM_ITEMS; ++j) { + procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + opsCount.incrementAndGet(); + } + } + assertEquals(opsCount.get(), queue.size()); + + final Thread[] threads = new Thread[NUM_TABLES * 2]; + final HashSet concurrentTables = new HashSet(); + final ArrayList failures = new ArrayList(); + final AtomicInteger concurrentCount = new AtomicInteger(0); + for (int i = 0; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (opsCount.get() > 0) { + try { + Procedure proc = procSet.acquire(); + if (proc == null) { + queue.signalAll(); + if (opsCount.get() > 0) { + continue; + } + break; + } + + TableName tableId = procSet.getTableName(proc); + synchronized (concurrentTables) { + assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId)); + } + assertTrue(opsCount.decrementAndGet() >= 0); + try { + long procId = proc.getProcId(); + int concurrent = concurrentCount.incrementAndGet(); + assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, + concurrent >= 1 && concurrent <= NUM_TABLES); + LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + Thread.sleep(2000); + concurrent = concurrentCount.decrementAndGet(); + LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); + } finally { + synchronized (concurrentTables) { + assertTrue(concurrentTables.remove(tableId)); + } + procSet.release(proc); + } + } catch (Throwable e) { + LOG.error("Failed " + e.getMessage(), e); + synchronized (failures) { + failures.add(e.getMessage()); + } + } finally { + queue.signalAll(); + } + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + assertTrue(failures.toString(), failures.isEmpty()); + assertEquals(0, opsCount.get()); + assertEquals(0, queue.size()); + + for (int i = 1; i <= NUM_TABLES; ++i) { + final TableName table = TableName.valueOf(String.format("testtb-%04d", i)); + final TestTableProcedure dummyProc = new TestTableProcedure(100, table, + TableProcedureInterface.TableOperationType.DELETE); + assertTrue("queue should be deleted, table=" + table, + queue.markTableAsDeleted(table, dummyProc)); + } + } + + @Test(timeout=60000) + public void testConcurrentWaitWake() throws Exception { + testConcurrentWaitWake(false); + } + + @Test(timeout=60000) + public void testConcurrentWaitWakeBatch() throws Exception { + testConcurrentWaitWake(true); + } + + private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { + final TableName tableName = TableName.valueOf("testtb"); + + final int NPROCS = 20; + final int NRUNS = 100; + + for (long i = 0; i < NPROCS; ++i) { + queue.addBack(new TestTableProcedureWithEvent(i, tableName, + TableProcedureInterface.TableOperationType.READ)); + } + + final Thread[] threads = new Thread[4]; + final AtomicInteger waitCount = new AtomicInteger(0); + final AtomicInteger wakeCount = new AtomicInteger(0); + + final ConcurrentSkipListSet waitQueue = + new ConcurrentSkipListSet(); + threads[0] = new Thread() { + @Override + public void run() { + while (true) { + if (useWakeBatch) { + ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; + for (int i = 0; i < ev.length; ++i) { + ev[i] = waitQueue.pollFirst().getEvent(); + LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get()); + } + queue.wakeEvents(ev, ev.length); + wakeCount.addAndGet(ev.length); + } else { + int size = waitQueue.size(); + while (size-- > 0) { + ProcedureEvent ev = waitQueue.pollFirst().getEvent(); + queue.wakeEvent(ev); + LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); + wakeCount.incrementAndGet(); + } + } + if (wakeCount.get() >= NRUNS) { + break; + } + Threads.sleepWithoutInterrupt(25); + } + } + }; + + for (int i = 1; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (true) { + TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll(); + if (proc == null) continue; + + waitQueue.add(proc); + queue.suspendEvent(proc.getEvent()); + queue.waitEvent(proc.getEvent(), proc); + LOG.debug("WAIT " + proc.getEvent()); + if (waitCount.incrementAndGet() >= NRUNS) { + break; + } + } + } + }; + } + + for (int i = 0; i < threads.length; ++i) { + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + + queue.clear(); + } + + public static class TestTableProcSet { + private final MasterProcedureScheduler queue; + + public TestTableProcSet(final MasterProcedureScheduler queue) { + this.queue = queue; + } + + public void addBack(Procedure proc) { + queue.addBack(proc); + } + + public void addFront(Procedure proc) { + queue.addFront(proc); + } + + public Procedure acquire() { + Procedure proc = null; + boolean avail = false; + while (!avail) { + proc = queue.poll(); + if (proc == null) break; + switch (getTableOperationType(proc)) { + case CREATE: + case DELETE: + case EDIT: + avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc)); + break; + case READ: + avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc)); + break; + default: + throw new UnsupportedOperationException(); + } + if (!avail) { + addFront(proc); + LOG.debug("yield procId=" + proc); + } + } + return proc; + } + + public void release(Procedure proc) { + switch (getTableOperationType(proc)) { + case CREATE: + case DELETE: + case EDIT: + queue.releaseTableExclusiveLock(proc, getTableName(proc)); + break; + case READ: + queue.releaseTableSharedLock(proc, getTableName(proc)); + break; + } + } + + public TableName getTableName(Procedure proc) { + return ((TableProcedureInterface)proc).getTableName(); + } + + public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { + return ((TableProcedureInterface)proc).getTableOperationType(); + } + } +}