commit 6dc011eb2d0308fcdcdf4dbd65126aee5ddb9f9d Author: nspiegelberg Date: 37 seconds ago HBASE-3796 Per-Store Entries in Compaction Queue diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 48e7b00..8a091a8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; /** @@ -70,15 +72,17 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { @Override public void run() { while (!this.server.isStopped()) { + CompactionRequest compactionRequest = null; HRegion r = null; try { - r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (r != null) { + compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); + if (compactionRequest != null) { lock.lock(); try { if(!this.server.isStopped()) { // Don't interrupt us while we are working - byte [] midKey = r.compactStores(); + r = compactionRequest.getHRegion(); + byte [] midKey = r.compactStore(compactionRequest.getStore()); if (r.getLastCompactInfo() != null) { // compaction aborted? this.server.getMetrics().addCompaction(r.getLastCompactInfo()); } @@ -115,7 +119,9 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { public synchronized void requestCompaction(final HRegion r, final String why) { - requestCompaction(r, false, why, r.getCompactPriority()); + for(Store s : r.getStores().values()) { + requestCompaction(r, s, false, why, s.getCompactPriority()); + } } public synchronized void requestCompaction(final HRegion r, @@ -123,23 +129,33 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { requestCompaction(r, false, why, p); } + public synchronized void requestCompaction(final HRegion r, + final boolean force, final String why, int p) { + for(Store s : r.getStores().values()) { + requestCompaction(r, s, force, why, p); + } + } + /** * @param r HRegion store belongs to * @param force Whether next compaction should be major * @param why Why compaction requested -- used in debug messages */ - public synchronized void requestCompaction(final HRegion r, + public synchronized void requestCompaction(final HRegion r, final Store s, final boolean force, final String why, int priority) { if (this.server.isStopped()) { return; } // tell the region to major-compact (and don't downgrade it) if (force) { - r.setForceMajorCompaction(force); + s.setForceMajorCompaction(force); } - if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) { + CompactionRequest compactionRequest = new CompactionRequest(r, s, priority); + if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) { LOG.debug("Compaction " + (force? "(major) ": "") + - "requested for " + r.getRegionNameAsString() + + "requested for region " + r.getRegionNameAsString() + + "/" + r.getRegionInfo().getEncodedName() + + ", store " + s + (why != null && !why.isEmpty()? " because " + why: "") + "; priority=" + priority + ", compaction queue size=" + compactionQueue.size()); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c065702..b988d2f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -788,6 +788,20 @@ public class HRegion implements HeapSize { // , Writable{ return compactStores(); } + /** + * Compact all the stores and return the split key of the first store that needs + * to be split. + */ + public byte[] compactStores() throws IOException { + byte[] splitRow = null; + for(Store s : getStores().values()) { + if(splitRow == null) { + splitRow = compactStore(s); + } + } + return splitRow; + } + /* * Called by compaction thread and after region is opened to compact the * HStores if necessary. @@ -802,7 +816,7 @@ public class HRegion implements HeapSize { // , Writable{ * @return split row if split is needed * @throws IOException e */ - public byte [] compactStores() throws IOException { + public byte [] compactStore(Store store) throws IOException { if (this.closing.get()) { LOG.debug("Skipping compaction on " + this + " because closing"); return null; @@ -836,16 +850,12 @@ public class HRegion implements HeapSize { // , Writable{ long startTime = EnvironmentEdgeManager.currentTimeMillis(); doRegionCompactionPrep(); long lastCompactSize = 0; - long maxSize = -1; boolean completed = false; try { - for (Store store: stores.values()) { - final Store.StoreSize ss = store.compact(); - lastCompactSize += store.getLastCompactSize(); - if (ss != null && ss.getSize() > maxSize) { - maxSize = ss.getSize(); - splitRow = ss.getSplitRow(); - } + final Store.StoreSize ss = store.compact(); + lastCompactSize += store.getLastCompactSize(); + if (ss != null) { + splitRow = ss.getSplitRow(); } completed = true; } catch (InterruptedIOException iioe) { @@ -2224,6 +2234,10 @@ public class HRegion implements HeapSize { // , Writable{ return this.stores.get(column); } + public Map getStores() { + return this.stores; + } + ////////////////////////////////////////////////////////////////////////////// // Support code ////////////////////////////////////////////////////////////////////////////// @@ -2403,12 +2417,12 @@ public class HRegion implements HeapSize { // , Writable{ if (!(o instanceof HRegion)) { return false; } - return Bytes.equals(this.regionInfo.getRegionName(), ((HRegion)o).regionInfo.getRegionName()); + return Bytes.equals(this.getRegionName(), ((HRegion) o).getRegionName()); } @Override public int hashCode() { - return Bytes.hashCode(this.regionInfo.getRegionName()); + return Bytes.hashCode(this.getRegionName()); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java index 5cab5bd..542cd6e 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java @@ -20,132 +20,69 @@ package org.apache.hadoop.hbase.regionserver; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Pair; /** - * This class delegates to the BlockingQueue but wraps all HRegions in + * This class delegates to the BlockingQueue but wraps all Stores in * compaction requests that hold the priority and the date requested. * * Implementation Note: With an elevation time of -1 there is the potential for * starvation of the lower priority compaction requests as long as there is a * constant stream of high priority requests. */ -public class PriorityCompactionQueue implements BlockingQueue { +public class PriorityCompactionQueue implements BlockingQueue { static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class); - - /** - * This class represents a compaction request and holds the region, priority, - * and time submitted. - */ - private class CompactionRequest implements Comparable { - private final HRegion r; - private final int p; - private final Date date; - - public CompactionRequest(HRegion r, int p) { - this(r, p, null); - } - - public CompactionRequest(HRegion r, int p, Date d) { - if (r == null) { - throw new NullPointerException("HRegion cannot be null"); - } - - if (d == null) { - d = new Date(); - } - - this.r = r; - this.p = p; - this.date = d; - } - - /** - * This function will define where in the priority queue the request will - * end up. Those with the highest priorities will be first. When the - * priorities are the same it will It will first compare priority then date - * to maintain a FIFO functionality. - * - *

Note: The date is only accurate to the millisecond which means it is - * possible that two requests were inserted into the queue within a - * millisecond. When that is the case this function will break the tie - * arbitrarily. - */ - @Override - public int compareTo(CompactionRequest request) { - //NOTE: The head of the priority queue is the least element - if (this.equals(request)) { - return 0; //they are the same request - } - int compareVal; - - compareVal = p - request.p; //compare priority - if (compareVal != 0) { - return compareVal; - } - - compareVal = date.compareTo(request.date); - if (compareVal != 0) { - return compareVal; - } - - //break the tie arbitrarily - return -1; - } - - /** Gets the HRegion for the request */ - HRegion getHRegion() { - return r; - } - - /** Gets the priority for the request */ - int getPriority() { - return p; - } - - public String toString() { - return "regionName=" + r.getRegionNameAsString() + - ", priority=" + p + ", date=" + date; - } - } - + /** The actual blocking queue we delegate to */ protected final BlockingQueue queue = new PriorityBlockingQueue(); - /** Hash map of the HRegions contained within the Compaction Queue */ - private final HashMap regionsInQueue = - new HashMap(); + /** Hash map of the Stores contained within the Compaction Queue */ + private final HashMap, CompactionRequest> storesInQueue = + new HashMap, CompactionRequest>(); /** Creates a new PriorityCompactionQueue with no priority elevation time */ public PriorityCompactionQueue() { LOG.debug("Create PriorityCompactionQueue"); } - /** If the region is not already in the queue it will add it and return a + protected Pair toPair(CompactionRequest cr) { + return Pair.newPair(cr.getHRegion(), cr.getStore()); + } + + /** If the store is not already in the queue it will add it and return a * new compaction request object. If it is already present in the queue * then it will return null. * @param p If null it will use the default priority * @return returns a compaction request if it isn't already in the queue */ - protected CompactionRequest addToRegionsInQueue(HRegion r, int p) { + protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) { CompactionRequest queuedRequest = null; - CompactionRequest newRequest = new CompactionRequest(r, p); - synchronized (regionsInQueue) { - queuedRequest = regionsInQueue.get(r); + synchronized (storesInQueue) { + queuedRequest = storesInQueue.get(toPair(newRequest)); if (queuedRequest == null || newRequest.getPriority() < queuedRequest.getPriority()) { - LOG.trace("Inserting region in queue. " + newRequest); - regionsInQueue.put(r, newRequest); + String reason = ""; + if (queuedRequest != null) { + if (newRequest.getPriority() < queuedRequest.getPriority()) { + reason = "Reason : priority changed from " + + queuedRequest.getPriority() + " to " + + newRequest.getPriority() + ". "; + } + } + LOG.debug("Inserting store in queue. " + reason + newRequest); + storesInQueue.put(toPair(newRequest), newRequest); } else { - LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest + + LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest + ", requested: " + newRequest); newRequest = null; // It is already present so don't add it } @@ -159,33 +96,34 @@ public class PriorityCompactionQueue implements BlockingQueue { return newRequest; } - /** Removes the request from the regions in queue + /** Removes the request from the stores in queue * @param remove */ - protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) { - if (remove == null) return null; + protected CompactionRequest removeFromQueue(CompactionRequest c) { + if (c == null) return null; - synchronized (regionsInQueue) { - CompactionRequest cr = null; - cr = regionsInQueue.remove(remove.getHRegion()); - if (cr != null && !cr.equals(remove)) + synchronized (storesInQueue) { + CompactionRequest cr = storesInQueue.remove(toPair(c)); + if (cr != null && !cr.equals(c)) { //Because we don't synchronize across both this.regionsInQueue and this.queue //a rare race condition exists where a higher priority compaction request replaces //the lower priority request in this.regionsInQueue but the lower priority request //is taken off this.queue before the higher can be added to this.queue. //So if we didn't remove what we were expecting we put it back on. - regionsInQueue.put(cr.getHRegion(), cr); + storesInQueue.put(toPair(cr), cr); } if (cr == null) { - LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion()); + LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " + + "region = " + c.getHRegion() + ", store = " + c.getStore()); } return cr; } } - public boolean add(HRegion e, int p) { - CompactionRequest request = this.addToRegionsInQueue(e, p); + @Override + public boolean add(CompactionRequest e) { + CompactionRequest request = this.addToCompactionQueue(e); if (request != null) { boolean result = queue.add(request); return result; @@ -195,68 +133,50 @@ public class PriorityCompactionQueue implements BlockingQueue { } @Override - public boolean add(HRegion e) { - return add(e, e.getCompactPriority()); - } - - public boolean offer(HRegion e, int p) { - CompactionRequest request = this.addToRegionsInQueue(e, p); + public boolean offer(CompactionRequest e) { + CompactionRequest request = this.addToCompactionQueue(e); return (request != null)? queue.offer(request): false; } @Override - public boolean offer(HRegion e) { - return offer(e, e.getCompactPriority()); - } - - public void put(HRegion e, int p) throws InterruptedException { - CompactionRequest request = this.addToRegionsInQueue(e, p); + public void put(CompactionRequest e) throws InterruptedException { + CompactionRequest request = this.addToCompactionQueue(e); if (request != null) { queue.put(request); } } @Override - public void put(HRegion e) throws InterruptedException { - put(e, e.getCompactPriority()); - } - - public boolean offer(HRegion e, int p, long timeout, TimeUnit unit) + public boolean offer(CompactionRequest e, long timeout, TimeUnit unit) throws InterruptedException { - CompactionRequest request = this.addToRegionsInQueue(e, p); + CompactionRequest request = this.addToCompactionQueue(e); return (request != null)? queue.offer(request, timeout, unit): false; } @Override - public boolean offer(HRegion e, long timeout, TimeUnit unit) - throws InterruptedException { - return offer(e, e.getCompactPriority(), timeout, unit); - } - - @Override - public HRegion take() throws InterruptedException { + public CompactionRequest take() throws InterruptedException { CompactionRequest cr = queue.take(); if (cr != null) { - removeFromRegionsInQueue(cr); - return cr.getHRegion(); + removeFromQueue(cr); + return cr; } return null; } @Override - public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException { + public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException { CompactionRequest cr = queue.poll(timeout, unit); if (cr != null) { - removeFromRegionsInQueue(cr); - return cr.getHRegion(); + removeFromQueue(cr); + return cr; } return null; } @Override - public boolean remove(Object r) { - if (r instanceof CompactionRequest) { - CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r); + public boolean remove(Object o) { + if (o instanceof CompactionRequest) { + CompactionRequest cr = removeFromQueue((CompactionRequest) o); if (cr != null) { return queue.remove(cr); } @@ -266,21 +186,21 @@ public class PriorityCompactionQueue implements BlockingQueue { } @Override - public HRegion remove() { + public CompactionRequest remove() { CompactionRequest cr = queue.remove(); if (cr != null) { - removeFromRegionsInQueue(cr); - return cr.getHRegion(); + removeFromQueue(cr); + return cr; } return null; } @Override - public HRegion poll() { + public CompactionRequest poll() { CompactionRequest cr = queue.poll(); if (cr != null) { - removeFromRegionsInQueue(cr); - return cr.getHRegion(); + removeFromQueue(cr); + return cr; } return null; } @@ -292,9 +212,9 @@ public class PriorityCompactionQueue implements BlockingQueue { @Override public boolean contains(Object r) { - if (r instanceof HRegion) { - synchronized (regionsInQueue) { - return regionsInQueue.containsKey((HRegion) r); + if (r instanceof CompactionRequest) { + synchronized (storesInQueue) { + return storesInQueue.containsKey(toPair((CompactionRequest) r)); } } else if (r instanceof CompactionRequest) { return queue.contains(r); @@ -303,15 +223,15 @@ public class PriorityCompactionQueue implements BlockingQueue { } @Override - public HRegion element() { + public CompactionRequest element() { CompactionRequest cr = queue.element(); - return (cr != null)? cr.getHRegion(): null; + return (cr != null)? cr: null; } @Override - public HRegion peek() { + public CompactionRequest peek() { CompactionRequest cr = queue.peek(); - return (cr != null)? cr.getHRegion(): null; + return (cr != null)? cr: null; } @Override @@ -326,14 +246,14 @@ public class PriorityCompactionQueue implements BlockingQueue { @Override public void clear() { - regionsInQueue.clear(); + storesInQueue.clear(); queue.clear(); } // Unimplemented methods, collection methods @Override - public Iterator iterator() { + public Iterator iterator() { throw new UnsupportedOperationException("Not supported."); } @@ -353,7 +273,7 @@ public class PriorityCompactionQueue implements BlockingQueue { } @Override - public boolean addAll(Collection c) { + public boolean addAll(Collection c) { throw new UnsupportedOperationException("Not supported."); } @@ -368,12 +288,12 @@ public class PriorityCompactionQueue implements BlockingQueue { } @Override - public int drainTo(Collection c) { + public int drainTo(Collection c) { throw new UnsupportedOperationException("Not supported."); } @Override - public int drainTo(Collection c, int maxElements) { + public int drainTo(Collection c, int maxElements) { throw new UnsupportedOperationException("Not supported."); } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index b389849..47c5781 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1479,7 +1479,7 @@ public class Store implements HeapSize { /** * @return The priority that this store should have in the compaction queue */ - int getCompactPriority() { + public int getCompactPriority() { return this.blockingStoreFileCount - this.storefiles.size(); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java new file mode 100644 index 0000000..efda100 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -0,0 +1,103 @@ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; + + /** + * This class represents a compaction request and holds the region, priority, + * and time submitted. + */ + public class CompactionRequest implements Comparable { + static final Log LOG = LogFactory.getLog(CompactionRequest.class); + private final HRegion r; + private final Store s; + private int p; + private final Date date; + + public CompactionRequest(HRegion r, Store s) { + this(r, s, s.getCompactPriority()); + } + + public CompactionRequest(HRegion r, Store s, int p) { + this(r, s, p, null); + } + + public CompactionRequest(HRegion r, Store s, int p, Date d) { + if (r == null) { + throw new NullPointerException("HRegion cannot be null"); + } + + if (d == null) { + d = new Date(); + } + + this.r = r; + this.s = s; + this.p = p; + this.date = d; + } + + /** + * This function will define where in the priority queue the request will + * end up. Those with the highest priorities will be first. When the + * priorities are the same it will It will first compare priority then date + * to maintain a FIFO functionality. + * + *

Note: The date is only accurate to the millisecond which means it is + * possible that two requests were inserted into the queue within a + * millisecond. When that is the case this function will break the tie + * arbitrarily. + */ + @Override + public int compareTo(CompactionRequest request) { + //NOTE: The head of the priority queue is the least element + if (this.equals(request)) { + return 0; //they are the same request + } + int compareVal; + + compareVal = p - request.p; //compare priority + if (compareVal != 0) { + return compareVal; + } + + compareVal = date.compareTo(request.date); + if (compareVal != 0) { + return compareVal; + } + + //break the tie arbitrarily + return -1; + } + + /** Gets the HRegion for the request */ + public HRegion getHRegion() { + return r; + } + + /** Gets the Store for the request */ + public Store getStore() { + return s; + } + + /** Gets the priority for the request */ + public int getPriority() { + return p; + } + + /** Gets the priority for the request */ + public void setPriority(int p) { + this.p = p; + } + + public String toString() { + return "regionName=" + r.getRegionNameAsString() + + ((s == null) ? "" + : "storeName = " + new String(s.getFamily().getName())) + + ", priority=" + p + ", date=" + date; + } + } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java index dc2743f..c5d876d 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -61,6 +63,10 @@ public class TestPriorityCompactionQueue { return "[DummyHRegion " + name + "]"; } + public byte[] getRegionName() { + return Bytes.toBytes(name); + } + public String getRegionNameAsString() { return name; } @@ -68,7 +74,7 @@ public class TestPriorityCompactionQueue { protected void getAndCheckRegion(PriorityCompactionQueue pq, HRegion checkRegion) { - HRegion r = pq.remove(); + HRegion r = pq.remove().getHRegion(); if (r != checkRegion) { Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r .equals(checkRegion)); @@ -76,7 +82,7 @@ public class TestPriorityCompactionQueue { } protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) { - pq.add(r, p); + pq.add(new CompactionRequest(r, null, p)); try { // Sleep 1 millisecond so 2 things are not put in the queue within the // same millisecond. The queue breaks ties arbitrarily between two