Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (revision 1004349)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (working copy)
@@ -79,7 +79,7 @@
   protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
     pq.add(r, p);
     try {
-      // Sleep 10 millisecond so 2 things are not put in the queue within the
+      // Sleep 1 millisecond so 2 things are not put in the queue within the
       // same millisecond. The queue breaks ties arbitrarily between two
       // requests inserted at the same time. We want the ordering to
       // be consistent for our unit test.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 1004349)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (working copy)
@@ -167,13 +167,23 @@
   /** Removes the request from the regions in queue
    * @param p If null it will use the default priority
    */
-  protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
-    if (r == null) return null;
+  protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
+    if (remove == null) return null;
 
     synchronized (regionsInQueue) {
-      CompactionRequest cr = regionsInQueue.remove(r);
+      CompactionRequest cr = null;
+      cr = regionsInQueue.remove(remove.getHRegion());
+      if (cr != null && !cr.equals(remove))
+      {
+        // regionsInQueue and queue are not updated under a single lock, so a rare race exists:
+        // a higher priority request replaces the lower priority one in regionsInQueue, but the
+        // lower priority one is taken off queue before the higher one is added to queue.
+        // If the entry just removed is not the request we were asked for, put it back.
+        // NOTE(review): that entry is still returned, and remove(Object) hands it to queue.remove.
+        regionsInQueue.put(cr.getHRegion(), cr);
+      }
       if (cr == null) {
-        LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
+        LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
       }
       return cr;
     }
@@ -183,7 +193,6 @@
     CompactionRequest request = this.addToRegionsInQueue(e, p);
     if (request != null) {
       boolean result = queue.add(request);
-      queue.peek();
       return result;
     } else {
       return false;
@@ -233,7 +242,7 @@
   public HRegion take() throws InterruptedException {
     CompactionRequest cr = queue.take();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
+      removeFromRegionsInQueue(cr);
       return cr.getHRegion();
     }
     return null;
@@ -243,7 +252,7 @@
   public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
     CompactionRequest cr = queue.poll(timeout, unit);
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
+      removeFromRegionsInQueue(cr);
       return cr.getHRegion();
     }
     return null;
@@ -251,8 +260,8 @@
 
   @Override
   public boolean remove(Object r) {
-    if (r instanceof HRegion) {
-      CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
+    if (r instanceof CompactionRequest) {
+      CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
       if (cr != null) {
         return queue.remove(cr);
       }
@@ -265,7 +274,7 @@
   public HRegion remove() {
     CompactionRequest cr = queue.remove();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
+      removeFromRegionsInQueue(cr);
       return cr.getHRegion();
     }
     return null;
@@ -275,7 +284,7 @@
   public HRegion poll() {
     CompactionRequest cr = queue.poll();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
+      removeFromRegionsInQueue(cr);
       return cr.getHRegion();
     }
     return null;