diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 6403cfd..71e13dc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; +import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -463,8 +464,13 @@ public abstract class Procedure implements Comparable { /** * @return the remaining time before the timeout */ - public long getTimeRemaining() { - return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime)); + public synchronized long getTimeRemainingBeforeTimeout(final TimeUnit unit) { + final long currentTime = EnvironmentEdgeManager.currentTime(); + final long timeoutTime = getLastUpdate() + getTimeout(); + if (currentTime >= timeoutTime) { + return 0; + } + return unit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS); } @VisibleForTesting diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index c2838ba..145e308 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -29,12 +29,15 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -48,8 +51,6 @@ import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -96,17 +97,52 @@ public class ProcedureExecutor { } /** - * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure + * Used by the DelayQueue to get the timeout interval of the procedure */ - private static class ProcedureTimeoutRetriever implements TimeoutRetriever { + private static class DelayedContainer implements Delayed { + // Used to wake up the timeoutLoop() on shutdown + static final DelayedContainer POISON = new DelayedContainer(); + + /** null if poison */ + final Procedure proc; + + DelayedContainer(final Procedure proc) { + assert proc != null; + this.proc = proc; + } + + DelayedContainer() { + this.proc = null; + } + @Override - public long getTimeout(Procedure proc) { - return proc.getTimeRemaining(); + public long getDelay(final TimeUnit unit) { + return (proc != null) ? proc.getTimeRemainingBeforeTimeout(unit) : 0; } + /** + * @throws NullPointerException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + */ @Override - public TimeUnit getTimeUnit(Procedure proc) { - return TimeUnit.MILLISECONDS; + public int compareTo(final Delayed o) { + return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof DelayedContainer)) { + return false; + } + return Objects.equals(proc, ((DelayedContainer)obj).proc); + } + + @Override + public int hashCode() { + return (proc != null) ? proc.hashCode() : 0; } } @@ -225,8 +261,8 @@ public class ProcedureExecutor { * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state * or periodic procedures. */ - private final TimeoutBlockingQueue waitingTimeout = - new TimeoutBlockingQueue(new ProcedureTimeoutRetriever()); + private final DelayQueue waitingTimeout = + new DelayQueue(); /** * Queue that contains runnable procedures. @@ -527,7 +563,7 @@ public class ProcedureExecutor { LOG.info("Stopping the procedure executor"); runnables.signalAll(); - waitingTimeout.signalAll(); + waitingTimeout.add(DelayedContainer.POISON); } public void join() { @@ -610,8 +646,8 @@ public class ProcedureExecutor { * @param chore the chore to add */ public void addChore(final ProcedureInMemoryChore chore) { - chore.setState(ProcedureState.RUNNABLE); - waitingTimeout.add(chore); + chore.setState(ProcedureState.WAITING_TIMEOUT); + addToWaitingQueue(chore); } /** @@ -621,7 +657,16 @@ public class ProcedureExecutor { */ public boolean removeChore(final ProcedureInMemoryChore chore) { chore.setState(ProcedureState.FINISHED); - return waitingTimeout.remove(chore); + return removeFromWaitingQueue(chore); + } + + private void addToWaitingQueue(final Procedure procedure) { + assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; + waitingTimeout.add(new DelayedContainer(procedure)); + } + + private boolean removeFromWaitingQueue(final Procedure procedure) { + return waitingTimeout.remove(new DelayedContainer(procedure)); } /** @@ -910,16 +955,22 @@ public class ProcedureExecutor { private void timeoutLoop() { while (isRunning()) { - Procedure proc = waitingTimeout.poll(); - if (proc == null) continue; + final DelayedContainer task; + try { + task = waitingTimeout.take(); + } catch (InterruptedException e) { + // Just consume the interruption. + continue; + } - if (proc.getTimeRemaining() > 100) { - // got an early wake, maybe a stop? - // re-enqueue the task in case was not a stop or just a signal - waitingTimeout.add(proc); + if (task == null || task == DelayedContainer.POISON) { + // the executor may be shutting down, + // and the task is just the shutdown request continue; } + final Procedure proc = task.proc; + // ---------------------------------------------------------------------------- // TODO-MAYBE: Should we provide a notification to the store with the // full set of procedures pending and completed to write a compacted @@ -932,14 +983,14 @@ public class ProcedureExecutor { // instead of bringing the Chore class in, we reuse this timeout thread for // this special case. if (proc instanceof ProcedureInMemoryChore) { - if (proc.isRunnable()) { + if (proc.isWaiting()) { try { ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment()); } catch (Throwable e) { LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); } - proc.setStartTime(EnvironmentEdgeManager.currentTime()); - if (proc.isRunnable()) waitingTimeout.add(proc); + proc.updateTimestamp(); + if (proc.isWaiting()) addToWaitingQueue(proc); } continue; } @@ -954,7 +1005,8 @@ public class ProcedureExecutor { runnables.addFront(proc); continue; } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) { - waitingTimeout.add(proc); + proc.updateTimestamp(); + addToWaitingQueue(proc); } } } @@ -1154,7 +1206,7 @@ public class ProcedureExecutor { procedure.setState(ProcedureState.WAITING); break; case WAITING_TIMEOUT: - waitingTimeout.add(procedure); + addToWaitingQueue(procedure); break; default: break; @@ -1162,7 +1214,7 @@ public class ProcedureExecutor { } } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - waitingTimeout.add(procedure); + addToWaitingQueue(procedure); } else if (!isSuspended) { // No subtask, so we are done procedure.setState(ProcedureState.FINISHED); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java deleted file mode 100644 index 2292e63..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.util; - -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class TimeoutBlockingQueue { - public static interface TimeoutRetriever { - long getTimeout(T object); - TimeUnit getTimeUnit(T object); - } - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - private final TimeoutRetriever timeoutRetriever; - - private E[] objects; - private int head = 0; - private int tail = 0; - - public TimeoutBlockingQueue(TimeoutRetriever timeoutRetriever) { - this(32, timeoutRetriever); - } - - @SuppressWarnings("unchecked") - public TimeoutBlockingQueue(int capacity, TimeoutRetriever timeoutRetriever) { - this.objects = (E[])new Object[capacity]; - this.timeoutRetriever = timeoutRetriever; - } - - public void dump() { - for (int i = 0; i < objects.length; ++i) { - if (i == head) { - System.out.print("[" + objects[i] + "] "); - } else if (i == tail) { - System.out.print("]" + objects[i] + "[ "); - } else { - System.out.print(objects[i] + " "); - } - } - System.out.println(); - } - - public void clear() { - lock.lock(); - try { - if (head != tail) { - for (int i = head; i < tail; ++i) { - objects[i] = null; - } - head = 0; - tail = 0; - waitCond.signal(); - } - } finally { - lock.unlock(); - } - } - - public void add(E e) { - if (e == null) throw new NullPointerException(); - - lock.lock(); - try { - addElement(e); - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - public boolean remove(E e) { - if (e == null) return false; - lock.lock(); - try { - for (int i = 0; i < objects.length; ++i) { - if (e.equals(objects[i])) { - objects[i] = null; - return true; - } - } - return false; - } finally { - lock.unlock(); - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public E poll() { - lock.lock(); - try { - if (isEmpty()) { - waitCond.await(); - return null; - } - - E elem = objects[head]; - long nanos = getNanosTimeout(elem); - nanos = waitCond.awaitNanos(nanos); - return nanos > 0 ? null : removeFirst(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } finally { - lock.unlock(); - } - } - - public int size() { - return tail - head; - } - - public boolean isEmpty() { - return (tail - head) == 0; - } - - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - private void addElement(E elem) { - int size = (tail - head); - if ((objects.length - size) == 0) { - int capacity = size + ((size < 64) ? (size + 2) : (size >> 1)); - E[] newObjects = (E[])new Object[capacity]; - - if (compareTimeouts(objects[tail - 1], elem) <= 0) { - // Append - System.arraycopy(objects, head, newObjects, 0, tail); - tail -= head; - newObjects[tail++] = elem; - } else if (compareTimeouts(objects[head], elem) > 0) { - // Prepend - System.arraycopy(objects, head, newObjects, 1, tail); - newObjects[0] = elem; - tail -= (head - 1); - } else { - // Insert in the middle - int index = upperBound(head, tail - 1, elem); - int newIndex = (index - head); - System.arraycopy(objects, head, newObjects, 0, newIndex); - newObjects[newIndex] = elem; - System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index); - tail -= (head - 1); - } - head = 0; - objects = newObjects; - } else { - if (tail == objects.length) { - // shift down |-----AAAAAAA| - tail -= head; - System.arraycopy(objects, head, objects, 0, tail); - head = 0; - } - - if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) { - // Append - objects[tail++] = elem; - } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) { - // Prepend - objects[--head] = elem; - } else { - // Insert in the middle - int index = upperBound(head, tail - 1, elem); - System.arraycopy(objects, index, objects, index + 1, tail - index); - objects[index] = elem; - tail++; - } - } - } - - private E removeFirst() { - E elem = objects[head]; - objects[head] = null; - head = (head + 1) % objects.length; - if (head == 0) tail = 0; - return elem; - } - - private int upperBound(int start, int end, E key) { - while (start < end) { - int mid = (start + end) >>> 1; - E mitem = objects[mid]; - int cmp = compareTimeouts(mitem, key); - if (cmp > 0) { - end = mid; - } else { - start = mid + 1; - } - } - return start; - } - - private int compareTimeouts(final E a, final E b) { - long t1 = getNanosTimeout(a); - long t2 = getNanosTimeout(b); - return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0; - } - - private long getNanosTimeout(final E obj) { - if (obj == null) return 0; - TimeUnit unit = timeoutRetriever.getTimeUnit(obj); - long timeout = timeoutRetriever.getTimeout(obj); - return unit.toNanos(timeout); - } -} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java index 8bc8fa8..50ccfa6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java @@ -76,17 +76,17 @@ public class TestProcedureInMemoryChore { CountDownLatch latch = new CountDownLatch(nCountDown); TestLatchChore chore = new TestLatchChore(timeoutMSec, latch); procExecutor.addChore(chore); - assertTrue(chore.isRunnable()); + assertTrue(chore.isWaiting()); latch.await(); // remove the chore and verify it is no longer executed - assertTrue(chore.isRunnable()); + assertTrue(chore.isWaiting()); procExecutor.removeChore(chore); latch = new CountDownLatch(nCountDown); chore.setLatch(latch); latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS); LOG.info("chore latch count=" + latch.getCount()); - assertFalse(chore.isRunnable()); + assertFalse(chore.isWaiting()); assertTrue("latchCount=" + latch.getCount(), latch.getCount() > 0); } @@ -104,6 +104,7 @@ public class TestProcedureInMemoryChore { @Override protected void periodicExecute(final TestProcEnv env) { + LOG.info("periodic execute " + this); latch.countDown(); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java deleted file mode 100644 index 1f901b5..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2.util; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.CategoryBasedTimeout; -import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestRule; - -@Category({MasterTests.class, MediumTests.class}) -public class TestTimeoutBlockingQueue { - @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). - withLookingForStuckThread(true).build(); - static class TestObject { - private long timeout; - private int seqId; - - public TestObject(int seqId, long timeout) { - this.timeout = timeout; - this.seqId = seqId; - } - - public long getTimeout() { - return timeout; - } - - public String toString() { - return String.format("(%03d, %03d)", seqId, timeout); - } - } - - static class TestObjectTimeoutRetriever implements TimeoutRetriever { - @Override - public long getTimeout(TestObject obj) { - return obj.getTimeout(); - } - - @Override - public TimeUnit getTimeUnit(TestObject obj) { - return TimeUnit.MILLISECONDS; - } - } - - @Test - public void testOrder() { - TimeoutBlockingQueue queue = - new TimeoutBlockingQueue(8, new TestObjectTimeoutRetriever()); - - long[] timeouts = new long[] {500, 200, 700, 300, 600, 600, 200, 800, 500}; - - for (int i = 0; i < timeouts.length; ++i) { - for (int j = 0; j <= i; ++j) { - queue.add(new TestObject(j, timeouts[j])); - queue.dump(); - } - - long prev = 0; - for (int j = 0; j <= i; ++j) { - TestObject obj = queue.poll(); - assertTrue(obj.getTimeout() >= prev); - prev = obj.getTimeout(); - queue.dump(); - } - } - } - - @Test - public void testTimeoutBlockingQueue() { - TimeoutBlockingQueue queue; - - int[][] testArray = new int[][] { - {200, 400, 600}, // append - {200, 400, 100}, // prepend - {200, 400, 300}, // insert - }; - - for (int i = 0; i < testArray.length; ++i) { - int[] sortedArray = Arrays.copyOf(testArray[i], testArray[i].length); - Arrays.sort(sortedArray); - - // test with head == 0 - queue = new TimeoutBlockingQueue(2, new TestObjectTimeoutRetriever()); - for (int j = 0; j < testArray[i].length; ++j) { - queue.add(new TestObject(j, testArray[i][j])); - queue.dump(); - } - - for (int j = 0; !queue.isEmpty(); ++j) { - assertEquals(sortedArray[j], queue.poll().getTimeout()); - } - - queue = new TimeoutBlockingQueue(2, new TestObjectTimeoutRetriever()); - queue.add(new TestObject(0, 50)); - assertEquals(50, queue.poll().getTimeout()); - - // test with head > 0 - for (int j = 0; j < testArray[i].length; ++j) { - queue.add(new TestObject(j, testArray[i][j])); - queue.dump(); - } - - for (int j = 0; !queue.isEmpty(); ++j) { - assertEquals(sortedArray[j], queue.poll().getTimeout()); - } - } - } - - @Test - public void testRemove() { - TimeoutBlockingQueue queue = - new TimeoutBlockingQueue(2, new TestObjectTimeoutRetriever()); - - final int effectiveLen = 5; - TestObject[] objs = new TestObject[6]; - for (int i = 0; i < effectiveLen; ++i) { - objs[i] = new TestObject(0, i * 10); - queue.add(objs[i]); - } - objs[effectiveLen] = new TestObject(0, effectiveLen * 10); - queue.dump(); - - for (int i = 0; i < effectiveLen; i += 2) { - assertTrue(queue.remove(objs[i])); - } - assertTrue(!queue.remove(objs[effectiveLen])); - - for (int i = 0; i < effectiveLen; ++i) { - TestObject x = queue.poll(); - assertEquals((i % 2) == 0 ? null : objs[i], x); - } - } -}