From 224a66d90ca45d416cd0c1457f06a95758c78fab Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 16 Feb 2018 00:13:58 +0800 Subject: [PATCH] HBASE-20002 Reserve worker thread in ProcedureExecutor for high priority procedures --- .../procedure2/AbstractProcedureScheduler.java | 91 +++++++++------ .../hadoop/hbase/procedure2/LockAndQueue.java | 19 +-- .../apache/hadoop/hbase/procedure2/LockStatus.java | 11 +- .../hadoop/hbase/procedure2/ProcedureDeque.java | 3 +- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 44 ++++--- .../hbase/procedure2/ProcedureScheduler.java | 50 ++++++-- .../hbase/procedure2/SimpleProcedureScheduler.java | 14 +-- .../org/apache/hadoop/hbase/master/HMaster.java | 7 +- .../hadoop/hbase/master/procedure/FairQueue.java | 7 +- .../master/procedure/MasterProcedureScheduler.java | 130 ++++++++++++--------- .../hadoop/hbase/master/procedure/Queue.java | 6 +- .../hbase/master/procedure/SchemaLocking.java | 42 +++---- ...terProcedureSchedulerPerformanceEvaluation.java | 3 +- .../procedure/TestMasterProcedureScheduler.java | 41 +++---- .../TestMasterProcedureSchedulerConcurrency.java | 5 +- 15 files changed, 277 insertions(+), 196 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index fbfa5b2..ed20666 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.TimeUnit; - import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,20 +76,20 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * @param procedure the Procedure to add * @param addFront true if the item should be added to the front of the queue */ - protected abstract void enqueue(Procedure procedure, boolean addFront); + protected abstract void enqueue(Procedure procedure, boolean addFront); @Override - public void addFront(final Procedure procedure) { + public void addFront(final Procedure procedure) { push(procedure, true, true); } @Override - public void addFront(Iterator procedureIterator) { + public void addFront(Iterator> procedureIterator) { schedLock(); try { int count = 0; while (procedureIterator.hasNext()) { - Procedure procedure = procedureIterator.next(); + Procedure procedure = procedureIterator.next(); if (LOG.isTraceEnabled()) { LOG.trace("Wake " + procedure); } @@ -105,11 +103,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } @Override - public void addBack(final Procedure procedure) { + public void addBack(final Procedure procedure) { push(procedure, false, true); } - protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { + protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { schedLock(); try { enqueue(procedure, addFront); @@ -129,46 +127,63 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * NOTE: this method is called with the sched lock held. * @return the Procedure to execute, or null if nothing is available. */ - protected abstract Procedure dequeue(); + protected abstract Procedure dequeue(int priority); @Override - public Procedure poll() { - return poll(-1); + public Procedure poll(int priority) { + return doPoll(priority, -1); } @Override - public Procedure poll(long timeout, TimeUnit unit) { - return poll(unit.toNanos(timeout)); + public Procedure poll(int priority, long timeout, TimeUnit unit) { + return doPoll(priority, unit.toNanos(timeout)); + } + + // return whether we have already timed out before await + private boolean doPollWait(long endTime) throws InterruptedException { + if (endTime < 0) { + schedWaitCond.await(); + } else { + long awaitTime = endTime - System.nanoTime(); + if (awaitTime <= 0) { + return false; + } + schedWaitCond.awaitNanos(awaitTime); + } + return true; } - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Procedure poll(final long nanos) { + private Procedure doPoll(int priority, long nanos) { + long endTime = nanos >= 0 ? System.nanoTime() + nanos : -1; schedLock(); try { if (!running) { LOG.debug("the scheduler is not running"); return null; } - - if (!queueHasRunnables()) { - // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller - // to take decisions after a wake/interruption. - if (nanos < 0) { - schedWaitCond.await(); - } else { - schedWaitCond.awaitNanos(nanos); - } + for (;;) { if (!queueHasRunnables()) { - nullPollCalls++; + if (!doPollWait(endTime)) { + return null; + } + continue; + } + Procedure pollResult = dequeue(priority); + pollCalls++; + if (pollResult != null) { + return pollResult; + } + nullPollCalls++; + // This could happen since now we have priority, maybe all the available procedures are low + // priority. And that's also why here we must use a loop, otherwise the worker thread which + // only polls high priority procedures will do dead loop if there are no high priority + // procedures. + if (!doPollWait(endTime)) { return null; } } - - final Procedure pollResult = dequeue(); - pollCalls++; - nullPollCalls += (pollResult == null) ? 1 : 0; - return pollResult; } catch (InterruptedException e) { + // This usually means we want to quit, so give up and return null. Thread.currentThread().interrupt(); nullPollCalls++; return null; @@ -253,22 +268,24 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * Wakes up given waiting procedures by pushing them back into scheduler queues. * @return size of given {@code waitQueue}. */ - protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { + protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { int count = waitQueue.size(); // wakeProcedure adds to the front of queue, so we start from last in the // waitQueue' queue, so that the procedure which was added first goes in the front for // the scheduler queue. - addFront(waitQueue.descendingIterator()); + addFront((Iterator) waitQueue.descendingIterator()); waitQueue.clear(); return count; } - protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { - waitQueue.addLast(proc); + protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { + waitQueue.addLast((Procedure) proc); } - protected void wakeProcedure(final Procedure procedure) { - if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure); + protected void wakeProcedure(final Procedure procedure) { + if (LOG.isTraceEnabled()) { + LOG.trace("Wake " + procedure); + } push(procedure, /* addFront= */ true, /* notify= */false); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java index 427c1fc..c7666d0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -45,7 +45,8 @@ import org.apache.yetus.audience.InterfaceAudience; * We do not use ReentrantReadWriteLock directly because of its high memory overhead. */ @InterfaceAudience.Private -public class LockAndQueue extends ProcedureDeque implements LockStatus { +public class LockAndQueue extends ProcedureDeque implements LockStatus { + private Procedure exclusiveLockOwnerProcedure = null; private int sharedLock = 0; @@ -69,12 +70,13 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { } @Override - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + public boolean hasParentLock(Procedure proc) { + return proc.hasParent() && + (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); } @Override - public boolean hasLockAccess(final Procedure proc) { + public boolean hasLockAccess(Procedure proc) { return isLockOwner(proc.getProcId()) || hasParentLock(proc); } @@ -100,9 +102,10 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { // ====================================================================== // try/release Shared/Exclusive lock // ====================================================================== - public boolean trySharedLock() { - if (hasExclusiveLock()) return false; + if (hasExclusiveLock()) { + return false; + } sharedLock++; return true; } @@ -111,7 +114,7 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { return --sharedLock == 0; } - public boolean tryExclusiveLock(final Procedure proc) { + public boolean tryExclusiveLock(Procedure proc) { if (isLocked()) return hasLockAccess(proc); exclusiveLockOwnerProcedure = proc; return true; @@ -120,7 +123,7 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { /** * @return True if we released a lock. */ - public boolean releaseExclusiveLock(final Procedure proc) { + public boolean releaseExclusiveLock(Procedure proc) { if (isLockOwner(proc.getProcId())) { exclusiveLockOwnerProcedure = null; return true; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java index b0c8d29..c9aad7c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -15,15 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import org.apache.yetus.audience.InterfaceAudience; /** - * Interface to get status of a Lock without getting access to acquire/release lock. - * Currently used in MasterProcedureScheduler where we want to give Queues access to lock's - * status for scheduling purposes, but not the ability to acquire/release it. + * Interface to get status of a Lock without getting access to acquire/release lock. Currently used + * in MasterProcedureScheduler where we want to give Queues access to lock's status for scheduling + * purposes, but not the ability to acquire/release it. */ @InterfaceAudience.Private public interface LockStatus { @@ -33,9 +32,9 @@ public interface LockStatus { boolean isLockOwner(long procId); - boolean hasParentLock(final Procedure proc); + boolean hasParentLock(Procedure proc); - boolean hasLockAccess(final Procedure proc); + boolean hasLockAccess(Procedure proc); Procedure getExclusiveLockOwnerProcedure(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java index 41b8ca9..b046f6d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import org.apache.yetus.audience.InterfaceAudience; @@ -30,5 +29,5 @@ import java.util.ArrayDeque; * more understanding that it's a queue of waiting procedures. */ @InterfaceAudience.Private -public class ProcedureDeque extends ArrayDeque { +public class ProcedureDeque extends ArrayDeque> { } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index b1ff426..4437775 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -269,6 +269,8 @@ public class ProcedureExecutor { private Configuration conf; private ThreadGroup threadGroup; + // for high priority + private List reservedWorkerThreads; private CopyOnWriteArrayList workerThreads; private TimeoutExecutorThread timeoutExecutor; private int corePoolSize; @@ -506,10 +508,12 @@ public class ProcedureExecutor { return; } - // We have numThreads executor + one timer thread used for timing out + int priorityLevels = scheduler.priorityLevels(); + // We have numThreads + priorityLevels - 1 executor + one timer thread used for timing out // procedures and triggering periodic procedures. - this.corePoolSize = numThreads; - LOG.info("Starting {} Workers (bigger of cpus/4 or 16)", corePoolSize); + this.corePoolSize = numThreads + priorityLevels - 1; + LOG.info("Starting {} Workers (bigger of cpus/4 or 16, plus priority levels minus 1)", + corePoolSize); // Create the Thread Group for the executors threadGroup = new ThreadGroup("PEWorkerGroup"); @@ -520,8 +524,12 @@ public class ProcedureExecutor { // Create the workers workerId.set(0); workerThreads = new CopyOnWriteArrayList<>(); + int priority = priorityLevels; for (int i = 0; i < corePoolSize; ++i) { - workerThreads.add(new WorkerThread(threadGroup)); + workerThreads.add(new WorkerThread(threadGroup, priority)); + if (priority > 1) { + priority--; + } } long st, et; @@ -1703,17 +1711,21 @@ public class ProcedureExecutor { // Worker Thread // ========================================================================== private final class WorkerThread extends StoppableThread { + + private final int priority; private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); private Procedure activeProcedure; - public WorkerThread(final ThreadGroup group) { - super(group, "PEWorker-" + workerId.incrementAndGet()); + public WorkerThread(ThreadGroup group, int priority) { + super(group, "PEWorker-" + workerId.incrementAndGet() + "-pri-" + priority); setDaemon(true); + this.priority = priority; } @Override public void sendStopSignal() { - scheduler.signalAll(); + // TODO: find a way to wake up all the threads without interrupting them, now we rely on the + // one second poll timeout } @Override @@ -1721,8 +1733,10 @@ public class ProcedureExecutor { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { - this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (this.activeProcedure == null) continue; + this.activeProcedure = scheduler.poll(priority, 1, TimeUnit.SECONDS); + if (this.activeProcedure == null) { + continue; + } int activeCount = activeExecutorCount.incrementAndGet(); int runningCount = store.setRunningProcedureCount(activeCount); if (LOG.isTraceEnabled()) { @@ -1768,9 +1782,9 @@ public class ProcedureExecutor { return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); } - private boolean keepAlive(final long lastUpdate) { - if (workerThreads.size() <= corePoolSize) return true; - return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime; + private boolean keepAlive(long lastUpdate) { + return workerThreads.size() <= corePoolSize || priority > 1 || + EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; } } @@ -1994,10 +2008,10 @@ public class ProcedureExecutor { // add a new thread if the worker stuck percentage exceed the threshold limit // and every handler is active. - final float stuckPerc = ((float)stuckCount) / workerThreads.size(); + final float stuckPerc = ((float) stuckCount) / workerThreads.size(); if (stuckPerc >= addWorkerStuckPercentage && - activeExecutorCount.get() == workerThreads.size()) { - final WorkerThread worker = new WorkerThread(threadGroup); + activeExecutorCount.get() == workerThreads.size()) { + final WorkerThread worker = new WorkerThread(threadGroup, 1); workerThreads.add(worker); worker.start(); LOG.debug("Added new worker thread " + worker); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index e7e1cdb..f2ca1d2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import java.util.Iterator; @@ -50,32 +49,32 @@ public interface ProcedureScheduler { * Inserts the specified element at the front of this queue. * @param proc the Procedure to add */ - void addFront(Procedure proc); + void addFront(Procedure proc); /** * Inserts all elements in the iterator at the front of this queue. */ - void addFront(Iterator procedureIterator); + void addFront(Iterator> procedureIterator); /** * Inserts the specified element at the end of this queue. * @param proc the Procedure to add */ - void addBack(Procedure proc); + void addBack(Procedure proc); /** * The procedure can't run at the moment. * add it back to the queue, giving priority to someone else. * @param proc the Procedure to add back to the list */ - void yield(Procedure proc); + void yield(Procedure proc); /** * The procedure in execution completed. * This can be implemented to perform cleanups. * @param proc the Procedure that completed the execution. */ - void completionCleanup(Procedure proc); + void completionCleanup(Procedure proc); /** * @return true if there are procedures available to process, otherwise false. @@ -83,18 +82,53 @@ public interface ProcedureScheduler { boolean hasRunnables(); /** + *

+ * Returns the number of priority levels. The priority will be started from 1 to this value, + * greater value means higher priority. + *

+ *

+ * Default 1 level, which means all the procedures will have the same priority. + *

+ */ + default int priorityLevels() { + return 1; + } + + /** + * Fetch one Procedure from the queue + * @return the Procedure to execute, or null if nothing present. + */ + default Procedure poll() { + return poll(1); + } + + /** + * Fetch one Procedure from the queue + * @param priority the priority of the return procedure should be greater than or equal to this + * value. + * @return the Procedure to execute, or null if nothing present. + */ + Procedure poll(int priority); + + /** * Fetch one Procedure from the queue + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter * @return the Procedure to execute, or null if nothing present. */ - Procedure poll(); + default Procedure poll(long timeout, TimeUnit unit) { + return poll(1, timeout, unit); + } /** * Fetch one Procedure from the queue + * @param priority the priority of the return procedure should be greater than or equal to this + * value. * @param timeout how long to wait before giving up, in units of unit * @param unit a TimeUnit determining how to interpret the timeout parameter * @return the Procedure to execute, or null if nothing present. */ - Procedure poll(long timeout, TimeUnit unit); + Procedure poll(int priority, long timeout, TimeUnit unit); /** * List lock queues. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index feab8be..77c7b1f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -31,19 +31,19 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private @InterfaceStability.Evolving public class SimpleProcedureScheduler extends AbstractProcedureScheduler { - private final ProcedureDeque runnables = new ProcedureDeque(); + private final ProcedureDeque runnables = new ProcedureDeque<>(); @Override - protected void enqueue(final Procedure procedure, final boolean addFront) { + protected void enqueue(Procedure procedure, boolean addFront) { if (addFront) { - runnables.addFirst(procedure); + runnables.addFirst((Procedure) procedure); } else { - runnables.addLast(procedure); + runnables.addLast((Procedure) procedure); } } @Override - protected Procedure dequeue() { + protected Procedure dequeue(int priority) { return runnables.poll(); } @@ -59,7 +59,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { } @Override - public void yield(final Procedure proc) { + public void yield(Procedure proc) { addBack(proc); } @@ -74,7 +74,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { } @Override - public void completionCleanup(Procedure proc) { + public void completionCleanup(Procedure proc) { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 02fbc02..ba00760 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1213,13 +1213,12 @@ public class HMaster extends HRegionServer implements MasterServices { configurationManager.registerObserver(procEnv); int cpus = Runtime.getRuntime().availableProcessors(); - final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, - Math.max((cpus > 0? cpus/4: 0), - MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max( + (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); final boolean abortOnCorruption = conf.getBoolean( MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); - procedureStore.start(numThreads); + procedureStore.start(numThreads + MasterProcedureUtil.getTablePriorityLevels() - 1); procedureExecutor.start(numThreads, abortOnCorruption); procEnv.getRemoteDispatcher().start(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java index ac8e577..33296d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java @@ -59,18 +59,13 @@ public class FairQueue> { size--; } - public Queue poll() { + public Queue peek() { if (queueHead == null) { return null; } Queue q = queueHead; do { if (q.isAvailable()) { - if (q.getPriority() == 1) { - // for the normal priority queue, remove it and append it to the tail - queueHead = AvlIterableList.remove(queueHead, q); - queueHead = AvlIterableList.append(queueHead, q); - } return q; } q = AvlIterableList.readNext(q); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 5cc9298..7af9f13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -113,12 +113,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private final SchemaLocking locking = new SchemaLocking(); @Override - public void yield(final Procedure proc) { + public int priorityLevels() { + return MasterProcedureUtil.getTablePriorityLevels(); + } + + @Override + public void yield(Procedure proc) { push(proc, isTableProcedure(proc), true); } @Override - protected void enqueue(final Procedure proc, final boolean addFront) { + protected void enqueue(Procedure proc, boolean addFront) { if (isTableProcedure(proc)) { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); } else if (isServerProcedure(proc)) { @@ -158,28 +163,36 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - protected Procedure dequeue() { - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it is running the - // enabling/disabling tables, etc. - Procedure pollResult = doPoll(serverRunQueue); + protected Procedure dequeue(int priority) { + // TODO: in general server procedure should have the same priority levels with table, which are + // carrying meta, carrying system regions, and only carrying user regions. And then we should + // peek the first queue for both serverRunQueue and tableRunQueue, and choose the one with the + // higher priority, and if the priority is the same then we choose server. But now we only know + // if a server is carrying meta, which means we will miss one level for serverRunQueue, then the + // above logic will be broken. It may happens that, the server which carries a system region is + // crashed, but the table procedures which modify a system table will have a higher priority and + // consume all the workers and then dead lock. So here we still treat server procedures as the + // first choice. Need to optimize later. + Procedure pollResult = doPoll(serverRunQueue, 1); if (pollResult == null) { - pollResult = doPoll(peerRunQueue); + pollResult = doPoll(tableRunQueue, priority); } if (pollResult == null) { - pollResult = doPoll(tableRunQueue); + pollResult = doPoll(peerRunQueue, priority); } return pollResult; } - private > Procedure doPoll(final FairQueue fairq) { - final Queue rq = fairq.poll(); - if (rq == null || !rq.isAvailable()) { + private > Procedure doPoll(FairQueue fairq, int priority) { + final Queue rq = fairq.peek(); + if (rq == null) { return null; } final Procedure pollResult = rq.peek(); if (pollResult == null) { + // the queue is empty. + removeFromRunQueue(fairq, rq); return null; } final boolean xlockReq = rq.requireExclusiveLock(pollResult); @@ -188,20 +201,28 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { removeFromRunQueue(fairq, rq); return null; } - + if (rq.getPriority() < priority) { + // low priority, give up + return null; + } rq.poll(); if (rq.isEmpty() || xlockReq) { removeFromRunQueue(fairq, rq); - } else if (rq.getLockStatus().hasParentLock(pollResult)) { + return pollResult; + } + if (rq.getLockStatus().hasParentLock(pollResult)) { // if the rq is in the fairq because of runnable child // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state Procedure nextProc = rq.peek(); if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { removeFromRunQueue(fairq, rq); + return pollResult; } } - + // remove and add it to the tail to give chance to other queues. + fairq.remove(rq); + fairq.add(rq); return pollResult; } @@ -288,7 +309,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - public void completionCleanup(final Procedure proc) { + public void completionCleanup(final Procedure proc) { if (proc instanceof TableProcedureInterface) { TableProcedureInterface iProcTable = (TableProcedureInterface) proc; boolean tableDeleted; @@ -408,7 +429,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return; } - final LockAndQueue lock = locking.getPeerLock(peerId); + final LockAndQueue lock = locking.getPeerLock(peerId); if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { removeFromRunQueue(peerRunQueue, queue); removePeerQueue(peerId); @@ -457,12 +478,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table Table to lock * @return true if the procedure has to wait for the table to be available */ - public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { + public boolean waitTableExclusiveLock(Procedure procedure, TableName table) { schedLock(); try { final String namespace = table.getNamespaceAsString(); - final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); - final LockAndQueue tableLock = locking.getTableLock(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + final LockAndQueue tableLock = locking.getTableLock(table); if (!namespaceLock.trySharedLock()) { waitProcedure(namespaceLock, procedure); logLockedResource(LockedResourceType.NAMESPACE, namespace); @@ -486,11 +507,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param table the name of the table that has the exclusive lock */ - public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { + public void wakeTableExclusiveLock(Procedure procedure, TableName table) { schedLock(); try { - final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); - final LockAndQueue tableLock = locking.getTableLock(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; if (!tableLock.hasParentLock(procedure)) { @@ -514,15 +535,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table Table to lock * @return true if the procedure has to wait for the table to be available */ - public boolean waitTableSharedLock(final Procedure procedure, final TableName table) { + public boolean waitTableSharedLock(Procedure procedure, TableName table) { return waitTableQueueSharedLock(procedure, table) == null; } - private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { + private TableQueue waitTableQueueSharedLock(Procedure procedure, final TableName table) { schedLock(); try { - final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); - final LockAndQueue tableLock = locking.getTableLock(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); if (!namespaceLock.trySharedLock()) { waitProcedure(namespaceLock, procedure); return null; @@ -545,11 +566,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param table the name of the table that has the shared lock */ - public void wakeTableSharedLock(final Procedure procedure, final TableName table) { + public void wakeTableSharedLock(Procedure procedure, TableName table) { schedLock(); try { - final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); - final LockAndQueue tableLock = locking.getTableLock(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; if (tableLock.releaseSharedLock()) { addToRunQueue(tableRunQueue, getTableQueue(table)); @@ -578,7 +599,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { final TableQueue queue = getTableQueue(table); - final LockAndQueue tableLock = locking.getTableLock(table); + final LockAndQueue tableLock = locking.getTableLock(table); if (queue == null) return true; if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { @@ -606,7 +627,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param regionInfo the region we are trying to lock * @return true if the procedure has to wait for the regions to be available */ - public boolean waitRegion(final Procedure procedure, final RegionInfo regionInfo) { + public boolean waitRegion(Procedure procedure, RegionInfo regionInfo) { return waitRegions(procedure, regionInfo.getTable(), regionInfo); } @@ -617,8 +638,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param regionInfo the list of regions we are trying to lock * @return true if the procedure has to wait for the regions to be available */ - public boolean waitRegions(final Procedure procedure, final TableName table, - final RegionInfo... regionInfo) { + public boolean waitRegions(Procedure procedure, TableName table, RegionInfo... regionInfo) { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); try { @@ -631,7 +651,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // acquire region xlocks or wait boolean hasLock = true; - final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; + final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { LOG.info(procedure + ", table=" + table + ", " + regionInfo[i].getRegionNameAsString()); assert table != null; @@ -665,7 +685,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure that was holding the region * @param regionInfo the region the procedure was holding */ - public void wakeRegion(final Procedure procedure, final RegionInfo regionInfo) { + public void wakeRegion(Procedure procedure, RegionInfo regionInfo) { wakeRegions(procedure, regionInfo.getTable(), regionInfo); } @@ -674,18 +694,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure that was holding the regions * @param regionInfo the list of regions the procedure was holding */ - public void wakeRegions(final Procedure procedure,final TableName table, - final RegionInfo... regionInfo) { + public void wakeRegions(Procedure procedure, TableName table, RegionInfo... regionInfo) { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); try { int numProcs = 0; - final Procedure[] nextProcs = new Procedure[regionInfo.length]; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; - LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); + LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); if (regionLock.releaseExclusiveLock(procedure)) { if (!regionLock.isEmpty()) { // release one procedure at the time since regions has an xlock @@ -721,19 +740,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public boolean waitNamespaceExclusiveLock(Procedure procedure, String namespace) { schedLock(); try { - final LockAndQueue systemNamespaceTableLock = - locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); if (!systemNamespaceTableLock.trySharedLock()) { waitProcedure(systemNamespaceTableLock, procedure); logLockedResource(LockedResourceType.TABLE, - TableName.NAMESPACE_TABLE_NAME.getNameAsString()); + TableName.NAMESPACE_TABLE_NAME.getNameAsString()); return true; } - final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); if (!namespaceLock.tryExclusiveLock(procedure)) { systemNamespaceTableLock.releaseSharedLock(); waitProcedure(namespaceLock, procedure); @@ -752,12 +771,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param namespace the namespace that has the exclusive lock */ - public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public void wakeNamespaceExclusiveLock(Procedure procedure, String namespace) { schedLock(); try { - final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); - final LockAndQueue systemNamespaceTableLock = - locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); namespaceLock.releaseExclusiveLock(procedure); int waitingCount = 0; if (systemNamespaceTableLock.releaseSharedLock()) { @@ -781,11 +800,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param serverName Server to lock * @return true if the procedure has to wait for the server to be available */ - public boolean waitServerExclusiveLock(final Procedure procedure, - final ServerName serverName) { + public boolean waitServerExclusiveLock(Procedure procedure, ServerName serverName) { schedLock(); try { - final LockAndQueue lock = locking.getServerLock(serverName); + final LockAndQueue lock = locking.getServerLock(serverName); if (lock.tryExclusiveLock(procedure)) { removeFromRunQueue(serverRunQueue, getServerQueue(serverName)); return false; @@ -804,10 +822,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param serverName the server that has the exclusive lock */ - public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { + public void wakeServerExclusiveLock(Procedure procedure, ServerName serverName) { schedLock(); try { - final LockAndQueue lock = locking.getServerLock(serverName); + final LockAndQueue lock = locking.getServerLock(serverName); lock.releaseExclusiveLock(procedure); addToRunQueue(serverRunQueue, getServerQueue(serverName)); int waitingCount = wakeWaitingProcedures(lock); @@ -830,7 +848,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitPeerExclusiveLock(Procedure procedure, String peerId) { schedLock(); try { - final LockAndQueue lock = locking.getPeerLock(peerId); + final LockAndQueue lock = locking.getPeerLock(peerId); if (lock.tryExclusiveLock(procedure)) { removeFromRunQueue(peerRunQueue, getPeerQueue(peerId)); return false; @@ -852,7 +870,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakePeerExclusiveLock(Procedure procedure, String peerId) { schedLock(); try { - final LockAndQueue lock = locking.getPeerLock(peerId); + final LockAndQueue lock = locking.getPeerLock(peerId); lock.releaseExclusiveLock(procedure); addToRunQueue(peerRunQueue, getPeerQueue(peerId)); int waitingCount = wakeWaitingProcedures(lock); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java index f7bea2a..7173cec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java @@ -33,7 +33,7 @@ abstract class Queue> extends AvlLinkedNode runnables = new ProcedureDeque<>(); // Reference to status of lock on entity this queue represents. private final LockStatus lockStatus; @@ -71,9 +71,9 @@ abstract class Queue> extends AvlLinkedNode proc, boolean addToFront) { if (addToFront) { - runnables.addFirst(proc); + runnables.addFirst((Procedure) proc); } else { - runnables.addLast(proc); + runnables.addLast((Procedure) proc); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 5dcc121..b3a8aa9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -43,56 +43,56 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private class SchemaLocking { - private final Map serverLocks = new HashMap<>(); - private final Map namespaceLocks = new HashMap<>(); - private final Map tableLocks = new HashMap<>(); + private final Map> serverLocks = new HashMap<>(); + private final Map> namespaceLocks = new HashMap<>(); + private final Map> tableLocks = new HashMap<>(); // Single map for all regions irrespective of tables. Key is encoded region name. - private final Map regionLocks = new HashMap<>(); - private final Map peerLocks = new HashMap<>(); + private final Map> regionLocks = new HashMap<>(); + private final Map> peerLocks = new HashMap<>(); - private LockAndQueue getLock(Map map, T key) { - LockAndQueue lock = map.get(key); + private LockAndQueue getLock(Map> map, T key) { + LockAndQueue lock = map.get(key); if (lock == null) { - lock = new LockAndQueue(); + lock = new LockAndQueue<>(); map.put(key, lock); } return lock; } - LockAndQueue getTableLock(TableName tableName) { + LockAndQueue getTableLock(TableName tableName) { return getLock(tableLocks, tableName); } - LockAndQueue removeTableLock(TableName tableName) { + LockAndQueue removeTableLock(TableName tableName) { return tableLocks.remove(tableName); } - LockAndQueue getNamespaceLock(String namespace) { + LockAndQueue getNamespaceLock(String namespace) { return getLock(namespaceLocks, namespace); } - LockAndQueue getRegionLock(String encodedRegionName) { + LockAndQueue getRegionLock(String encodedRegionName) { return getLock(regionLocks, encodedRegionName); } - LockAndQueue removeRegionLock(String encodedRegionName) { + LockAndQueue removeRegionLock(String encodedRegionName) { return regionLocks.remove(encodedRegionName); } - LockAndQueue getServerLock(ServerName serverName) { + LockAndQueue getServerLock(ServerName serverName) { return getLock(serverLocks, serverName); } - LockAndQueue getPeerLock(String peerId) { + LockAndQueue getPeerLock(String peerId) { return getLock(peerLocks, peerId); } - LockAndQueue removePeerLock(String peerId) { + LockAndQueue removePeerLock(String peerId) { return peerLocks.remove(peerId); } private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName, - LockAndQueue queue) { + LockAndQueue queue) { LockType lockType; Procedure exclusiveLockOwnerProcedure; int sharedLockCount; @@ -122,7 +122,7 @@ class SchemaLocking { } private void addToLockedResources(List lockedResources, - Map locks, Function keyTransformer, + Map> locks, Function keyTransformer, LockedResourceType resourcesType) { locks.entrySet().stream().filter(e -> e.getValue().isLocked()) .map(e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue())) @@ -152,7 +152,7 @@ class SchemaLocking { * locked. */ LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { - LockAndQueue queue; + LockAndQueue queue; switch (resourceType) { case SERVER: queue = serverLocks.get(ServerName.valueOf(resourceName)); @@ -196,10 +196,10 @@ class SchemaLocking { filterUnlocked(this.peerLocks); } - private String filterUnlocked(Map locks) { + private String filterUnlocked(Map> locks) { StringBuilder sb = new StringBuilder("{"); int initialLength = sb.length(); - for (Map.Entry entry : locks.entrySet()) { + for (Map.Entry> entry : locks.entrySet()) { if (!entry.getValue().isLocked()) { continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java index d86d083..b43b741 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.Random; @@ -216,7 +217,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase public void run() { while (completed.get() < numOps) { // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable. - TestProcedure proc = (TestProcedure)procedureScheduler.poll(1000); + TestProcedure proc = (TestProcedure) procedureScheduler.poll(1, TimeUnit.MICROSECONDS); if (proc == null) { yield.incrementAndGet(); continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 65757db..35cac76 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -27,6 +27,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -219,7 +220,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // Fetch the 2nd item and verify that the lock can't be acquired - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // Release the write lock and acquire the read lock queue.wakeTableExclusiveLock(proc, tableName); @@ -230,7 +231,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableSharedLock(rdProc, tableName)); // Fetch the 3rd item and verify that the lock can't be acquired - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the rdlock of item 2 and take the wrlock for the 3d item queue.wakeTableSharedLock(rdProc, tableName); @@ -240,7 +241,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName)); // Fetch 4th item and verify that the lock can't be acquired - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // Release the write lock and acquire the read lock queue.wakeTableExclusiveLock(wrProc, tableName); @@ -299,7 +300,7 @@ public class TestMasterProcedureScheduler { assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2)); // ns1 and ns2 are both locked so we get nothing - assertNull(queue.poll()); + assertNull(queue.poll(0, TimeUnit.NANOSECONDS)); // release the ns1 lock queue.wakeNamespaceExclusiveLock(procNs1, nsName1); @@ -331,7 +332,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName)); // the table operation can't be executed because the ns is locked - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the ns lock queue.wakeNamespaceExclusiveLock(proc, nsName); @@ -360,7 +361,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitRegion(proc, regionA)); // the xlock operation in the queue can't be executed - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the shared lock queue.wakeRegion(proc, regionA); @@ -371,7 +372,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // everything is locked by the table operation - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the table xlock queue.wakeTableExclusiveLock(proc, tableName); @@ -382,7 +383,7 @@ public class TestMasterProcedureScheduler { // lock and unlock the region assertEquals(false, queue.waitRegion(proc, regionA)); - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); queue.wakeRegion(proc, regionA); } @@ -410,7 +411,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // everything is locked by the table operation - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the table lock queue.wakeTableExclusiveLock(proc, tableName); @@ -438,7 +439,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitRegions(procC, tableName, regionC)); // 3rd and 4th are in the region suspended queue - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // Release region A-B from merge operation (procId=2) queue.wakeRegions(mergeProc, tableName, regionA, regionB); @@ -473,7 +474,7 @@ public class TestMasterProcedureScheduler { Procedure rootProc = queue.poll(); assertEquals(1, rootProc.getProcId()); assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName)); - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // Execute the 1st step of the root-proc. // we should get 3 sub-proc back, one for each region. @@ -498,13 +499,13 @@ public class TestMasterProcedureScheduler { // we should be able to fetch and execute all the sub-procs, // since they are operating on different regions for (int i = 0; i < subProcs.length; ++i) { - TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0); + TestRegionProcedure regionProc = (TestRegionProcedure) queue.poll(0, TimeUnit.NANOSECONDS); assertEquals(subProcs[i].getProcId(), regionProc.getProcId()); assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo())); } // nothing else in the queue - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release all the region locks for (int i = 0; i < subProcs.length; ++i) { @@ -513,7 +514,7 @@ public class TestMasterProcedureScheduler { } // nothing else in the queue - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the table lock (for the root procedure) queue.wakeTableExclusiveLock(rootProc, tableName); @@ -550,7 +551,7 @@ public class TestMasterProcedureScheduler { queue.wakeRegion(childProc, region); // nothing in the queue (proc-3 is suspended) - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release the root lock queue.wakeRegion(rootProc, region); @@ -580,14 +581,14 @@ public class TestMasterProcedureScheduler { proc = queue.poll(); assertEquals(2, proc.getProcId()); - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // resume event.wake(queue); proc = queue.poll(); assertEquals(1, proc.getProcId()); - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); } private static HRegionInfo[] generateRegionInfo(final TableName tableName) { @@ -656,7 +657,7 @@ public class TestMasterProcedureScheduler { } // nothing available, until xlock release - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // release xlock queue.wakeTableExclusiveLock(parentProc, tableName); @@ -724,7 +725,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableExclusiveLock(proc, tableName)); // nothing available, until xlock release - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // put the proc in the queue queue.yield(proc); @@ -762,7 +763,7 @@ public class TestMasterProcedureScheduler { assertEquals(false, queue.waitTableSharedLock(proc2, tableName)); // nothing available, until xlock release - assertEquals(null, queue.poll(0)); + assertEquals(null, queue.poll(0, TimeUnit.NANOSECONDS)); // put the procs back in the queue queue.yield(proc2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 1313cdb..7d2e7a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; @@ -261,7 +262,7 @@ public class TestMasterProcedureSchedulerConcurrency { Procedure proc = null; boolean waiting = true; while (waiting && queue.size() > 0) { - proc = queue.poll(100000000L); + proc = queue.poll(1, TimeUnit.MILLISECONDS); if (proc == null) continue; switch (getTableOperationType(proc)) { case CREATE: @@ -318,7 +319,7 @@ public class TestMasterProcedureSchedulerConcurrency { TestPeerProcedure proc = null; boolean waiting = true; while (waiting && queue.size() > 0) { - proc = (TestPeerProcedure) queue.poll(100000000L); + proc = (TestPeerProcedure) queue.poll(1, TimeUnit.MILLISECONDS); if (proc == null) { continue; } -- 2.7.4