diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 24450ae..3109056 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -19,36 +19,38 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; /** * Compact region on request and then run split if appropriate */ -public class CompactSplitThread extends Thread implements CompactionRequestor { +public class CompactSplitThread implements CompactionRequestor { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); - private final long frequency; - private final ReentrantLock lock = new ReentrantLock(); private final HRegionServer server; private final Configuration conf; - private final PriorityCompactionQueue compactionQueue = - new PriorityCompactionQueue(); + private final ThreadPoolExecutor largeCompactions; + private final ThreadPoolExecutor smallCompactions; + private final ThreadPoolExecutor splits; + private final long throttleSize; /* The default priority for user-specified compaction requests. * The user gets top priority unless we have blocking compactions. (Pri <= 0) */ public static final int PRIORITY_USER = 1; + public static final int NO_PRIORITY = Integer.MIN_VALUE; /** * Splitting should not take place if the total number of regions exceed this. 
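Note on the hunk above: the patch replaces the single compaction thread with dedicated thread pools whose work queues order tasks by priority, and a later hunk routes a request to smallCompactions when its total size is under hbase.regionserver.thread.compaction.throttle. Below is a minimal, self-contained sketch of this pattern; class and field names are illustrative, not the patch's API. Two constraints the patch also obeys: tasks must be mutually comparable because the pool drains a raw PriorityBlockingQueue, and core size must equal max size because an unbounded queue never triggers pool growth.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
  final int priority; // lower value = more urgent, mirroring CompactionRequest
  final String name;
  PrioritizedTask(int priority, String name) { this.priority = priority; this.name = name; }
  @Override public int compareTo(PrioritizedTask other) {
    return Integer.compare(this.priority, other.priority);
  }
  @Override public void run() { System.out.println("running " + name); }
}

public class PriorityPoolSketch {
  public static void main(String[] args) throws InterruptedException {
    // core == max: with an unbounded queue the pool never grows past core size
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
        new PriorityBlockingQueue<Runnable>());
    pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
      @Override
      public void rejectedExecution(Runnable task, ThreadPoolExecutor p) {
        System.out.println("rejected " + task); // cleanup hook, like CompactionRequest.Rejection
      }
    });
    // the first task is handed straight to the core thread; tasks queued
    // behind it are drained in priority order, not submission order
    pool.execute(new PrioritizedTask(5, "low"));
    pool.execute(new PrioritizedTask(3, "medium"));
    pool.execute(new PrioritizedTask(0, "high")); // runs before "medium"
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}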
@@ -58,154 +60,159 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { private int regionSplitLimit; /** @param server */ - public CompactSplitThread(HRegionServer server) { + CompactSplitThread(HRegionServer server) { super(); this.server = server; this.conf = server.getConfiguration(); this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit", Integer.MAX_VALUE); - this.frequency = - conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", - 20 * 1000); + + int largeThreads = Math.max(1, conf.getInt( + "hbase.regionserver.thread.compaction.large", 1)); + int smallThreads = conf.getInt( + "hbase.regionserver.thread.compaction.small", 0); + throttleSize = conf.getLong( + "hbase.regionserver.thread.compaction.throttle", 0); + int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1); + + // if we have throttle threads, make sure the user also specified size + Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0); + + this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, + 60, TimeUnit.SECONDS, new PriorityBlockingQueue()); + this.largeCompactions + .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + if (smallThreads <= 0) { + this.smallCompactions = null; + } else { + this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, + 60, TimeUnit.SECONDS, new PriorityBlockingQueue()); + this.smallCompactions + .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + } + this.splits = (ThreadPoolExecutor) Executors + .newFixedThreadPool(splitThreads); } @Override - public void run() { - while (!this.server.isStopped()) { - CompactionRequest compactionRequest = null; - HRegion r = null; - try { - compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (compactionRequest != null) { - lock.lock(); - try { - if(!this.server.isStopped()) { - // Don't interrupt us while we are working - r = compactionRequest.getHRegion(); - byte [] midKey = r.compactStore(compactionRequest.getStore()); - if (r.getLastCompactInfo() != null) { // compaction aborted? - this.server.getMetrics().addCompaction(r.getLastCompactInfo()); - } - if (shouldSplitRegion() && midKey != null && - !this.server.isStopped()) { - split(r, midKey); - } - } - } finally { - lock.unlock(); - } - } - } catch (InterruptedException ex) { - continue; - } catch (IOException ex) { - LOG.error("Compaction/Split failed for region " + - r.getRegionNameAsString(), - RemoteExceptionHandler.checkIOException(ex)); - if (!server.checkFileSystem()) { - break; - } - } catch (Exception ex) { - LOG.error("Compaction failed" + - (r != null ? (" for region " + r.getRegionNameAsString()) : ""), - ex); - if (!server.checkFileSystem()) { - break; - } + public String toString() { + return "compaction_queue=" + + (smallCompactions != null ? 
"(" + + largeCompactions.getQueue().size() + ":" + + smallCompactions.getQueue().size() + ")" + : largeCompactions.getQueue().size()) + + ", split_queue=" + splits.getQueue().size(); + } + + public synchronized boolean requestSplit(final HRegion r) { + // don't split regions that are blocking + if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { + byte[] midKey = r.checkSplit(); + if (midKey != null) { + requestSplit(r, midKey); + return true; } } - compactionQueue.clear(); - LOG.info(getName() + " exiting"); + return false; + } + + public synchronized void requestSplit(final HRegion r, byte[] midKey) { + try { + SplitRequest sr = new SplitRequest(r, midKey); + sr.setServer(this.server); + this.splits.execute(sr); + if (LOG.isDebugEnabled()) { + LOG.debug("Split requested for " + r + ". " + this); + } + } catch (RejectedExecutionException ree) { + LOG.info("Could not execute split for " + r, ree); + } } public synchronized void requestCompaction(final HRegion r, final String why) { for(Store s : r.getStores().values()) { - requestCompaction(r, s, false, why, s.getCompactPriority()); + requestCompaction(r, s, why, NO_PRIORITY); } } - public synchronized void requestCompaction(final HRegion r, - final String why, int p) { - requestCompaction(r, false, why, p); + public synchronized void requestCompaction(final HRegion r, final Store s, + final String why) { + requestCompaction(r, s, why, NO_PRIORITY); } - public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why, int p) { + public synchronized void requestCompaction(final HRegion r, final String why, + int p) { for(Store s : r.getStores().values()) { - requestCompaction(r, s, force, why, p); + requestCompaction(r, s, why, p); } } /** * @param r HRegion store belongs to - * @param force Whether next compaction should be major + * @param s Store to request compaction on * @param why Why compaction requested -- used in debug messages + * @param priority override the default priority (NO_PRIORITY == decide) */ public synchronized void requestCompaction(final HRegion r, final Store s, - final boolean force, final String why, int priority) { + final String why, int priority) { if (this.server.isStopped()) { return; } - // tell the region to major-compact (and don't downgrade it) - if (force) { - s.setForceMajorCompaction(force); - } - CompactionRequest compactionRequest = new CompactionRequest(r, s, priority); - if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) { - LOG.debug("Compaction " + (force? "(major) ": "") + - "requested for region " + r.getRegionNameAsString() + - "/" + r.getRegionInfo().getEncodedName() + - ", store " + s + - (why != null && !why.isEmpty()? " because " + why: "") + - "; priority=" + priority + ", compaction queue size=" + compactionQueue.size()); - } - } - - private void split(final HRegion parent, final byte [] midKey) - throws IOException { - final long startTime = System.currentTimeMillis(); - SplitTransaction st = new SplitTransaction(parent, midKey); - // If prepare does not return true, for some reason -- logged inside in - // the prepare call -- we are not ready to split just now. Just return. 
- if (!st.prepare()) return; - try { - st.execute(this.server, this.server); - } catch (Exception e) { - try { - LOG.info("Running rollback of failed split of " + - parent.getRegionNameAsString() + "; " + e.getMessage()); - st.rollback(this.server, this.server); - LOG.info("Successful rollback of failed split of " + - parent.getRegionNameAsString()); - } catch (RuntimeException ee) { - // If failed rollback, kill this server to avoid having a hole in table. - LOG.info("Failed rollback of failed split of " + - parent.getRegionNameAsString() + " -- aborting server", ee); - this.server.abort("Failed split"); + CompactionRequest cr = s.requestCompaction(); + if (cr != null) { + cr.setServer(server); + if (priority != NO_PRIORITY) { + cr.setPriority(priority); + } + ThreadPoolExecutor pool = largeCompactions; + if (smallCompactions != null && throttleSize > cr.getSize()) { + // smallCompactions is like the 10 items or less line at Walmart + pool = smallCompactions; + } + pool.execute(cr); + if (LOG.isDebugEnabled()) { + String type = ""; + if (smallCompactions != null) { + type = (pool == smallCompactions) ? "Small " : "Large "; + } + LOG.debug(type + "Compaction requested: " + cr + + (why != null && !why.isEmpty() ? "; Because: " + why : "") + + "; " + this); } - return; } - - LOG.info("Region split, META updated, and report to master. Parent=" + - parent.getRegionInfo().getRegionNameAsString() + ", new regions: " + - st.getFirstDaughter().getRegionNameAsString() + ", " + - st.getSecondDaughter().getRegionNameAsString() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); } /** * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { - if (lock.tryLock()) { + splits.shutdown(); + largeCompactions.shutdown(); + if (smallCompactions != null) + smallCompactions.shutdown(); + } + + private void waitFor(ThreadPoolExecutor t, String name) { + boolean done = false; + while (!done) { try { - this.interrupt(); - } finally { - lock.unlock(); + done = t.awaitTermination(60, TimeUnit.SECONDS); + LOG.debug("Waiting for " + name + " to finish..."); + } catch (InterruptedException ie) { + LOG.debug("Interrupted waiting for " + name + " to finish..."); } } } + void join() { + waitFor(splits, "Split Thread"); + waitFor(largeCompactions, "Large Compaction Thread"); + if (smallCompactions != null) { + waitFor(smallCompactions, "Small Compaction Thread"); + } + } + /** * Returns the current size of the queue containing regions that are * processed. @@ -213,7 +220,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { * @return The current size of the regions queue. 
*/ public int getCompactionQueueSize() { - return compactionQueue.size(); + int size = largeCompactions.getQueue().size(); + if (smallCompactions != null) + size += smallCompactions.getQueue().size(); + return size; } private boolean shouldSplitRegion() { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index fdbf659..654825a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -28,8 +28,25 @@ public interface CompactionRequestor { /** * @param r Region to compact + * @param s Store within region to compact + * @param why Why compaction was requested -- used in debug messages + */ + public void requestCompaction(final HRegion r, final Store s, final String why); + + /** + * @param r Region to compact * @param why Why compaction was requested -- used in debug messages * @param pri Priority of this compaction. minHeap. <=0 is critical */ public void requestCompaction(final HRegion r, final String why, int pri); + + /** + * @param r Region to compact + * @param s Store within region to compact + * @param why Why compaction was requested -- used in debug messages + * @param pri Priority of this compaction. minHeap. <=0 is critical + */ + public void requestCompaction(final HRegion r, final Store s, + final String why, int pri); + } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 11fd50e..ba39888 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -101,6 +102,7 @@ import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; +import com.google.common.base.Preconditions; import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.Lists; import com.google.common.collect.MutableClassToInstanceMap; @@ -206,8 +208,8 @@ public class HRegion implements HeapSize { // , Writable{ volatile boolean flushing = false; // Set when a flush has been requested. volatile boolean flushRequested = false; - // Set while a compaction is running. - volatile boolean compacting = false; + // Number of compactions running. + volatile int compacting = 0; // Gets set in close. If set, cannot compact or flush again. volatile boolean writesEnabled = true; // Set if region is read-only @@ -395,7 +397,7 @@ public class HRegion implements HeapSize { // , Writable{ this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly()); - this.writestate.compacting = false; + this.writestate.compacting = 0; this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). 
@@ -606,12 +608,10 @@ public class HRegion implements HeapSize { // , Writable{ writestate.writesEnabled = false; wasFlushing = writestate.flushing; LOG.debug("Closing " + this + ": disabling compactions & flushes"); - while (writestate.compacting || writestate.flushing) { - LOG.debug("waiting for" + - (writestate.compacting ? " compaction" : "") + - (writestate.flushing ? - (writestate.compacting ? "," : "") + " cache flush" : - "") + " to complete for region " + this); + while (writestate.compacting > 0 || writestate.flushing) { + LOG.debug("waiting for " + writestate.compacting + " compactions" + + (writestate.flushing ? " & cache flush" : "") + + " to complete for region " + this); try { writestate.wait(); } catch (InterruptedException iex) { @@ -734,11 +734,6 @@ public class HRegion implements HeapSize { // , Writable{ return this.fs; } - /** @return info about the last compaction */ - public Pair getLastCompactInfo() { - return this.lastCompactInfo; - } - /** @return the last time the region was flushed */ public long getLastFlushTime() { return this.lastFlushTime; @@ -794,9 +789,9 @@ public class HRegion implements HeapSize { // , Writable{ return new Path(getRegionDir(), ".tmp"); } - void setForceMajorCompaction(final boolean b) { + void triggerMajorCompaction() { for (Store h: stores.values()) { - h.setForceMajorCompaction(b); + h.triggerMajorCompaction(); } } @@ -817,7 +812,9 @@ public class HRegion implements HeapSize { // , Writable{ */ byte [] compactStores(final boolean majorCompaction) throws IOException { - this.setForceMajorCompaction(majorCompaction); + if (majorCompaction) { + this.triggerMajorCompaction(); + } return compactStores(); } @@ -826,13 +823,21 @@ public class HRegion implements HeapSize { // , Writable{ * to be split. */ public byte[] compactStores() throws IOException { - byte[] splitRow = null; for(Store s : getStores().values()) { - if(splitRow == null) { - splitRow = compactStore(s); + CompactionRequest cr = s.requestCompaction(); + if(cr != null) { + try { + compact(cr); + } finally { + s.finishRequest(cr); + } + } + byte[] splitRow = s.checkSplit(); + if (splitRow != null) { + return splitRow; } } - return splitRow; + return null; } /* @@ -846,93 +851,77 @@ public class HRegion implements HeapSize { // , Writable{ * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. 
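Note on the writestate change above: compacting is now a count rather than a flag, so several store compactions can run in one region concurrently, and close() must drain them all before proceeding. A minimal sketch of that monitor pattern follows, with names simplified from the patch's writestate; this is an illustration, not the patch's actual class.

// simplified stand-in for HRegion's writestate; not the patch's actual class
class WriteStateSketch {
  int compacting = 0;           // count of in-flight compactions (was a boolean)
  boolean writesEnabled = true;

  synchronized boolean beginCompaction() {
    if (!writesEnabled) return false;  // region is closing; refuse new work
    ++compacting;
    return true;
  }

  synchronized void endCompaction() {
    if (--compacting <= 0) {
      notifyAll();                     // wake close(), which may be draining
    }
  }

  // the close() path: block new compactions, then wait out in-flight ones
  synchronized void waitForCompactions() throws InterruptedException {
    writesEnabled = false;
    while (compacting > 0) {
      wait();
    }
  }
}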
* - * @return split row if split is needed + * @param cr Compaction details, obtained by requestCompaction() + * @return whether the compaction completed * @throws IOException e */ - public byte [] compactStore(Store store) throws IOException { - if (this.closing.get()) { - LOG.debug("Skipping compaction on " + this + " because closing"); - return null; + public boolean compact(CompactionRequest cr) + throws IOException { + if (cr == null) { + return false; + } + if (this.closing.get() || this.closed.get()) { + LOG.debug("Skipping compaction on " + this + " because closing/closed"); + return false; } + Preconditions.checkArgument(cr.getHRegion().equals(this)); lock.readLock().lock(); - this.lastCompactInfo = null; - byte [] splitRow = null; MonitoredTask status = TaskMonitor.get().createStatus( - "Compacting stores in " + this); + "Compacting " + cr.getStore() + " in " + this); try { if (this.closed.get()) { LOG.debug("Skipping compaction on " + this + " because closed"); - return null; - } - if (this.closed.get()) { - return splitRow; + return false; } if (coprocessorHost != null) { status.setStatus("Running coprocessor preCompact hooks"); coprocessorHost.preCompact(false); } + boolean decr = true; try { synchronized (writestate) { - if (!writestate.compacting && writestate.writesEnabled) { - writestate.compacting = true; + if (writestate.writesEnabled) { + ++writestate.compacting; } else { - String msg = "NOT compacting region " + this + - ": compacting=" + writestate.compacting + ", writesEnabled=" + - writestate.writesEnabled; + String msg = "NOT compacting region " + this + ". Writes disabled."; LOG.info(msg); status.abort(msg); - return splitRow; + decr = false; + return false; } } - LOG.info("Starting compaction on region " + this); - long startTime = EnvironmentEdgeManager.currentTimeMillis(); + LOG.info("Starting compaction on " + cr.getStore() + " in region " + + this); doRegionCompactionPrep(); - long lastCompactSize = 0; - boolean completed = false; try { - status.setStatus("Compacting store " + store); - final Store.StoreSize ss = store.compact(); - lastCompactSize += store.getLastCompactSize(); - if (ss != null) { - splitRow = ss.getSplitRow(); - } - completed = true; + status.setStatus("Compacting store " + cr.getStore()); + cr.getStore().compact(cr); } catch (InterruptedIOException iioe) { - LOG.info("compaction interrupted by user: ", iioe); - } finally { - long now = EnvironmentEdgeManager.currentTimeMillis(); - LOG.info(((completed) ? 
"completed" : "aborted") - + " compaction on region " + this - + " after " + StringUtils.formatTimeDiff(now, startTime)); - if (completed) { - this.lastCompactInfo = - new Pair((now - startTime) / 1000, lastCompactSize); - status.setStatus("Compaction complete: " + - StringUtils.humanReadableInt(lastCompactSize) + " in " + - (now - startTime) + "ms"); - } + String msg = "compaction interrupted by user"; + LOG.info(msg, iioe); + status.abort(msg); + return false; } } finally { - synchronized (writestate) { - writestate.compacting = false; - writestate.notifyAll(); + if (decr) { + synchronized (writestate) { + --writestate.compacting; + if (writestate.compacting <= 0) { + writestate.notifyAll(); + } + } } } if (coprocessorHost != null) { status.setStatus("Running coprocessor post-compact hooks"); - coprocessorHost.postCompact(splitRow != null); + coprocessorHost.postCompact(false); } - status.markComplete("Compaction complete"); + return true; } finally { status.cleanup(); lock.readLock().unlock(); } - if (splitRow != null) { - assert splitPoint == null || Bytes.equals(splitRow, splitPoint); - this.splitPoint = null; // clear the split point (if set) - } - return splitRow; } /** @@ -3708,6 +3697,10 @@ public class HRegion implements HeapSize { // , Writable{ } } + void clearSplit_TESTS_ONLY() { + this.splitRequest = false; + } + /** * Give the region a chance to prepare before it is split. */ @@ -3715,6 +3708,20 @@ public class HRegion implements HeapSize { // , Writable{ // nothing } + byte[] checkSplit() { + if (this.splitPoint != null) { + return this.splitPoint; + } + byte[] splitPoint = null; + for (Store s : stores.values()) { + splitPoint = s.checkSplit(); + if (splitPoint != null) { + return splitPoint; + } + } + return null; + } + /** * @return The priority that this region should have in the compaction queue */ @@ -3731,9 +3738,9 @@ public class HRegion implements HeapSize { // , Writable{ * store files * @return true if any store has too many store files */ - public boolean hasTooManyStoreFiles() { + public boolean needsCompaction() { for(Store store : stores.values()) { - if(store.hasTooManyStoreFiles()) { + if(store.needsCompaction()) { return true; } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b910254..9dbf7e1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -226,7 +226,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private RegionServerMetrics metrics; // Compactions - CompactSplitThread compactSplitThread; + public CompactSplitThread compactSplitThread; // Cache flushing MemStoreFlusher cacheFlusher; @@ -1017,7 +1017,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, * * @return false if file system is not available */ - protected boolean checkFileSystem() { + public boolean checkFileSystem() { if (this.fsOk && this.fs != null) { try { FSUtils.checkFileSystemAvailable(this.fs); @@ -1046,14 +1046,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, @Override protected void chore() { for (HRegion r : this.instance.onlineRegions.values()) { - try { - if (r != null && r.isMajorCompaction()) { - // Queue a compaction. Will recognize if major is needed. 
- this.instance.compactSplitThread.requestCompaction(r, getName() - + " requests major compaction"); + if (r == null) + continue; + for (Store s : r.getStores().values()) { + try { + if (s.isMajorCompaction()) { + // Queue a compaction. Will recognize if major is needed. + this.instance.compactSplitThread.requestCompaction(r, s, + getName() + " requests major compaction"); + } + } catch (IOException e) { + LOG.warn("Failed major compaction check on " + r, e); } - } catch (IOException e) { - LOG.warn("Failed major compaction check on " + r, e); } } } @@ -1243,8 +1247,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler); Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", handler); - Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", - handler); Threads.setDaemonThreadRunning(this.majorCompactionChecker, n + ".majorCompactionChecker", handler); @@ -1312,7 +1314,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return false; } // Verify that all threads are alive - if (!(leases.isAlive() && compactSplitThread.isAlive() + if (!(leases.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive() && this.majorCompactionChecker.isAlive())) { stop("One or more threads are no longer alive -- stop"); @@ -1346,10 +1348,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, final boolean daughter) throws KeeperException, IOException { // Do checks to see if we need to compact (references or too many files) - if (r.hasReferences() || r.hasTooManyStoreFiles()) { - getCompactionRequester().requestCompaction(r, - r.hasReferences()? "Region has references on open" : - "Region has too many store files"); + for (Store s : r.getStores().values()) { + if (s.hasReferences() || s.needsCompaction()) { + getCompactionRequester().requestCompaction(r, s, "Opening Region"); + } } // Add to online regions if all above was successful. @@ -1430,8 +1432,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, protected void join() { Threads.shutdown(this.majorCompactionChecker); Threads.shutdown(this.cacheFlusher); - Threads.shutdown(this.compactSplitThread); Threads.shutdown(this.hlogRoller); + if (this.compactSplitThread != null) { + this.compactSplitThread.join(); + } if (this.service != null) this.service.shutdown(); if (this.replicationHandler != null) { this.replicationHandler.join(); @@ -2334,11 +2338,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, HRegion region = getRegion(regionInfo.getRegionName()); region.flushcache(); region.forceSplit(splitPoint); - // force a compaction, split will be side-effect - // TODO: flush/compact/split refactor will make it trivial to do this - // sync/async (and won't require us to do a compaction to split!) 
- compactSplitThread.requestCompaction(region, "User-triggered split", - CompactSplitThread.PRIORITY_USER); + compactSplitThread.requestSplit(region, region.checkSplit()); } @Override @@ -2346,7 +2346,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void compactRegion(HRegionInfo regionInfo, boolean major) throws NotServingRegionException, IOException { HRegion region = getRegion(regionInfo.getRegionName()); - compactSplitThread.requestCompaction(region, major, "User-triggered " + if (major) { + region.triggerMajorCompaction(); + } + compactSplitThread.requestCompaction(region, "User-triggered " + (major ? "major " : "") + "compaction", CompactSplitThread.PRIORITY_USER); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index f66a7cd..213b2ab 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -354,7 +354,9 @@ class MemStoreFlusher extends Thread implements FlushRequester { LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName()); + if (!this.server.compactSplitThread.requestSplit(region)) { + this.server.compactSplitThread.requestCompaction(region, getName()); + } // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java b/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java deleted file mode 100644 index 13bcb3f..0000000 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java +++ /dev/null @@ -1,299 +0,0 @@ -/** -* Copyright 2010 The Apache Software Foundation -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.regionserver; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.util.Pair; - -/** - * This class delegates to the BlockingQueue but wraps all Stores in - * compaction requests that hold the priority and the date requested. 
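For contrast with the executor-based replacement, the queue being deleted here deduplicated requests per (region, store) pair and let a higher-priority resubmission displace the queued one. A compact sketch of that map-plus-priority-queue idea follows, using a generic key in place of the real Pair<HRegion, Store>; unlike the deleted class, it guards both structures with a single lock, so the put-back race described in its comments below cannot occur.

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

class DedupQueueSketch<K> {
  static final class Req<K> implements Comparable<Req<K>> {
    final K key;
    final int priority;
    Req(K key, int priority) { this.key = key; this.priority = priority; }
    @Override public int compareTo(Req<K> other) {
      return Integer.compare(this.priority, other.priority);
    }
  }

  private final PriorityQueue<Req<K>> queue = new PriorityQueue<Req<K>>();
  private final Map<K, Req<K>> inQueue = new HashMap<K, Req<K>>();

  // one lock covers both structures, unlike the deleted class
  synchronized boolean add(K key, int priority) {
    Req<K> queued = inQueue.get(key);
    if (queued != null && queued.priority <= priority) {
      return false;                 // an equal-or-better request is already queued
    }
    if (queued != null) {
      queue.remove(queued);         // displace the lower-priority duplicate
    }
    Req<K> req = new Req<K>(key, priority);
    inQueue.put(key, req);
    return queue.add(req);
  }

  synchronized Req<K> poll() {
    Req<K> req = queue.poll();
    if (req != null) {
      inQueue.remove(req.key);
    }
    return req;
  }
}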
- * - * Implementation Note: With an elevation time of -1 there is the potential for - * starvation of the lower priority compaction requests as long as there is a - * constant stream of high priority requests. - */ -public class PriorityCompactionQueue implements BlockingQueue { - static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class); - - /** The actual blocking queue we delegate to */ - protected final BlockingQueue queue = - new PriorityBlockingQueue(); - - /** Hash map of the Stores contained within the Compaction Queue */ - private final HashMap, CompactionRequest> storesInQueue = - new HashMap, CompactionRequest>(); - - /** Creates a new PriorityCompactionQueue with no priority elevation time */ - public PriorityCompactionQueue() { - LOG.debug("Create PriorityCompactionQueue"); - } - - protected Pair toPair(CompactionRequest cr) { - return Pair.newPair(cr.getHRegion(), cr.getStore()); - } - - /** If the store is not already in the queue it will add it and return a - * new compaction request object. If it is already present in the queue - * then it will return null. - * @param p If null it will use the default priority - * @return returns a compaction request if it isn't already in the queue - */ - protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) { - CompactionRequest queuedRequest = null; - synchronized (storesInQueue) { - queuedRequest = storesInQueue.get(toPair(newRequest)); - if (queuedRequest == null || - newRequest.getPriority() < queuedRequest.getPriority()) { - String reason = ""; - if (queuedRequest != null) { - if (newRequest.getPriority() < queuedRequest.getPriority()) { - reason = "Reason : priority changed from " + - queuedRequest.getPriority() + " to " + - newRequest.getPriority() + ". "; - } - } - LOG.debug("Inserting store in queue. " + reason + newRequest); - storesInQueue.put(toPair(newRequest), newRequest); - } else { - LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest + - ", requested: " + newRequest); - newRequest = null; // It is already present so don't add it - } - } - - if (newRequest != null && queuedRequest != null) { - // Remove the lower priority request - queue.remove(queuedRequest); - } - - return newRequest; - } - - /** Removes the request from the stores in queue - * @param remove - */ - protected CompactionRequest removeFromQueue(CompactionRequest c) { - if (c == null) return null; - - synchronized (storesInQueue) { - CompactionRequest cr = storesInQueue.remove(toPair(c)); - if (cr != null && !cr.equals(c)) - { - //Because we don't synchronize across both this.regionsInQueue and this.queue - //a rare race condition exists where a higher priority compaction request replaces - //the lower priority request in this.regionsInQueue but the lower priority request - //is taken off this.queue before the higher can be added to this.queue. - //So if we didn't remove what we were expecting we put it back on. 
- storesInQueue.put(toPair(cr), cr); - } - if (cr == null) { - LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " + - "region = " + c.getHRegion() + ", store = " + c.getStore()); - } - return cr; - } - } - - @Override - public boolean add(CompactionRequest e) { - CompactionRequest request = this.addToCompactionQueue(e); - if (request != null) { - boolean result = queue.add(request); - return result; - } else { - return false; - } - } - - @Override - public boolean offer(CompactionRequest e) { - CompactionRequest request = this.addToCompactionQueue(e); - return (request != null)? queue.offer(request): false; - } - - @Override - public void put(CompactionRequest e) throws InterruptedException { - CompactionRequest request = this.addToCompactionQueue(e); - if (request != null) { - queue.put(request); - } - } - - @Override - public boolean offer(CompactionRequest e, long timeout, TimeUnit unit) - throws InterruptedException { - CompactionRequest request = this.addToCompactionQueue(e); - return (request != null)? queue.offer(request, timeout, unit): false; - } - - @Override - public CompactionRequest take() throws InterruptedException { - CompactionRequest cr = queue.take(); - if (cr != null) { - removeFromQueue(cr); - return cr; - } - return null; - } - - @Override - public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException { - CompactionRequest cr = queue.poll(timeout, unit); - if (cr != null) { - removeFromQueue(cr); - return cr; - } - return null; - } - - @Override - public boolean remove(Object o) { - if (o instanceof CompactionRequest) { - CompactionRequest cr = removeFromQueue((CompactionRequest) o); - if (cr != null) { - return queue.remove(cr); - } - } - - return false; - } - - @Override - public CompactionRequest remove() { - CompactionRequest cr = queue.remove(); - if (cr != null) { - removeFromQueue(cr); - return cr; - } - return null; - } - - @Override - public CompactionRequest poll() { - CompactionRequest cr = queue.poll(); - if (cr != null) { - removeFromQueue(cr); - return cr; - } - return null; - } - - @Override - public int remainingCapacity() { - return queue.remainingCapacity(); - } - - @Override - public boolean contains(Object r) { - if (r instanceof CompactionRequest) { - synchronized (storesInQueue) { - return storesInQueue.containsKey(toPair((CompactionRequest) r)); - } - } else if (r instanceof CompactionRequest) { - return queue.contains(r); - } - return false; - } - - @Override - public CompactionRequest element() { - CompactionRequest cr = queue.element(); - return (cr != null)? cr: null; - } - - @Override - public CompactionRequest peek() { - CompactionRequest cr = queue.peek(); - return (cr != null)? 
cr: null; - } - - @Override - public int size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public void clear() { - storesInQueue.clear(); - queue.clear(); - } - - // Unimplemented methods, collection methods - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public T[] toArray(T[] a) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public int drainTo(Collection c) { - throw new UnsupportedOperationException("Not supported."); - } - - @Override - public int drainTo(Collection c, int maxElements) { - throw new UnsupportedOperationException("Not supported."); - } -} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java new file mode 100644 index 0000000..ac46e93 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region splits. Put in a queue, owned by HRegionServer. + */ +class SplitRequest implements Runnable { + static final Log LOG = LogFactory.getLog(SplitRequest.class); + private final HRegion parent; + private final byte[] midKey; + private HRegionServer server = null; + + SplitRequest(HRegion region, byte[] midKey) { + this.parent = region; + this.midKey = midKey; + } + + public void setServer(HRegionServer hrs) { + this.server = hrs; + } + + @Override + public String toString() { + return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey); + } + + @Override + public void run() { + Preconditions.checkNotNull(server); + try { + final long startTime = System.currentTimeMillis(); + SplitTransaction st = new SplitTransaction(parent, midKey); + // If prepare does not return true, for some reason -- logged inside in + // the prepare call -- we are not ready to split just now. Just return. 
+ if (!st.prepare()) + return; + try { + st.execute(this.server, this.server); + } catch (Exception e) { + try { + LOG.info("Running rollback of failed split of " + + parent.getRegionNameAsString() + "; " + e.getMessage()); + st.rollback(this.server, this.server); + LOG.info("Successful rollback of failed split of " + + parent.getRegionNameAsString()); + } catch (RuntimeException ee) { + // If failed rollback, kill this server to avoid having a hole in + // table. + LOG.info("Failed rollback of failed split of " + + parent.getRegionNameAsString() + " -- aborting server", ee); + this.server.abort("Failed split"); + } + return; + } + LOG.info("Region split, META updated, and report to master. Parent=" + + parent.getRegionInfo().getRegionNameAsString() + ", new regions: " + + st.getFirstDaughter().getRegionNameAsString() + ", " + + st.getSecondDaughter().getRegionNameAsString() + ". Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + } catch (IOException ex) { + LOG.error("Split failed " + this, RemoteExceptionHandler + .checkIOException(ex)); + server.checkFileSystem(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index e2295c2..941b804 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -24,8 +24,10 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.SortedSet; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -48,16 +50,20 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * A Store holds a column family in a Region. Its a memstore and a set of zero @@ -102,7 +108,7 @@ public class Store implements HeapSize { // With float, java will downcast your long to float for comparisons (bad) private double compactRatio; private long lastCompactSize = 0; - private volatile boolean forceMajor = false; + volatile boolean forceMajor = false; /* how many bytes to write between status checks */ static int closeCheckInterval = 0; private final long desiredMaxFileSize; @@ -119,12 +125,12 @@ public class Store implements HeapSize { */ private ImmutableList storefiles = null; + List filesCompacting = Lists.newArrayList(); // All access must be synchronized. 
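Note on the new filesCompacting list above: the later Store hunks use it to keep concurrent selections contiguous (HBASE-2856) by excluding every candidate at least as old as the newest file already being compacted. A toy run of that exclusion step, with hypothetical file names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContiguitySketch {
  public static void main(String[] args) {
    // store files ordered oldest -> newest, as Store keeps them
    List<String> storefiles = new ArrayList<String>(
        Arrays.asList("f1", "f2", "f3", "f4", "f5"));
    // files already handed to a queued/running compaction, oldest -> newest
    List<String> filesCompacting = Arrays.asList("f1", "f2");

    List<String> candidates = new ArrayList<String>(storefiles);
    if (!filesCompacting.isEmpty()) {
      // drop everything up to and including the newest file already
      // compacting, so the new selection stays contiguous with what remains
      String last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidates.indexOf(last);
      candidates.subList(0, idx + 1).clear();
    }
    System.out.println(candidates); // prints [f3, f4, f5]
  }
}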
private final CopyOnWriteArraySet changedReaderObservers = new CopyOnWriteArraySet(); - private final Object compactLock = new Object(); private final int blocksize; private final boolean blockcache; /** Compression algorithm for flush files and minor compaction */ @@ -569,7 +575,7 @@ public class Store implements HeapSize { // Tell listeners of the change in readers. notifyChangedReadersObservers(); - return this.storefiles.size() >= this.minFilesToCompact; + return needsCompaction(); } finally { this.lock.writeLock().unlock(); } @@ -619,100 +625,109 @@ public class Store implements HeapSize { * *
We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. - * - * @return row to split around if a split is needed, null otherwise + * + * @param cr + * compaction details obtained from requestCompaction() + * @throws IOException */ - StoreSize compact() throws IOException { - boolean forceSplit = this.region.shouldForceSplit(); - synchronized (compactLock) { - this.lastCompactSize = 0; // reset first in case compaction is aborted - - // sanity checks - for (StoreFile sf : this.storefiles) { - if (sf.getPath() == null || sf.getReader() == null) { - boolean np = sf.getPath() == null; - LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader")); - return null; - } - } - - // if the user wants to force a split, skip compaction unless necessary - boolean references = hasReferences(this.storefiles); - if (forceSplit && !this.forceMajor && !references) { - return checkSplit(forceSplit); - } - - Collection filesToCompact - = compactSelection(this.storefiles, this.forceMajor); - - // empty == do not compact - if (filesToCompact.isEmpty()) { - // but do see if we need to split before returning - return checkSplit(forceSplit); - } + void compact(CompactionRequest cr) throws IOException { + if (cr == null || cr.getFiles().isEmpty()) { + return; + } + Preconditions.checkArgument(cr.getStore().toString() .equals(this.toString())); - // sum size of all files included in compaction - long totalSize = 0; - for (StoreFile sf : filesToCompact) { - totalSize += sf.getReader().length(); - } - this.lastCompactSize = totalSize; + List filesToCompact = cr.getFiles(); - // major compaction iff all StoreFiles are included - boolean majorcompaction - = (filesToCompact.size() == this.storefiles.size()); - if (majorcompaction) { - this.forceMajor = false; - } + synchronized (filesCompacting) { + // sanity check: we're compacting files that this store knows about + // TODO: change this to LOG.error() after more debugging + Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact)); + } - // Max-sequenceID is the last key in the files we're compacting - long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); - - // Ready to go. Have list of files to compact. - LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" + - this.storeNameStr + - (hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " + - region.getTmpDir() + ", seqid=" + maxId + - ", totalSize=" + StringUtils.humanReadableInt(totalSize)); - StoreFile.Writer writer - = compactStore(filesToCompact, majorcompaction, maxId); + // Max-sequenceID is the last key in the files we're compacting + long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); + + // Ready to go. Have list of files to compact. + LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " + + this.storeNameStr + " of " + + this.region.getRegionInfo().getRegionNameAsString() + + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + + StringUtils.humanReadableInt(cr.getSize())); + + StoreFile sf = null; + try { + StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(), + maxId); // Move the compaction into place. - StoreFile sf = completeCompaction(filesToCompact, writer); - if (LOG.isInfoEnabled()) { - LOG.info("Completed" + (majorcompaction? " major ": " ") + - "compaction of " + filesToCompact.size() + - " file(s), new file=" + (sf == null?
"none": sf.toString()) + - ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) + - "; total size for store is " + StringUtils.humanReadableInt(storeSize)); + sf = completeCompaction(filesToCompact, writer); + } finally { + synchronized (filesCompacting) { + filesCompacting.removeAll(filesToCompact); } } - return checkSplit(forceSplit); + + LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " + + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of " + + this.region.getRegionInfo().getRegionNameAsString() + + "; new storefile name=" + (sf == null ? "none" : sf.toString()) + + ", size=" + (sf == null ? "none" : + StringUtils.humanReadableInt(sf.getReader().length())) + + "; total size for store is " + + StringUtils.humanReadableInt(storeSize)); } /* * Compact the most recent N files. Essentially a hook for testing. */ protected void compactRecent(int N) throws IOException { - synchronized(compactLock) { - List filesToCompact = this.storefiles; - int count = filesToCompact.size(); - if (N > count) { - throw new RuntimeException("Not enough files"); - } + List filesToCompact; + long maxId; + boolean isMajor; - filesToCompact = new ArrayList(filesToCompact.subList(count-N, count)); - long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); - boolean majorcompaction = (N == count); + this.lock.readLock().lock(); + try { + synchronized (filesCompacting) { + filesToCompact = Lists.newArrayList(storefiles); + if (!filesCompacting.isEmpty()) { + // exclude all files older than the newest file we're currently + // compacting. this allows us to preserve contiguity (HBASE-2856) + StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + int idx = filesToCompact.indexOf(last); + Preconditions.checkArgument(idx != -1); + filesToCompact.subList(0, idx + 1).clear(); + } + int count = filesToCompact.size(); + if (N > count) { + throw new RuntimeException("Not enough files"); + } + + filesToCompact = filesToCompact.subList(count - N, count); + maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); + isMajor = (filesToCompact.size() == storefiles.size()); + filesCompacting.addAll(filesToCompact); + Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME); + } + } finally { + this.lock.readLock().unlock(); + } - // Ready to go. Have list of files to compact. - StoreFile.Writer writer - = compactStore(filesToCompact, majorcompaction, maxId); + try { + // Ready to go. Have list of files to compact. + StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId); // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, writer); + } finally { + synchronized (filesCompacting) { + filesCompacting.removeAll(filesToCompact); + } } } + boolean hasReferences() { + return hasReferences(this.storefiles); + } + /* * @param files * @return True if any of the files in files are References. @@ -835,6 +850,69 @@ public class Store implements HeapSize { return ret; } + public CompactionRequest requestCompaction() { + // don't even select for compaction if writes are disabled + if (!this.region.areWritesEnabled()) { + return null; + } + + CompactionRequest ret = null; + this.lock.readLock().lock(); + try { + synchronized (filesCompacting) { + // candidates = all storefiles not already in compaction queue + List candidates = Lists.newArrayList(storefiles); + if (!filesCompacting.isEmpty()) { + // exclude all files older than the newest file we're currently + // compacting. 
this allows us to preserve contiguity (HBASE-2856) + StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + int idx = candidates.indexOf(last); + Preconditions.checkArgument(idx != -1); + candidates.subList(0, idx + 1).clear(); + } + List filesToCompact = compactSelection(candidates); + + // no files to compact + if (filesToCompact.isEmpty()) { + return null; + } + + // basic sanity check: do not try to compact the same StoreFile twice. + if (!Collections.disjoint(filesCompacting, filesToCompact)) { + // TODO: change this from an IAE to LOG.error after sufficient testing + Preconditions.checkArgument(false, "%s overlaps with %s", + filesToCompact, filesCompacting); + } + filesCompacting.addAll(filesToCompact); + Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME); + + // major compaction iff all StoreFiles are included + boolean isMajor = (filesToCompact.size() == this.storefiles.size()); + if (isMajor) { + // since we're enqueuing a major, update the compaction wait interval + this.forceMajor = false; + this.majorCompactionTime = getNextMajorCompactTime(); + } + + // everything went better than expected. create a compaction request + int pri = getCompactPriority(); + ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); + } + } catch (IOException ex) { + LOG.error("Compaction Request failed for region " + region + ", store " + + this, RemoteExceptionHandler.checkIOException(ex)); + } finally { + this.lock.readLock().unlock(); + } + return ret; + } + + public void finishRequest(CompactionRequest cr) { + synchronized (filesCompacting) { + filesCompacting.removeAll(cr.getFiles()); + } + } + /** * Algorithm to choose which files to compact * @@ -851,12 +929,13 @@ public class Store implements HeapSize { * max files to compact at once (avoids OOM) * * @param candidates candidate files, ordered from oldest to newest - * @param majorcompaction whether to force a major compaction * @return subset copy of candidate list that meets compaction criteria * @throws IOException */ - List compactSelection(List candidates, - boolean forcemajor) throws IOException { + List compactSelection(List candidates) + throws IOException { + // ASSUMPTION!!! filesCompacting is locked when calling this function + /* normal skew: * * older ----> newer @@ -870,6 +949,7 @@ public class Store implements HeapSize { */ List filesToCompact = new ArrayList(candidates); + boolean forcemajor = this.forceMajor && filesCompacting.isEmpty(); if (!forcemajor) { // do not compact old files above a configurable threshold // save all references. we MUST compact them @@ -888,15 +968,17 @@ public class Store implements HeapSize { // major compact on user action or age (caveat: we have too many files) boolean majorcompaction = (forcemajor || isMajorCompaction(filesToCompact)) && filesToCompact.size() < this.maxFilesToCompact; - if (majorcompaction) { - this.majorCompactionTime = getNextMajorCompactTime(); - } if (!majorcompaction && !hasReferences(filesToCompact)) { // we're doing a minor compaction, let's see what files are applicable int start = 0; double r = this.compactRatio; + // skip selection algorithm if we don't have enough files + if (filesToCompact.size() < this.minFilesToCompact) { + return Collections.emptyList(); + } + /* TODO: add sorting + unit test back in when HBASE-2856 is fixed // Sort files by size to correct when normal skew is altered by bulk load. 
Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE); @@ -1054,9 +1136,6 @@ public class Store implements HeapSize { } /* - * It's assumed that the compactLock will be acquired prior to calling this - * method! Otherwise, it is not thread-safe! - * *
It works by processing a compaction that's been written to disk. * *
It is usually invoked at the end of a compaction, but might also be @@ -1097,18 +1176,13 @@ public class Store implements HeapSize { this.lock.writeLock().lock(); try { try { - // 2. Unloading - // 3. Loading the new TreeMap. // Change this.storefiles so it reflects new state but do not // delete old store files until we have sent out notification of // change in case old files are still being accessed by outstanding // scanners. - ArrayList newStoreFiles = new ArrayList(); - for (StoreFile sf : storefiles) { - if (!compactedFiles.contains(sf)) { - newStoreFiles.add(sf); - } - } + ArrayList newStoreFiles = Lists.newArrayList(storefiles); + newStoreFiles.removeAll(compactedFiles); + filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock() // If a StoreFile result, move it into place. May be null. if (result != null) { @@ -1318,13 +1392,13 @@ public class Store implements HeapSize { } /** - * Determines if HStore can be split - * @param force Whether to force a split or not. - * @return a StoreSize if store can be split, null otherwise. + * Determines if Store should be split + * @return byte[] if store should be split, null otherwise. */ - StoreSize checkSplit(final boolean force) { + public byte[] checkSplit() { this.lock.readLock().lock(); try { + boolean force = this.region.shouldForceSplit(); // sanity checks if (this.storefiles.isEmpty()) { return null; @@ -1369,7 +1443,7 @@ public class Store implements HeapSize { } // if the user explicit set a split point, use that if (this.region.getSplitPoint() != null) { - return new StoreSize(maxSize, this.region.getSplitPoint()); + return this.region.getSplitPoint(); } StoreFile.Reader r = largestSf.getReader(); if (r == null) { @@ -1396,7 +1470,7 @@ public class Store implements HeapSize { } return null; } - return new StoreSize(maxSize, mk.getRow()); + return mk.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeNameStr, e); @@ -1416,8 +1490,8 @@ public class Store implements HeapSize { return storeSize; } - void setForceMajorCompaction(final boolean b) { - this.forceMajor = b; + void triggerMajorCompaction() { + this.forceMajor = true; } boolean getForceMajorCompaction() { @@ -1493,28 +1567,6 @@ public class Store implements HeapSize { return this.blockingStoreFileCount - this.storefiles.size(); } - /** - * Datastructure that holds size and row to split a file around. - * TODO: Take a KeyValue rather than row. 
- */ - static class StoreSize { - private final long size; - private final byte [] row; - - StoreSize(long size, byte [] row) { - this.size = size; - this.row = row; - } - /* @return the size */ - long getSize() { - return size; - } - - byte [] getSplitRow() { - return this.row; - } - } - HRegion getHRegion() { return this.region; } @@ -1624,8 +1676,8 @@ public class Store implements HeapSize { * @return true if number of store files is greater than * the number defined in minFilesToCompact */ - public boolean hasTooManyStoreFiles() { - return this.storefiles.size() > this.minFilesToCompact; + public boolean needsCompaction() { + return (storefiles.size() - filesCompacting.size()) > minFilesToCompact; } public static final long FIXED_OVERHEAD = ClassSize.align( diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 98f962b..33a7b64 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -19,45 +19,59 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import java.io.IOException; import java.util.Date; +import java.util.List; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.StringUtils; - /** - * This class represents a compaction request and holds the region, priority, - * and time submitted. - */ - public class CompactionRequest implements Comparable { +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * This class holds all details necessary to run a compaction. 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 98f962b..33a7b64 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -19,45 +19,59 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;

+import java.io.IOException;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;

- /**
-  * This class represents a compaction request and holds the region, priority,
-  * and time submitted.
-  */
- public class CompactionRequest implements Comparable<CompactionRequest> {
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * This class holds all details necessary to run a compaction.
+ */
+public class CompactionRequest implements Comparable<CompactionRequest>,
+    Runnable {
   static final Log LOG = LogFactory.getLog(CompactionRequest.class);
   private final HRegion r;
   private final Store s;
+  private final List<StoreFile> files;
+  private final long totalSize;
+  private final boolean isMajor;
   private int p;
   private final Date date;
+  private HRegionServer server = null;

-  public CompactionRequest(HRegion r, Store s) {
-    this(r, s, s.getCompactPriority());
-  }
-
-  public CompactionRequest(HRegion r, Store s, int p) {
-    this(r, s, p, null);
-  }
-
-  public CompactionRequest(HRegion r, Store s, int p, Date d) {
-    if (r == null) {
-      throw new NullPointerException("HRegion cannot be null");
-    }
-
-    if (d == null) {
-      d = new Date();
-    }
+  public CompactionRequest(HRegion r, Store s,
+      List<StoreFile> files, boolean isMajor, int p) {
+    Preconditions.checkNotNull(r);
+    Preconditions.checkNotNull(files);
     this.r = r;
     this.s = s;
+    this.files = files;
+    long sz = 0;
+    for (StoreFile sf : files) {
+      sz += sf.getReader().length();
+    }
+    this.totalSize = sz;
+    this.isMajor = isMajor;
     this.p = p;
-    this.date = d;
+    this.date = new Date();
   }

   /**
@@ -89,8 +103,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
       return compareVal;
     }

-    //break the tie arbitrarily
-    return -1;
+    // break the tie based on hash code
+    return this.hashCode() - request.hashCode();
   }

   /** Gets the HRegion for the request */
@@ -103,6 +117,20 @@ import org.apache.hadoop.hbase.regionserver.Store;
     return s;
   }

+  /** Gets the StoreFiles for the request */
+  public List<StoreFile> getFiles() {
+    return files;
+  }
+
+  /** Gets the total size of all StoreFiles in compaction */
+  public long getSize() {
+    return totalSize;
+  }
+
+  public boolean isMajor() {
+    return this.isMajor;
+  }
+
   /** Gets the priority for the request */
   public int getPriority() {
     return p;
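Note on the compareTo tie-break above: the old unconditional -1 meant two requests of equal priority each compared less-than the other, violating the Comparable contract that PriorityBlockingQueue relies on. The hash-code difference gives an arbitrary but self-consistent total order (with the caveat that the subtraction can still overflow for hash codes more than 2^31 apart). A standalone illustration with a hypothetical Req class, not HBase code:

    public class TieBreakSketch {
      static class Req implements Comparable<Req> {
        final int priority;
        Req(int priority) { this.priority = priority; }
        @Override public int compareTo(Req other) {
          if (this.priority != other.priority) {
            return this.priority - other.priority;
          }
          // old behavior: return -1;  -- then a < b and b < a at the same time
          return this.hashCode() - other.hashCode();  // arbitrary but consistent
        }
      }
      public static void main(String[] args) {
        Req a = new Req(1), b = new Req(1);
        // the contract requires sgn(a.compareTo(b)) == -sgn(b.compareTo(a))
        assert Integer.signum(a.compareTo(b)) == -Integer.signum(b.compareTo(a));
      }
    }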
"completed" : "aborted") + " compaction: " + + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); + if (completed) { + server.getMetrics().addCompaction(now - start, this.totalSize); + // degenerate case: blocked regions require recursive enqueues + if (s.getCompactPriority() <= 0) { + server.compactSplitThread + .requestCompaction(r, s, "Recursive enqueue"); + } + } + } catch (IOException ex) { + LOG.error("Compaction failed " + this, RemoteExceptionHandler + .checkIOException(ex)); + server.checkFileSystem(); + } catch (Exception ex) { + LOG.error("Compaction failed " + this, ex); + server.checkFileSystem(); + } finally { + s.finishRequest(this); + LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); + } + } + + /** + * Cleanup class to use when rejecting a compaction request from the queue. + */ + public static class Rejection implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) { + if (request instanceof CompactionRequest) { + CompactionRequest cr = (CompactionRequest) request; + LOG.debug("Compaction Rejected: " + cr); + cr.getStore().finishRequest(cr); + } + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java index 345e393..cc2b2b4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java @@ -314,11 +314,12 @@ public class RegionServerMetrics implements Updater { } /** - * @param compact history in + * @param time time that compaction took + * @param size bytesize of storefiles in the compaction */ - public synchronized void addCompaction(final Pair compact) { - this.compactionTime.inc(compact.getFirst()); - this.compactionSize.inc(compact.getSecond()); + public synchronized void addCompaction(long time, long size) { + this.compactionTime.inc(time); + this.compactionSize.inc(size); } /** diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java index 6517ba7..48fa162 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java @@ -158,7 +158,9 @@ public class TestCompactSelection extends TestCase { void compactEquals(List candidates, boolean forcemajor, long ... 
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
index 6517ba7..48fa162 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
@@ -158,7 +158,9 @@ public class TestCompactSelection extends TestCase {
   void compactEquals(List<StoreFile> candidates, boolean forcemajor,
       long ... expected)
   throws IOException {
-    List<StoreFile> actual = store.compactSelection(candidates, forcemajor);
+    store.forceMajor = forcemajor;
+    List<StoreFile> actual = store.compactSelection(candidates);
+    store.forceMajor = false;
     assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
   }

@@ -187,7 +189,7 @@ public class TestCompactSelection extends TestCase {
      */
     // don't exceed max file compact threshold
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());

     /* MAJOR COMPACTION */
     // if a major compaction has been forced, then compact everything
@@ -197,8 +199,11 @@ public class TestCompactSelection extends TestCase {
     // even if one of those files is too big
     compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
     // don't exceed max file compact threshold, even with major compaction
+    store.forceMajor = true;
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
+    store.forceMajor = false;
+
     // if we exceed maxCompactSize, downgrade to minor
     // if not, it creates a 'snowball effect' when files >> maxCompactSize:
     // the last file in compaction is the aggregate of all previous compactions
@@ -217,7 +222,7 @@ public class TestCompactSelection extends TestCase {
     compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
     // reference files should obey max file compact to avoid OOM
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());

     // empty case
     compactEquals(new ArrayList<StoreFile>() /* empty */);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
deleted file mode 100644
index c5d876d..0000000
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for the priority compaction queue
- */
-public class TestPriorityCompactionQueue {
-  static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class);
-
-  @Before
-  public void setUp() {
-  }
-
-  @After
-  public void tearDown() {
-
-  }
-
-  class DummyHRegion extends HRegion {
-    String name;
-
-    DummyHRegion(String name) {
-      super();
-      this.name = name;
-    }
-
-    public int hashCode() {
-      return name.hashCode();
-    }
-
-    public boolean equals(DummyHRegion r) {
-      return name.equals(r.name);
-    }
-
-    public String toString() {
-      return "[DummyHRegion " + name + "]";
-    }
-
-    public byte[] getRegionName() {
-      return Bytes.toBytes(name);
-    }
-
-    public String getRegionNameAsString() {
-      return name;
-    }
-  }
-
-  protected void getAndCheckRegion(PriorityCompactionQueue pq,
-      HRegion checkRegion) {
-    HRegion r = pq.remove().getHRegion();
-    if (r != checkRegion) {
-      Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
-          .equals(checkRegion));
-    }
-  }
-
-  protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
-    pq.add(new CompactionRequest(r, null, p));
-    try {
-      // Sleep 1 millisecond so 2 things are not put in the queue within the
-      // same millisecond.  The queue breaks ties arbitrarily between two
-      // requests inserted at the same time.  We want the ordering to
-      // be consistent for our unit test.
-      Thread.sleep(1);
-    } catch (InterruptedException ex) {
-      // continue
-    }
-  }
-
-  // ////////////////////////////////////////////////////////////////////////////
-  // tests
-  // ////////////////////////////////////////////////////////////////////////////
-
-  /** tests general functionality of the compaction queue */
-  @Test public void testPriorityQueue() throws InterruptedException {
-    PriorityCompactionQueue pq = new PriorityCompactionQueue();
-
-    HRegion r1 = new DummyHRegion("r1");
-    HRegion r2 = new DummyHRegion("r2");
-    HRegion r3 = new DummyHRegion("r3");
-    HRegion r4 = new DummyHRegion("r4");
-    HRegion r5 = new DummyHRegion("r5");
-
-    // test 1
-    // check fifo w/priority
-    addRegion(pq, r1, 0);
-    addRegion(pq, r2, 0);
-    addRegion(pq, r3, 0);
-    addRegion(pq, r4, 0);
-    addRegion(pq, r5, 0);
-
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r4);
-    getAndCheckRegion(pq, r5);
-
-    // test 2
-    // check fifo w/mixed priority
-    addRegion(pq, r1, 0);
-    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, 0);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, 0);
-
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r5);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r4);
-
-    // test 3
-    // check fifo w/mixed priority
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, 0);
-
-    getAndCheckRegion(pq, r5);
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r4);
-
-    // test 4
-    // check fifo w/mixed priority elevation time
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, 0);
-    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
-    Thread.sleep(1000);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, 0);
-
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r5);
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r4);
-
-    // reset the priority compaction queue back to a normal queue
-    pq = new PriorityCompactionQueue();
-
-    // test 5
-    // test that lower priority are removed from the queue when a high priority
-    // is added
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, 0);
-
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r4);
-    getAndCheckRegion(pq, r5);
-
-    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
-
-    // test 6
-    // don't add the same region more than once
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
-
-    getAndCheckRegion(pq, r1);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r4);
-    getAndCheckRegion(pq, r5);
-
-    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
-
-    // test 7
-    // we can handle negative priorities
-    addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
-    addRegion(pq, r2, -1);
-    addRegion(pq, r3, 0);
-    addRegion(pq, r4, -2);
-
-    getAndCheckRegion(pq, r4);
-    getAndCheckRegion(pq, r2);
-    getAndCheckRegion(pq, r3);
-    getAndCheckRegion(pq, r1);
-
-    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
-  }
-}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index adfe1f8..dbe5fb1 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -156,7 +156,7 @@ public class TestStore extends TestCase {
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);

     // after compact; check the lowest time stamp
-    store.compact();
+    store.compact(store.requestCompaction());
     lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
@@ -688,7 +688,9 @@ public class TestStore extends TestCase {
    */
   public void testSplitWithEmptyColFam() throws IOException {
     init(this.getName());
-    assertNull(store.checkSplit(false));
-    assertNull(store.checkSplit(true));
+    assertNull(store.checkSplit());
+    store.getHRegion().forceSplit(null);
+    assertNull(store.checkSplit());
+    store.getHRegion().clearSplit_TESTS_ONLY();
   }
 }
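Closing note on the deleted TestPriorityCompactionQueue: ordering now comes from java.util.concurrent directly, so there is no custom queue left to test. One behavioral difference worth flagging: the old queue's FIFO ordering within a priority level (tests 1 through 4 above) is not preserved, since ties now break by hash code. A standalone sketch of the ordering guarantee that remains (not part of the patch; plain Integers stand in for requests):

    import java.util.concurrent.PriorityBlockingQueue;

    public class QueueOrderSketch {
      public static void main(String[] args) {
        // PriorityBlockingQueue drains by compareTo, smallest first, matching
        // CompactionRequest's convention that lower values are more urgent.
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<Integer>();
        q.add(1);   // PRIORITY_USER
        q.add(-2);  // blocked region, most urgent
        q.add(0);
        System.out.println(q.poll() + " " + q.poll() + " " + q.poll());  // -2 0 1
      }
    }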