Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1484962)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -32,10 +32,11 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -1135,30 +1136,51 @@
 
       // No event in progress on this region => we can submit a new task immediately.
       regionsInProgress.add(regRunnable.getRegionName());
-      zkEventWorkers.submit(new Runnable() {
+
+      // Build the task once so the same instance can be resubmitted if the pool rejects it.
+      // Its finally block always releases the region and hands the next waiting event (if
+      // any) back to zkEventWorkersSubmit, even when regRunnable.run() throws.
+      final Runnable task = new Runnable() {
         @Override
         public void run() {
           try {
             regRunnable.run();
           } finally {
             // now that we have finished, let's see if there is an event for the same region in the
             // waiting list. If it's the case, we can now submit it to the pool.
             synchronized (regionsInProgress) {
               regionsInProgress.remove(regRunnable.getRegionName());
               synchronized (zkEventWorkerWaitingList) {
                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
                     regRunnable.getRegionName());
                 if (!waiting.isEmpty()) {
                   // We want the first object only. The only way to get it is through an iterator.
                   RegionRunnable toSubmit = waiting.iterator().next();
                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
                   zkEventWorkersSubmit(toSubmit);
                 }
               }
             }
           }
         }
-      });
+      };
+
+      // Retry rejected submissions on this thread; once the attempt budget is exhausted,
+      // abort the server and stop instead of looping forever.
+      int attempts = 0;
+      while (true) {
+        try {
+          zkEventWorkers.submit(task);
+          break;
+        } catch (RejectedExecutionException ree) {
+          if (attempts > maximumAttempts) {
+            this.server.abort("Unable to execute within " + maximumAttempts + " attempts", ree);
+            break; // abort was requested: stop retrying
+          }
+          // retry
+          attempts++;
+        }
+      }
     }
   }
 