commit 96efe893a480bb24a7ac6c2edde4533eb444b6a7 Author: nspiegelberg Date: 2 minutes ago HBASE-3826 : Recursively compact Stores that have too many files diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 48e7b00..41e2b3b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -82,9 +82,16 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { if (r.getLastCompactInfo() != null) { // compaction aborted? this.server.getMetrics().addCompaction(r.getLastCompactInfo()); } - if (shouldSplitRegion() && midKey != null && - !this.server.isStopped()) { - split(r, midKey); + if (!this.server.isStopped()) { + if (shouldSplitRegion() && midKey != null) { + split(r, midKey); + } else if (r.getCompactPriority() < PRIORITY_USER) { + // degenerate case. recursively enqueue blocked regions + int p = r.getCompactPriority(); + LOG.debug("Re-queueing region " + r + " with " + p + + " priority for compaction request "); + compactionQueue.add(r, p); + } } } } finally { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d0a1e11..b54153c 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -137,6 +137,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; import com.google.common.base.Function; @@ -232,9 +233,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, MemStoreFlusher cacheFlusher; /* - * Check for major compactions. + * Check for compactions requests. */ - Chore majorCompactionChecker; + Chore compactionChecker; // HLog and HLog roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes @@ -551,11 +552,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Compaction thread this.compactSplitThread = new CompactSplitThread(this); - // Background thread to check for major compactions; needed if region + // Background thread to check for compactions; needed if region // has not gotten updates in a while. Make it run at a lesser frequency. int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY + ".multiplier", 1000); - this.majorCompactionChecker = new MajorCompactionChecker(this, + this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency * multiplier, this); this.leases = new Leases((int) conf.getLong( @@ -666,7 +667,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary(); - if (this.majorCompactionChecker != null) this.majorCompactionChecker.interrupt(); + if (this.compactionChecker != null) + this.compactionChecker.interrupt(); if (this.killed) { // Just skip out w/o closing regions. @@ -1038,30 +1040,30 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } /* - * Inner class that runs on a long period checking if regions need major - * compaction. + * Inner class that runs on a long period checking if regions need compaction. */ - private static class MajorCompactionChecker extends Chore { + private static class CompactionChecker extends Chore { private final HRegionServer instance; - MajorCompactionChecker(final HRegionServer h, final int sleepTime, + CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { - super("MajorCompactionChecker", sleepTime, h); + super("CompactionChecker", sleepTime, h); this.instance = h; - LOG.info("Runs every " + sleepTime + "ms"); + LOG.info("Runs every " + StringUtils.formatTime(sleepTime)); } @Override protected void chore() { for (HRegion r : this.instance.onlineRegions.values()) { try { - if (r != null && r.isMajorCompaction()) { + if (r == null) + continue; + if (r.isMajorCompaction() || r.hasTooManyStoreFiles()) { // Queue a compaction. Will recognize if major is needed. - this.instance.compactSplitThread.requestCompaction(r, getName() - + " requests major compaction"); + this.instance.compactSplitThread.requestCompaction(r, getName()); } } catch (IOException e) { - LOG.warn("Failed major compaction check on " + r, e); + LOG.warn("Failed compaction check on " + r, e); } } } @@ -1255,8 +1257,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, handler); Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", handler); - Threads.setDaemonThreadRunning(this.majorCompactionChecker, n - + ".majorCompactionChecker", handler); + Threads.setDaemonThreadRunning(this.compactionChecker, n + + ".compactionChecker", handler); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1313,7 +1315,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Verify that all threads are alive if (!(leases.isAlive() && compactSplitThread.isAlive() && cacheFlusher.isAlive() && hlogRoller.isAlive() - && this.majorCompactionChecker.isAlive())) { + && this.compactionChecker.isAlive())) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1424,7 +1426,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, * have already been called. */ protected void join() { - Threads.shutdown(this.majorCompactionChecker); + Threads.shutdown(this.compactionChecker); Threads.shutdown(this.cacheFlusher); Threads.shutdown(this.compactSplitThread); Threads.shutdown(this.hlogRoller);