diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java index b525136..09531d1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java @@ -245,7 +245,7 @@ public class BoundedPriorityBlockingQueue extends AbstractQueue implements E result = null; try { while (queue.size() == 0 && nanos > 0) { - notEmpty.awaitNanos(nanos); + nanos = notEmpty.awaitNanos(nanos); } if (queue.size() > 0) { result = queue.poll(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java index f09c79c..7df0991 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java @@ -18,10 +18,16 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.SmallTests; @@ -175,4 +181,57 @@ public class TestBoundedPriorityBlockingQueue { } assertEquals(null, queue.poll()); } + + @Test + public void testPoll() { + assertNull(queue.poll()); + PriorityQueue testList = new PriorityQueue(CAPACITY, new TestObjectComparator()); + + for (int i = 0; i < CAPACITY; ++i) { + TestObject obj = new TestObject(i, i); + testList.add(obj); + queue.offer(obj); + } + + for (int i = 0; i < CAPACITY; ++i) { + assertEquals(testList.poll(), queue.poll()); + } + + assertNull(null, queue.poll()); + } + + @Test + public void testPollInExecutor() { + + final TestObject testObj = new TestObject(0, 0); + + final CyclicBarrier threadsStarted = new CyclicBarrier(2); + ExecutorService executor = Executors.newFixedThreadPool(2); + executor.execute(new Runnable() { + public void run() { + try { + assertNull(queue.poll()); + threadsStarted.await(); + // queue.poll should be blocked until the other thread inserts an item into the queue + assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS)); + assertTrue(queue.isEmpty()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.execute(new Runnable() { + public void run() { + try { + threadsStarted.await(); + queue.offer(testObj); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.shutdown(); + } }