Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (revision 1027787) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (working copy) @@ -21,7 +21,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -76,7 +75,7 @@ } } - protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) { + protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) { pq.add(r, p); try { // Sleep 1 millisecond so 2 things are not put in the queue within the @@ -105,11 +104,11 @@ // test 1 // check fifo w/priority - addRegion(pq, r1, Priority.HIGH_BLOCKING); - addRegion(pq, r2, Priority.HIGH_BLOCKING); - addRegion(pq, r3, Priority.HIGH_BLOCKING); - addRegion(pq, r4, Priority.HIGH_BLOCKING); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r1, 0); + addRegion(pq, r2, 0); + addRegion(pq, r3, 0); + addRegion(pq, r4, 0); + addRegion(pq, r5, 0); getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r2); @@ -118,26 +117,12 @@ getAndCheckRegion(pq, r5); // test 2 - // check fifo - addRegion(pq, r1, null); - addRegion(pq, r2, null); - addRegion(pq, r3, null); - addRegion(pq, r4, null); - addRegion(pq, r5, null); - - getAndCheckRegion(pq, r1); - getAndCheckRegion(pq, r2); - getAndCheckRegion(pq, r3); - getAndCheckRegion(pq, r4); - getAndCheckRegion(pq, r5); - - // test 3 // check fifo w/mixed priority - addRegion(pq, r1, Priority.HIGH_BLOCKING); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.HIGH_BLOCKING); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r1, 0); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, 0); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, 0); getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r3); @@ -145,13 +130,13 @@ getAndCheckRegion(pq, r2); getAndCheckRegion(pq, r4); - // test 4 + // test 3 // check fifo w/mixed priority - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, 0); getAndCheckRegion(pq, r5); getAndCheckRegion(pq, r1); @@ -159,14 +144,14 @@ getAndCheckRegion(pq, r3); getAndCheckRegion(pq, r4); - // test 5 + // test 4 // check fifo w/mixed priority elevation time - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.HIGH_BLOCKING); - addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, 0); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); Thread.sleep(1000); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.HIGH_BLOCKING); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, 0); getAndCheckRegion(pq, r2); getAndCheckRegion(pq, r5); @@ -177,15 +162,15 @@ // reset the priority compaction queue back to a normal queue pq = new PriorityCompactionQueue(); - // test 7 + // test 5 // test that lower priority are removed from the queue when a high priority // is added - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); - addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, 0); getAndCheckRegion(pq, r3); getAndCheckRegion(pq, r1); @@ -195,18 +180,18 @@ Assert.assertTrue("Queue should be empty.", pq.size() == 0); - // test 8 + // test 6 // don't add the same region more than once - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); - addRegion(pq, r1, Priority.NORMAL); - addRegion(pq, r2, Priority.NORMAL); - addRegion(pq, r3, Priority.NORMAL); - addRegion(pq, r4, Priority.NORMAL); - addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r3, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r4, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r5, CompactSplitThread.PRIORITY_USER); getAndCheckRegion(pq, r1); getAndCheckRegion(pq, r2); @@ -215,5 +200,19 @@ getAndCheckRegion(pq, r5); Assert.assertTrue("Queue should be empty.", pq.size() == 0); + + // test 7 + // we can handle negative priorities + addRegion(pq, r1, CompactSplitThread.PRIORITY_USER); + addRegion(pq, r2, -1); + addRegion(pq, r3, 0); + addRegion(pq, r4, -2); + + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r1); + + Assert.assertTrue("Queue should be empty.", pq.size() == 0); } -} \ No newline at end of file +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (working copy) @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; /** * This class delegates to the BlockingQueue but wraps all HRegions in @@ -47,22 +46,18 @@ */ private class CompactionRequest implements Comparable { private final HRegion r; - private final Priority p; + private final int p; private final Date date; - public CompactionRequest(HRegion r, Priority p) { + public CompactionRequest(HRegion r, int p) { this(r, p, null); } - public CompactionRequest(HRegion r, Priority p, Date d) { + public CompactionRequest(HRegion r, int p, Date d) { if (r == null) { throw new NullPointerException("HRegion cannot be null"); } - if (p == null) { - p = Priority.NORMAL; //the default priority - } - if (d == null) { d = new Date(); } @@ -91,7 +86,7 @@ } int compareVal; - compareVal = p.compareTo(request.p); //compare priority + compareVal = p - request.p; //compare priority if (compareVal != 0) { return compareVal; } @@ -111,7 +106,7 @@ } /** Gets the priority for the request */ - Priority getPriority() { + int getPriority() { return p; } @@ -140,13 +135,13 @@ * @param p If null it will use the default priority * @return returns a compaction request if it isn't already in the queue */ - protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) { + protected CompactionRequest addToRegionsInQueue(HRegion r, int p) { CompactionRequest queuedRequest = null; CompactionRequest newRequest = new CompactionRequest(r, p); synchronized (regionsInQueue) { queuedRequest = regionsInQueue.get(r); if (queuedRequest == null || - newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) { + newRequest.getPriority() < queuedRequest.getPriority()) { LOG.trace("Inserting region in queue. " + newRequest); regionsInQueue.put(r, newRequest); } else { @@ -189,7 +184,7 @@ } } - public boolean add(HRegion e, Priority p) { + public boolean add(HRegion e, int p) { CompactionRequest request = this.addToRegionsInQueue(e, p); if (request != null) { boolean result = queue.add(request); @@ -201,20 +196,20 @@ @Override public boolean add(HRegion e) { - return add(e, null); + return add(e, e.getCompactPriority()); } - public boolean offer(HRegion e, Priority p) { + public boolean offer(HRegion e, int p) { CompactionRequest request = this.addToRegionsInQueue(e, p); return (request != null)? queue.offer(request): false; } @Override public boolean offer(HRegion e) { - return offer(e, null); + return offer(e, e.getCompactPriority()); } - public void put(HRegion e, Priority p) throws InterruptedException { + public void put(HRegion e, int p) throws InterruptedException { CompactionRequest request = this.addToRegionsInQueue(e, p); if (request != null) { queue.put(request); @@ -223,10 +218,10 @@ @Override public void put(HRegion e) throws InterruptedException { - put(e, null); + put(e, e.getCompactPriority()); } - public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit) + public boolean offer(HRegion e, int p, long timeout, TimeUnit unit) throws InterruptedException { CompactionRequest request = this.addToRegionsInQueue(e, p); return (request != null)? queue.offer(request, timeout, unit): false; @@ -235,7 +230,7 @@ @Override public boolean offer(HRegion e, long timeout, TimeUnit unit) throws InterruptedException { - return offer(e, null, timeout, unit); + return offer(e, e.getCompactPriority(), timeout, unit); } @Override Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2021,7 +2021,8 @@ // force a compaction, split will be side-effect // TODO: flush/compact/split refactor will make it trivial to do this // sync/async (and won't require us to do a compaction to split!) - compactSplitThread.requestCompaction(region, "User-triggered split"); + compactSplitThread.requestCompaction(region, "User-triggered split", + CompactSplitThread.PRIORITY_USER); } @Override @@ -2031,7 +2032,8 @@ region.flushcache(); region.shouldSplit(true); compactSplitThread.requestCompaction(region, major, "User-triggered " - + (major ? "major " : "") + "compaction"); + + (major ? "major " : "") + "compaction", + CompactSplitThread.PRIORITY_USER); } /** @return the info server */ Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (working copy) @@ -25,4 +25,11 @@ * @param why Why compaction was requested -- used in debug messages */ public void requestCompaction(final HRegion r, final String why); + + /** + * @param r Region to compact + * @param why Why compaction was requested -- used in debug messages + * @param pri Priority of this compaction. minHeap. <=0 is critical + */ + public void requestCompaction(final HRegion r, final String why, int pri); } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -212,8 +212,7 @@ LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName(), - CompactSplitThread.Priority.HIGH_BLOCKING); + this.server.compactSplitThread.requestCompaction(region, getName()); // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -43,34 +43,11 @@ private final PriorityCompactionQueue compactionQueue = new PriorityCompactionQueue(); - /** The priorities for a compaction request. */ - public enum Priority implements Comparable { - //NOTE: All priorities should be numbered consecutively starting with 1. - //The highest priority should be 1 followed by all lower priorities. - //Priorities can be changed at anytime without requiring any changes to the - //queue. + /* The default priority for user-specified compaction requests. + * The user gets top priority unless we have blocking compactions. (Pri <= 0) + */ + public static final int PRIORITY_USER = 1; - /** HIGH_BLOCKING should only be used when an operation is blocked until a - * compact / split is done (e.g. a MemStore can't flush because it has - * "too many store files" and is blocking until a compact / split is done) - */ - HIGH_BLOCKING(1), - /** A normal compaction / split request */ - NORMAL(2), - /** A low compaction / split request -- not currently used */ - LOW(3); - - int value; - - Priority(int value) { - this.value = value; - } - - int getInt() { - return value; - } - } - /** * Splitting should not take place if the total number of regions exceed this. * This is not a hard limit to the number of regions but it is a guideline to @@ -102,10 +79,24 @@ if(!this.server.isStopped()) { // Don't interrupt us while we are working byte [] midKey = r.compactStores(); - if (shouldSplitRegion() && midKey != null && - !this.server.isStopped()) { - split(r, midKey); + if (LOG.isDebugEnabled()) { + HRegion next = this.compactionQueue.peek(); + LOG.debug("Just finished a compaction. " + + " Current Compaction Queue: size=" + + getCompactionQueueSize() + + ((next != null) ? + ", topPri=" + next.getCompactPriority() : "")); } + if (!this.server.isStopped()) { + // requests that were added during compaction will have a + // stale priority. remove and re-insert to update priority + boolean hadCompaction = compactionQueue.remove(r); + if (shouldSplitRegion() && midKey != null) { + split(r, midKey); + } else if (hadCompaction) { + compactionQueue.add(r); + } + } } } finally { lock.unlock(); @@ -135,30 +126,28 @@ public synchronized void requestCompaction(final HRegion r, final String why) { - requestCompaction(r, false, why, Priority.NORMAL); + requestCompaction(r, false, why, r.getCompactPriority()); } public synchronized void requestCompaction(final HRegion r, - final String why, Priority p) { + final String why, int p) { requestCompaction(r, false, why, p); } - public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why) { - requestCompaction(r, force, why, Priority.NORMAL); - } - /** * @param r HRegion store belongs to * @param force Whether next compaction should be major * @param why Why compaction requested -- used in debug messages */ public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why, Priority priority) { + final boolean force, final String why, int priority) { if (this.server.isStopped()) { return; } - r.setForceMajorCompaction(force); + // tell the region to major-compact (and don't downgrade it) + if (force) { + r.setForceMajorCompaction(force); + } if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) { LOG.debug("Compaction " + (force? "(major) ": "") + "requested for region " + r.getRegionNameAsString() + Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3255,6 +3255,17 @@ } /** + * @return The priority that this region should have in the compaction queue + */ + public int getCompactPriority() { + int count = Integer.MAX_VALUE; + for(Store store : stores.values()) { + count = Math.min(count, store.getCompactPriority()); + } + return count; + } + + /** * Checks every store to see if one has too many * store files * @return true if any store has too many store files Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1027787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -94,6 +94,7 @@ /* how many bytes to write between status checks */ static int closeCheckInterval = 0; private final long desiredMaxFileSize; + private final int blockingStoreFileCount; private volatile long storeSize = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -185,6 +186,8 @@ HConstants.DEFAULT_MAX_FILE_SIZE); } this.desiredMaxFileSize = maxFileSize; + this.blockingStoreFileCount = + conf.getInt("hbase.hstore.blockingStoreFiles", -1); this.majorCompactionTime = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000); @@ -1322,6 +1325,13 @@ } return size; } + + /** + * @return The priority that this store should have in the compaction queue + */ + int getCompactPriority() { + return this.blockingStoreFileCount - this.storefiles.size(); + } /** * Datastructure that holds size and row to split a file around.