From c0048d2f5f7e6bc45b1f4895cb4789be597d0fa8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 3 Feb 2015 16:17:31 +0800 Subject: [PATCH] HBASE-8329 Limit compaction speed --- .../hbase/regionserver/CompactSplitThread.java | 39 ++- .../hadoop/hbase/regionserver/CompactionTool.java | 4 +- .../hbase/regionserver/DefaultStoreEngine.java | 8 +- .../regionserver/DefaultStoreFileManager.java | 24 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 16 +- .../hadoop/hbase/regionserver/HRegionServer.java | 14 + .../apache/hadoop/hbase/regionserver/HStore.java | 25 +- .../hbase/regionserver/RegionServerServices.java | 8 + .../apache/hadoop/hbase/regionserver/Store.java | 27 +- .../hbase/regionserver/StoreFileManager.java | 6 + .../hbase/regionserver/StripeStoreEngine.java | 6 +- .../hbase/regionserver/StripeStoreFileManager.java | 51 +++- .../compactions/CompactionConfiguration.java | 2 +- .../compactions/CompactionContext.java | 3 +- .../regionserver/compactions/CompactionPolicy.java | 7 +- .../CompactionThroughputController.java | 52 ++++ .../CompactionThroughputControllerFactory.java | 61 +++++ .../hbase/regionserver/compactions/Compactor.java | 102 +++++--- .../regionserver/compactions/DefaultCompactor.java | 13 +- .../NoLimitCompactionThroughputController.java | 66 +++++ ...ressureAwareCompactionThroughputController.java | 265 +++++++++++++++++++ .../compactions/StripeCompactionPolicy.java | 17 +- .../regionserver/compactions/StripeCompactor.java | 25 +- .../hadoop/hbase/MockRegionServerServices.java | 5 + .../org/apache/hadoop/hbase/TestIOFencing.java | 8 +- .../TestRegionObserverScannerOpenHook.java | 9 +- .../hadoop/hbase/master/MockRegionServer.java | 5 + .../hadoop/hbase/regionserver/TestCompaction.java | 28 +- .../TestSplitTransactionOnCluster.java | 6 +- .../hadoop/hbase/regionserver/TestStore.java | 5 +- .../hbase/regionserver/TestStripeCompactor.java | 9 +- .../hbase/regionserver/TestStripeStoreEngine.java | 26 +- .../TestCompactionWithThroughputController.java | 286 +++++++++++++++++++++ .../compactions/TestStripeCompactionPolicy.java | 53 ++-- 34 files changed, 1107 insertions(+), 174 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 247370b..dec4c93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -35,15 +35,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RemoteExceptionHandler; 
+import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -61,6 +64,8 @@ public class CompactSplitThread implements CompactionRequestor { private final ThreadPoolExecutor splits; private final ThreadPoolExecutor mergePool; + private final CompactionThroughputController compactionThroughputController; + /** * Splitting should not take place if the total number of regions exceed this. * This is not a hard limit to the number of regions but it is a guideline to @@ -131,6 +136,10 @@ public class CompactSplitThread implements CompactionRequestor { return t; } }); + + // compaction throughput controller + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, conf); } @Override @@ -147,32 +156,32 @@ public class CompactSplitThread implements CompactionRequestor { queueLists.append("Compaction/Split Queue dump:\n"); queueLists.append(" LargeCompation Queue:\n"); BlockingQueue lq = largeCompactions.getQueue(); - Iterator it = lq.iterator(); - while(it.hasNext()){ - queueLists.append(" "+it.next().toString()); + Iterator it = lq.iterator(); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); queueLists.append("\n"); } - if( smallCompactions != null ){ + if (smallCompactions != null) { queueLists.append("\n"); queueLists.append(" SmallCompation Queue:\n"); lq = smallCompactions.getQueue(); it = lq.iterator(); - while(it.hasNext()){ - queueLists.append(" "+it.next().toString()); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); queueLists.append("\n"); } } - + queueLists.append("\n"); queueLists.append(" Split Queue:\n"); lq = splits.getQueue(); it = lq.iterator(); - while(it.hasNext()){ - queueLists.append(" "+it.next().toString()); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); queueLists.append("\n"); } - + queueLists.append("\n"); queueLists.append(" Region Merge Queue:\n"); lq = mergePool.getQueue(); @@ -478,7 +487,7 @@ public class CompactSplitThread implements CompactionRequestor { // Note: please don't put single-compaction logic here; // put it into region/store/etc. This is CST logic. long start = EnvironmentEdgeManager.currentTimeMillis(); - boolean completed = region.compact(compaction, store); + boolean completed = region.compact(compaction, store, compactionThroughputController); long now = EnvironmentEdgeManager.currentTimeMillis(); LOG.info(((completed) ? 
"Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); @@ -540,4 +549,10 @@ public class CompactSplitThread implements CompactionRequestor { } } } + + @VisibleForTesting + public CompactionThroughputController getCompactionThroughputController() { + return compactionThroughputController; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java index a9f9332..fb3a375 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -162,7 +163,8 @@ public class CompactionTool extends Configured implements Tool { do { CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); if (compaction == null) break; - List storeFiles = store.compact(compaction); + List storeFiles = + store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE); if (storeFiles != null && !storeFiles.isEmpty()) { if (keepCompactedFiles && deleteCompacted) { for (StoreFile storeFile: storeFiles) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 91eb7ba..3c1345d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -63,7 +64,6 @@ public class DefaultStoreEngine extends StoreEngine< @Override protected void createComponents( Configuration conf, Store store, KVComparator kvComparator) throws IOException { - storeFileManager = new DefaultStoreFileManager(kvComparator, conf); String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName()); try { compactor = ReflectionUtils.instantiateWithCustomCtor(className, @@ -80,6 +80,7 @@ public class DefaultStoreEngine extends StoreEngine< } catch (Exception e) { throw new IOException("Unable to load configured compaction policy '" + className + "'", e); } + storeFileManager = new DefaultStoreFileManager(kvComparator, conf, compactionPolicy.getConf()); className = conf.get( DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName()); try { @@ -106,8 +107,9 @@ public class DefaultStoreEngine extends StoreEngine< } @Override - public List compact() throws 
IOException { - return compactor.compact(request); + public List compact(CompactionThroughputController throughputController) + throws IOException { + return compactor.compact(request, throughputController); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index eb5522d..6475a87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -27,10 +27,11 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; @@ -44,7 +45,8 @@ class DefaultStoreFileManager implements StoreFileManager { static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class); private final KVComparator kvComparator; - private final Configuration conf; + private final CompactionConfiguration comConf; + private final int blockingFileCount; /** * List of store files inside this store. This is an immutable list that @@ -52,9 +54,12 @@ class DefaultStoreFileManager implements StoreFileManager { */ private volatile ImmutableList storefiles = null; - public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf) { + public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf, + CompactionConfiguration comConf) { this.kvComparator = kvComparator; - this.conf = conf; + this.comConf = comConf; + this.blockingFileCount = + conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); } @Override @@ -129,8 +134,6 @@ class DefaultStoreFileManager implements StoreFileManager { @Override public int getStoreCompactionPriority() { - int blockingFileCount = conf.getInt( - HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); int priority = blockingFileCount - storefiles.size(); return (priority == HStore.PRIORITY_USER) ? 
priority + 1 : priority; } @@ -161,5 +164,14 @@ class DefaultStoreFileManager implements StoreFileManager { storefiles = ImmutableList.copyOf(storeFiles); } + @Override + public double getCompactionPressure() { + int storefileCount = getStorefileCount(); + int minFilesToCompact = comConf.getMinFilesToCompact(); + if (storefileCount <= minFilesToCompact) { + return 0.0; + } + return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8c2dde3..f4d4348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -62,7 +62,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -90,6 +89,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -127,6 +127,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -1443,12 +1445,12 @@ public class HRegion implements HeapSize { // , Writable{ for (Store s : getStores().values()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - compact(compaction, s); + compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE); } } } - /* + /** * Called by compaction thread and after region is opened to compact the * HStores if necessary. * @@ -1459,11 +1461,11 @@ public class HRegion implements HeapSize { // , Writable{ * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. 
* - * @param cr Compaction details, obtained by requestCompaction() + * @param compaction Compaction details, obtained by requestCompaction() * @return whether the compaction completed - * @throws IOException e */ - public boolean compact(CompactionContext compaction, Store store) throws IOException { + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController) throws IOException { assert compaction != null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); if (this.closing.get() || this.closed.get()) { @@ -1512,7 +1514,7 @@ public class HRegion implements HeapSize { // , Writable{ // We no longer need to cancel the request on the way out of this // method because Store#compact will clean up unconditionally requestNeedsCancellation = false; - store.compact(compaction); + store.compact(compaction, throughputController); } catch (InterruptedIOException iioe) { String msg = "compaction interrupted"; LOG.info(msg, iioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2fafdb1..c04338f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -4942,4 +4942,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa public HeapMemoryManager getHeapMemoryManager() { return hMemManager; } + + @Override + public double getCompactionPressure() { + double max = 0; + for (HRegion region : onlineRegions.values()) { + for (Store store : region.getStores().values()) { + double normCount = store.getCompactionPressure(); + if (normCount > max) { + max = normCount; + } + } + } + return max; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f6492aa..105ecfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -87,6 +88,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -875,7 +877,7 @@ public class HStore implements Store { if (LOG.isInfoEnabled()) { LOG.info("Added " + sf + ", entries=" + r.getEntries() + ", sequenceid=" + logCacheFlushId + - ", filesize=" + StringUtils.humanReadableInt(r.length())); + ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1)); } return sf; } @@ -1083,7 +1085,8 @@ public class HStore implements 
Store { * @return Storefile we compacted into or null if we failed or opted out early. */ @Override - public List compact(CompactionContext compaction) throws IOException { + public List compact(CompactionContext compaction, + CompactionThroughputController throughputController) throws IOException { assert compaction != null; List sfs = null; CompactionRequest cr = compaction.getRequest();; @@ -1105,10 +1108,10 @@ public class HStore implements Store { LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into tmpdir=" + fs.getTempDir() + ", totalSize=" - + StringUtils.humanReadableInt(cr.getSize())); + + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); // Commence the compaction. - List newFiles = compaction.compact(); + List newFiles = compaction.compact(throughputController); // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { @@ -1220,12 +1223,12 @@ public class HStore implements Store { for (StoreFile sf: sfs) { message.append(sf.getPath().getName()); message.append("(size="); - message.append(StringUtils.humanReadableInt(sf.getReader().length())); + message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); message.append("), "); } } message.append("total size for store is ") - .append(StringUtils.humanReadableInt(storeSize)) + .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1)) .append(". This selection was in queue for ") .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())) .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) @@ -1285,7 +1288,7 @@ public class HStore implements Store { } } - this.replaceStoreFiles(inputStoreFiles, Collections.EMPTY_LIST); + this.replaceStoreFiles(inputStoreFiles, Collections.emptyList()); this.completeCompaction(inputStoreFiles); } @@ -1503,7 +1506,7 @@ public class HStore implements Store { completeCompaction(delSfs); LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this + " of " + this.getRegionInfo().getRegionNameAsString() - + "; total size for store is " + StringUtils.humanReadableInt(storeSize)); + + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1)); } @Override @@ -1877,7 +1880,6 @@ public class HStore implements Store { } @Override - // TODO: why is there this and also getNumberOfStorefiles?! Remove one. public int getStorefilesCount() { return this.storeEngine.getStoreFileManager().getStorefileCount(); } @@ -2172,4 +2174,9 @@ public class HStore implements Store { public long getMajorCompactedCellsSize() { return majorCompactedCellsSize; } + + @Override + public double getCompactionPressure() { + return storeEngine.getStoreFileManager().getCompactionPressure(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 2a00473..d331840 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -147,4 +147,12 @@ public interface RegionServerServices * @return heap memory manager instance */ HeapMemoryManager getHeapMemoryManager(); + + /** + * @return the max compaction pressure of all stores on this regionserver. 
The value should be + * greater than or equal to 0.0, and any value greater than 1.0 means we enter the + * emergency state that some stores have too many store files. + * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() + */ + double getCompactionPressure(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 770ab75..a116a4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -22,15 +22,15 @@ import java.util.Collection; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; /** * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or @@ -186,7 +187,8 @@ public interface Store extends HeapSize, StoreConfigInformation { void cancelRequestedCompaction(CompactionContext compaction); - List compact(CompactionContext compaction) throws IOException; + List compact(CompactionContext compaction, + CompactionThroughputController throughputController) throws IOException; /** * @return true if we should run a major compaction. @@ -386,4 +388,21 @@ public interface Store extends HeapSize, StoreConfigInformation { * @return Whether this store has too many store files. */ boolean hasTooManyStoreFiles(); + + /** + * This value can represent the degree of emergency of compaction for this store. It should be + * greater than or equal to 0.0, any value greater than 1.0 means we have too many store files. + *
+ * <ul>
+ * <li>if getStorefilesCount <= getMinFilesToCompact, return 0.0</li>
+ * <li>return (getStorefilesCount - getMinFilesToCompact) / (blockingFileCount -
+ * getMinFilesToCompact)</li>
+ * </ul>
+ * <p>
+ * And for striped stores, we should calculate this value by the files in each stripe separately
+ * and return the maximum value.
+ * <p>
+ * It is similar to {@link #getCompactPriority()} except that it is more suitable to use in a + * linear formula. + */ + double getCompactionPressure(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index f703420..8196b1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -134,4 +134,10 @@ public interface StoreFileManager { * @return The files which don't have any necessary data according to TTL and other criteria. */ Collection getUnneededFiles(long maxTs, List filesCompacting); + + /** + * @return the compaction pressure used for compaction throughput tuning. + * @see Store#getCompactionPressure() + */ + double getCompactionPressure(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index cd731aa..b910527 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import com.google.common.base.Preconditions; @@ -98,9 +99,10 @@ public class StripeStoreEngine extends StoreEngine compact() throws IOException { + public List compact(CompactionThroughputController throughputController) + throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor); + return this.stripeRequest.execute(compactor, throughputController); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index d4e8800..c18d362 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -33,15 +33,15 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; @@ -474,20 +474,23 @@ public class StripeStoreFileManager if 
(!LOG.isDebugEnabled()) return; StringBuilder sb = new StringBuilder(); sb.append("\n" + string + "; current stripe state is as such:"); - sb.append("\n level 0 with ").append(state.level0Files.size()) + sb.append("\n level 0 with ") + .append(state.level0Files.size()) .append( " files: " - + StringUtils.humanReadableInt(StripeCompactionPolicy - .getTotalFileSize(state.level0Files)) + ";"); + + TraditionalBinaryPrefix.long2String( + StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";"); for (int i = 0; i < state.stripeFiles.size(); ++i) { String endRow = (i == state.stripeEndRows.length) ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]"; - sb.append("\n stripe ending in ").append(endRow).append(" with ") + sb.append("\n stripe ending in ") + .append(endRow) + .append(" with ") .append(state.stripeFiles.get(i).size()) .append( " files: " - + StringUtils.humanReadableInt(StripeCompactionPolicy - .getTotalFileSize(state.stripeFiles.get(i))) + ";"); + + TraditionalBinaryPrefix.long2String( + StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";"); } sb.append("\n").append(state.stripeFiles.size()).append(" stripes total."); sb.append("\n").append(getStorefileCount()).append(" files total."); @@ -958,4 +961,36 @@ public class StripeStoreFileManager } return expiredStoreFiles; } + + @Override + public double getCompactionPressure() { + State stateLocal = this.state; + if (stateLocal.allFilesCached.size() > blockingFileCount) { + // just a hit to tell others that we have reached the blocking file count. + return 2.0; + } + if (stateLocal.stripeFiles.isEmpty()) { + return 0.0; + } + int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size(); + // do not calculate L0 separately because data will be moved to stripe quickly and in most cases + // we flush data to stripe directly. + int delta = stateLocal.level0Files.isEmpty() ? 0 : 1; + double max = 0.0; + for (ImmutableList stripeFile : stateLocal.stripeFiles) { + int stripeFileCount = stripeFile.size(); + double normCount = + (double) (stripeFileCount + delta - config.getStripeCompactMinFiles()) + / (blockingFilePerStripe - config.getStripeCompactMinFiles()); + if (normCount >= 1.0) { + // This could happen if stripe is not split evenly. Do not return values that larger than + // 1.0 because we have not reached the blocking file count actually. 
+ return 1.0; + } + if (normCount > max) { + max = normCount; + } + } + return max; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 4f3530c..a08dc17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -126,7 +126,7 @@ public class CompactionConfiguration { /** * @return upper bound on number of files to be included in minor compactions */ - int getMinFilesToCompact() { + public int getMinFilesToCompact() { return minFilesToCompact; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index 1bf205a..1c89bf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -68,7 +68,8 @@ public abstract class CompactionContext { * Runs the compaction based on current selection. select/forceSelect must have been called. * @return The new file paths resulting from compaction. */ - public abstract List compact() throws IOException; + public abstract List compact(CompactionThroughputController throughputController) + throws IOException; public CompactionRequest getRequest() { assert hasSelection(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java index efe3066..c0d62b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.Collection; -import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -62,4 +61,8 @@ public abstract class CompactionPolicy { public void setConf(Configuration conf) { this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo); } + + public CompactionConfiguration getConf() { + return this.comConf; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java new file mode 100644 index 0000000..657ecb4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * A utility that constrains the total throughput of one or more simultaneous flows (compactions) by + * sleeping when necessary. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public interface CompactionThroughputController extends Stoppable { + + /** + * Setup controller for the given region server. + */ + void setup(RegionServerServices server); + + /** + * Start a compaction. + */ + void start(String compactionName); + + /** + * Control the compaction throughput. Will sleep if too fast. + * @return the actual sleep time. + */ + long control(String compactionName, long size) throws InterruptedException; + + /** + * Finish a compaction. Should call this method in a finally block. + */ + void finish(String compactionName); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java new file mode 100644 index 0000000..e9b63ee --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.util.ReflectionUtils; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class CompactionThroughputControllerFactory { + + private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class); + + public static final String HBASE_THROUGHPUT_CONTROLLER_KEY = + "hbase.regionserver.throughput.controller"; + + private static final Class + DEFAULT_THROUGHPUT_CONTROLLER_CLASS = NoLimitCompactionThroughputController.class; + + public static CompactionThroughputController create(RegionServerServices server, + Configuration conf) { + Class clazz = getThroughputControllerClass(conf); + CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf); + controller.setup(server); + return controller; + } + + public static Class getThroughputControllerClass( + Configuration conf) { + String className = + conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName()); + try { + return Class.forName(className).asSubclass(CompactionThroughputController.class); + } catch (Exception e) { + LOG.warn( + "Unable to load configured throughput controller '" + className + + "', load default throughput controller " + + DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e); + return DEFAULT_THROUGHPUT_CONTROLLER_CLASS; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index e99f267..8581e29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -25,12 +26,12 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; @@ -44,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; /** * A compactor is a compaction algorithm associated a given policy. 
Base class also contains @@ -146,7 +147,7 @@ public abstract class Compactor { LOG.debug("Compacting " + file + ", keycount=" + keyCount + ", bloomtype=" + r.getBloomFilterType().toString() + - ", size=" + StringUtils.humanReadableInt(r.length()) + + ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) + ", encoding=" + r.getHFileReader().getDataBlockEncoding() + ", seqNum=" + seqNum + (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: "")); @@ -205,8 +206,9 @@ public abstract class Compactor { * @param smallestReadPoint Smallest read point. * @return Whether compaction ended; false if it was interrupted for some reason. */ - protected boolean performCompaction(InternalScanner scanner, - CellSink writer, long smallestReadPoint) throws IOException { + protected boolean performCompaction(InternalScanner scanner, CellSink writer, + long smallestReadPoint, CompactionThroughputController throughputController) + throws IOException { long bytesWritten = 0; long bytesWrittenProgress = 0; // Since scanner.next() can return 'false' but still be delivering data, @@ -217,52 +219,68 @@ public abstract class Compactor { if (LOG.isDebugEnabled()) { lastMillis = EnvironmentEdgeManager.currentTimeMillis(); } + String compactionName = + store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString(); long now = 0; boolean hasMore; - do { - hasMore = scanner.next(kvs, compactionKVMax); - if (LOG.isDebugEnabled()) { - now = EnvironmentEdgeManager.currentTimeMillis(); - } - // output to writer: - for (Cell c : kvs) { - KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - kv.setMvccVersion(0); - } - writer.append(kv); - int len = kv.getLength(); - ++progress.currentCompactedKVs; - progress.totalCompactedSize += len; + throughputController.start(compactionName); + try { + do { + hasMore = scanner.next(kvs, compactionKVMax); if (LOG.isDebugEnabled()) { - bytesWrittenProgress += len; + now = EnvironmentEdgeManager.currentTimeMillis(); } + // output to writer: + for (Cell c : kvs) { + KeyValue kv = KeyValueUtil.ensureKeyValue(c); + if (kv.getMvccVersion() <= smallestReadPoint) { + kv.setMvccVersion(0); + } + writer.append(kv); + int len = kv.getLength(); + ++progress.currentCompactedKVs; + progress.totalCompactedSize += len; + if (LOG.isDebugEnabled()) { + bytesWrittenProgress += len; + } + throughputController.control(compactionName, len); - // check periodically to see if a system stop is requested - if (closeCheckInterval > 0) { - bytesWritten += len; - if (bytesWritten > closeCheckInterval) { - bytesWritten = 0; - if (!store.areWritesEnabled()) { - progress.cancel(); - return false; + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { + bytesWritten += len; + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } } } } - } - // Log the progress of long running compactions every minute if - // logging at DEBUG level - if (LOG.isDebugEnabled()) { - if ((now - lastMillis) >= 60 * 1000) { - LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", - (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0))); - lastMillis = now; - bytesWrittenProgress = 0; + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + + compactionName + + " 
" + + progress + + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0) + / ((now - lastMillis) / 1000.0)) + ", throughputController is " + + throughputController); + lastMillis = now; + bytesWrittenProgress = 0; + } } - } - kvs.clear(); - } while (hasMore); - progress.complete(); + kvs.clear(); + } while (hasMore); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while control throughput of compacting " + + compactionName); + } finally { + throughputController.finish(compactionName); + progress.complete(); + } return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 1c23b65..b505efc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -36,7 +36,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; /** - * Compact passed set of files. Create an instance and then call {@link #compact(CompactionRequest)} + * Compact passed set of files. Create an instance and then call + * {@link #compact(CompactionRequest, CompactionThroughputController)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -49,7 +50,8 @@ public class DefaultCompactor extends Compactor { /** * Do a minor/major compaction on an explicit set of storefiles from a Store. */ - public List compact(final CompactionRequest request) throws IOException { + public List compact(final CompactionRequest request, + CompactionThroughputController throughputController) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -93,7 +95,8 @@ public class DefaultCompactor extends Compactor { // because we need record the max seq id for the store file, see HBASE-6059 writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); - boolean finished = performCompaction(scanner, writer, smallestReadPoint); + boolean finished = + performCompaction(scanner, writer, smallestReadPoint, throughputController); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); @@ -137,7 +140,7 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest)}; + * {@link #compact(CompactionRequest, CompactionThroughputController)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. 
* @param isMajor true to major compact (prune all deletes, max versions, etc) @@ -149,6 +152,6 @@ public class DefaultCompactor extends Compactor { throws IOException { CompactionRequest cr = new CompactionRequest(filesToCompact); cr.setIsMajor(isMajor); - return this.compact(cr); + return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java new file mode 100644 index 0000000..766b2cb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * A dummy CompactionThroughputController that does nothing. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class NoLimitCompactionThroughputController implements CompactionThroughputController { + + public static final NoLimitCompactionThroughputController INSTANCE = + new NoLimitCompactionThroughputController(); + + @Override + public void setup(RegionServerServices server) { + } + + @Override + public void start(String compactionName) { + } + + @Override + public long control(String compactionName, long size) throws InterruptedException { + return 0; + } + + @Override + public void finish(String compactionName) { + } + + private volatile boolean stopped; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + + @Override + public String toString() { + return "NoLimitCompactionThroughputController"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java new file mode 100644 index 0000000..1594451 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * A throughput controller which uses the follow schema to limit throughput + *

+ * <ul>
+ * <li>If compaction pressure is greater than 1.0, no limitation.</li>
+ * <li>In off peak hours, use a fixed throughput limitation
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
+ * <li>In normal hours, the max throughput is tuned between
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower +
+ * (higher - lower) * compactionPressure", where compactionPressure is in range [0.0, 1.0]</li>
+ * </ul>
+ * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class PressureAwareCompactionThroughputController extends Configured implements + CompactionThroughputController, Stoppable { + + private final static Log LOG = LogFactory + .getLog(PressureAwareCompactionThroughputController.class); + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = + "hbase.hstore.compaction.throughput.higher.bound"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = + 20L * 1024 * 1024; + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = + "hbase.hstore.compaction.throughput.lower.bound"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = + 10L * 1024 * 1024; + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = + "hbase.hstore.compaction.throughput.offpeak"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE; + + public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = + "hbase.hstore.compaction.throughput.tune.period"; + + private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; + + /** + * Stores the information of one controlled compaction. + */ + private static final class ActiveCompaction { + + private final long startTime; + + private long lastControlTime; + + private long lastControlSize; + + private long totalSize; + + private long numberOfSleeps; + + private long totalSleepTime; + + // prevent too many debug log + private long lastLogTime; + + ActiveCompaction() { + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + this.startTime = currentTime; + this.lastControlTime = currentTime; + this.lastLogTime = currentTime; + } + } + + private long maxThroughputHigherBound; + + private long maxThroughputLowerBound; + + private long maxThroughputOffpeak; + + private OffPeakHours offPeakHours; + + private long controlPerSize; + + private int tuningPeriod; + + volatile double maxThroughput; + + private final ConcurrentMap activeCompactions = + new ConcurrentHashMap(); + + @Override + public void setup(final RegionServerServices server) { + Chore tuner = new Chore("CompactionThroughputTuner", tuningPeriod, this) { + + @Override + protected void chore() { + tune(server.getCompactionPressure()); + } + }; + tuner.setDaemon(true); + tuner.start(); + } + + private void tune(double compactionPressure) { + double maxThroughputToSet; + if (compactionPressure > 1.0) { + // set to unlimited if some stores already reach the blocking store file count + maxThroughputToSet = Double.MAX_VALUE; + } else if (offPeakHours.isOffPeakHour()) { + maxThroughputToSet = maxThroughputOffpeak; + } else { + // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to + // calculate the throughput limitation. 
+      maxThroughputToSet =
+          maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
+              * compactionPressure;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("compactionPressure is " + compactionPressure
+          + ", tuning compaction throughput to " + throughputDesc(maxThroughputToSet));
+    }
+    this.maxThroughput = maxThroughputToSet;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null) {
+      return;
+    }
+    this.maxThroughputHigherBound =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
+    this.maxThroughputLowerBound =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
+    this.maxThroughputOffpeak =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
+    this.offPeakHours = OffPeakHours.getInstance(conf);
+    this.controlPerSize = this.maxThroughputLowerBound;
+    this.maxThroughput = this.maxThroughputLowerBound;
+    this.tuningPeriod =
+        getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+          DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
+    LOG.info("Compaction throughput configurations, higher bound: "
+        + throughputDesc(maxThroughputHigherBound) + ", lower bound: "
+        + throughputDesc(maxThroughputLowerBound) + ", off peak: "
+        + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
+  }
+
+  private String throughputDesc(long deltaSize, long elapsedTime) {
+    return throughputDesc((double) deltaSize / elapsedTime * 1000);
+  }
+
+  private String throughputDesc(double speed) {
+    if (speed >= 1E15) { // large enough to say it is unlimited
+      return "unlimited";
+    } else {
+      return String.format("%.2f MB/sec", speed / 1024 / 1024);
+    }
+  }
+
+  @Override
+  public void start(String compactionName) {
+    activeCompactions.put(compactionName, new ActiveCompaction());
+  }
+
+  @Override
+  public long control(String compactionName, long size) throws InterruptedException {
+    ActiveCompaction compaction = activeCompactions.get(compactionName);
+    compaction.totalSize += size;
+    long deltaSize = compaction.totalSize - compaction.lastControlSize;
+    if (deltaSize < controlPerSize) {
+      return 0;
+    }
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
+    long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
+    long elapsedTime = now - compaction.lastControlTime;
+    compaction.lastControlSize = compaction.totalSize;
+    if (elapsedTime >= minTimeAllowed) {
+      compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
+      return 0;
+    }
+    // too fast
+    long sleepTime = minTimeAllowed - elapsedTime;
+    if (LOG.isDebugEnabled()) {
+      // do not log too much
+      if (now - compaction.lastLogTime > 60L * 1000) {
+        LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
+            + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+            + throughputDesc(maxThroughputPerCompaction) + ", already slept "
+            + compaction.numberOfSleeps + " time(s) and total slept time is "
+            + compaction.totalSleepTime + " ms till now.");
+        compaction.lastLogTime = now;
+      }
+    }
+    Thread.sleep(sleepTime);
+    compaction.numberOfSleeps++;
+    compaction.totalSleepTime += sleepTime;
+    compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
+    return sleepTime;
+  }
+
+  @Override
+  public void finish(String compactionName) {
+    ActiveCompaction compaction = activeCompactions.remove(compactionName);
+    long elapsedTime =
+        Math.max(1, EnvironmentEdgeManager.currentTimeMillis() - compaction.startTime);
+    LOG.info(compactionName + " average throughput is "
+        + throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
+        + compaction.numberOfSleeps + " time(s) and total slept time is "
+        + compaction.totalSleepTime + " ms. " + activeCompactions.size()
+        + " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
+  }
+
+  private volatile boolean stopped = false;
+
+  @Override
+  public void stop(String why) {
+    stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  @Override
+  public String toString() {
+    return "PressureAwareCompactionThroughputController [maxThroughput="
+        + throughputDesc(maxThroughput) + ", activeCompactions=" + activeCompactions.size() + "]";
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index cd988e5..c0c5330 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -396,7 +396,8 @@ public class StripeCompactionPolicy extends CompactionPolicy {
      * @param compactor Compactor.
      * @return result of compact(...)
      */
-    public abstract List<Path> execute(StripeCompactor compactor) throws IOException;
+    public abstract List<Path> execute(StripeCompactor compactor,
+        CompactionThroughputController throughputController) throws IOException;
 
     public StripeCompactionRequest(CompactionRequest request) {
       this.request = request;
@@ -447,9 +448,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
     }
 
     @Override
-    public List<Path> execute(StripeCompactor compactor) throws IOException {
-      return compactor.compact(
-          this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
+    public List<Path> execute(StripeCompactor compactor,
+        CompactionThroughputController throughputController) throws IOException {
+      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
+        this.majorRangeToRow, throughputController);
     }
   }
 
@@ -497,9 +499,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
     }
 
     @Override
-    public List<Path> execute(StripeCompactor compactor) throws IOException {
-      return compactor.compact(this.request, this.targetCount, this.targetKvs,
-          this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
+    public List<Path> execute(StripeCompactor compactor,
+        CompactionThroughputController throughputController) throws IOException {
+      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
+        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController);
     }
 
     /** Set major range of the compaction to the entire compaction range.
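[Editorial note, not part of the patch] The controller above derives its cap by linearly interpolating between the configured lower and higher bounds according to compaction pressure, and control() sleeps whenever a compaction has written its latest chunk faster than its share of that cap allows. The self-contained sketch below reproduces that arithmetic so the numbers in the tests further down are easier to follow; it ignores the off-peak override, and the class and method names (ThrottleMathSketch, tunedLimit, sleepFor) are illustrative only, not HBase APIs.

public final class ThrottleMathSketch {

  /** Linear interpolation of the cap between the bounds, as in the tuning logic above. */
  static double tunedLimit(double lower, double higher, double pressure) {
    if (pressure > 1.0) {
      return Double.MAX_VALUE; // blocking file count exceeded: effectively unlimited
    }
    return lower + (higher - lower) * pressure;
  }

  /** Sleep time in ms for a compaction that wrote deltaSize bytes in elapsedMs. */
  static long sleepFor(long deltaSize, long elapsedMs, double limit, int activeCompactions) {
    double perCompaction = limit / activeCompactions;
    long minTimeAllowed = (long) (deltaSize / perCompaction * 1000); // ms
    return Math.max(0, minTimeAllowed - elapsedMs);
  }

  public static void main(String[] args) {
    // Bounds of 10 MB/s and 20 MB/s with a pressure of 0.5 give a 15 MB/s cap.
    double limit = tunedLimit(10 * 1024 * 1024, 20 * 1024 * 1024, 0.5);
    // Two concurrent compactions share the cap (7.5 MB/s each); writing 60 MB in 2 s
    // needs at least 8 s at that rate, so the caller would sleep for about 6 s.
    System.out.println(limit / 1024 / 1024 + " MB/s cap, sleep "
        + sleepFor(60L * 1024 * 1024, 2000, limit, 2) + " ms");
  }
}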
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index a36087d..7a5cbb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -21,25 +21,22 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; -import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; -import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.util.Bytes; /** @@ -54,7 +51,8 @@ public class StripeCompactor extends Compactor { } public List compact(CompactionRequest request, List targetBoundaries, - byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException { + byte[] majorRangeFromRow, byte[] majorRangeToRow, + CompactionThroughputController throughputController) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -65,12 +63,13 @@ public class StripeCompactor extends Compactor { } StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( targetBoundaries, majorRangeFromRow, majorRangeToRow); - return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow); + return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, + throughputController); } public List compact(CompactionRequest request, int targetCount, long targetSize, - byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow) - throws IOException { + byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, + CompactionThroughputController throughputController) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -78,11 +77,13 @@ public class StripeCompactor extends Compactor { } StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( targetCount, targetSize, left, right); - return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow); + return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, + throughputController); } private List compactInternal(StripeMultiFileWriter mw, CompactionRequest request, - byte[] majorRangeFromRow, byte[] majorRangeToRow) 
throws IOException { + byte[] majorRangeFromRow, byte[] majorRangeToRow, + CompactionThroughputController throughputController) throws IOException { final Collection filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -124,7 +125,7 @@ public class StripeCompactor extends Compactor { // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(scanner, mw, smallestReadPoint); + finished = performCompaction(scanner, mw, smallestReadPoint, throughputController); if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 7369a30..f90f51e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -254,4 +254,9 @@ class MockRegionServerServices implements RegionServerServices { public HeapMemoryManager getHeapMemoryManager() { return null; } + + @Override + public double getCompactionPressure() { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index ff96acf..6fe1a2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -117,14 +118,17 @@ public class TestIOFencing { throw new IOException(ex); } } + @Override - public boolean compact(CompactionContext compaction, Store store) throws IOException { + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController) throws IOException { try { - return super.compact(compaction, store); + return super.compact(compaction, store, throughputController); } finally { compactCount++; } } + public int countStoreFiles() { int count = 0; for (Store store : stores.values()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 8875487..19bde79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; 
import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -60,7 +59,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -225,9 +226,11 @@ public class TestRegionObserverScannerOpenHook { if (compactionStateChangeLatch == null) compactionStateChangeLatch = new CountDownLatch(1); return compactionStateChangeLatch; } + @Override - public boolean compact(CompactionContext compaction, Store store) throws IOException { - boolean ret = super.compact(compaction, store); + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController) throws IOException { + boolean ret = super.compact(compaction, store, throughputController); if (ret) compactionStateChangeLatch.countDown(); return ret; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 5390d85..c09a82d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -594,4 +594,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public HeapMemoryManager getHeapMemoryManager() { return null; } + + @Override + public double getCompactionPressure() { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 2b7cf96..db315b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -50,15 +50,18 @@ import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import 
org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -98,8 +101,10 @@ public class TestCompaction { super(); // Set cache flush size to 1MB - conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024); + conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); conf.setInt("hbase.hregion.memstore.block.multiplier", 100); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitCompactionThroughputController.class.getName()); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); secondRowBytes = START_KEY_BYTES.clone(); @@ -358,7 +363,8 @@ public class TestCompaction { } @Override - public List compact() throws IOException { + public List compact(CompactionThroughputController throughputController) + throws IOException { finishCompaction(this.selectedFiles); return new ArrayList(); } @@ -409,12 +415,15 @@ public class TestCompaction { } @Override - public List compact() throws IOException { + public List compact(CompactionThroughputController throughputController) + throws IOException { try { isInCompact = true; - synchronized (this) { this.wait(); } + synchronized (this) { + this.wait(); + } } catch (InterruptedException e) { - Assume.assumeNoException(e); + Assume.assumeNoException(e); } return new ArrayList(); } @@ -485,9 +494,12 @@ public class TestCompaction { // Set up the region mock that redirects compactions. HRegion r = mock(HRegion.class); - when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer() { + when( + r.compact(any(CompactionContext.class), any(Store.class), + any(CompactionThroughputController.class))).then(new Answer() { public Boolean answer(InvocationOnMock invocation) throws Throwable { - ((CompactionContext)invocation.getArguments()[0]).compact(); + ((CompactionContext)invocation.getArguments()[0]).compact( + (CompactionThroughputController)invocation.getArguments()[2]); return true; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 62535e0..5254b77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -39,14 +39,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.RegionTransition; @@ -74,9 +73,10 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.RegionState; 
-import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 101402d..0a21569 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.io.compress.Compression; @@ -63,9 +62,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -358,7 +359,7 @@ public class TestStore { Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); // after compact; check the lowest time stamp - store.compact(store.requestCompaction()); + store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 2a918e7..5471ee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -120,7 +121,8 @@ public class TestStripeCompactor { StoreFileWritersCapture writers = new 
StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List paths = - sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo); + sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, + NoLimitCompactionThroughputController.INSTANCE); writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); @@ -155,8 +157,9 @@ public class TestStripeCompactor { byte[] left, byte[] right, KeyValue[][] output) throws Exception { StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); - List paths = sc.compact( - createDummyRequest(), targetCount, targetSize, left, right, null, null); + List paths = + sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, + NoLimitCompactionThroughputController.INSTANCE); assertEquals(output.length, paths.size()); writers.verifyKvs(output, true, true); List boundaries = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 2c7637b..4e002de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -17,8 +17,16 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -62,9 +72,10 @@ public class TestStripeStoreEngine { TestStoreEngine se = createEngine(conf); StripeCompactor mockCompactor = mock(StripeCompactor.class); se.setCompactorOverride(mockCompactor); - when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), - any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class))) - .thenReturn(new ArrayList()); + when( + mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), + any(byte[].class), any(byte[].class), any(byte[].class), + any(CompactionThroughputController.class))).thenReturn(new ArrayList()); // Produce 3 L0 files. 
StoreFile sf = createFile(); @@ -82,9 +93,10 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. - compaction.compact(); + compaction.compact(NoLimitCompactionThroughputController.INSTANCE); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, - StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null); + StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, + NoLimitCompactionThroughputController.INSTANCE); } private static StoreFile createFile() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java new file mode 100644 index 0000000..586f363 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; +import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class }) +public class TestCompactionWithThroughputController { + + private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (HRegion region : hrs.getOnlineRegions(tableName)) { + return region.getStores().values().iterator().next(); + } + } + return null; + } + + private Store prepareData() throws IOException, InterruptedException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTable table = TEST_UTIL.createTable(tableName, family); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value)); + } + admin.flush(tableName.getName()); + } + return getStoreWithName(tableName); + } + + private long testCompactionWithThroughputLimit() throws Exception { + long throughputLimit = 1024L * 1024; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.MIN_KEY, 100); + conf.setInt(CompactionConfiguration.MAX_KEY, 
      200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+          .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+          .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName());
+      Thread.sleep(5000);
+      assertEquals(10, store.getStorefilesCount());
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      long duration = System.currentTimeMillis() - startTime;
+      double throughput = (double) store.getStorefilesSize() / duration * 1000;
+      // confirm that the speed limit works properly (not too fast, and also not too slow)
+      // 20% is the max acceptable error rate.
+      assertTrue(throughput < throughputLimit * 1.2);
+      assertTrue(throughput > throughputLimit * 0.8);
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private long testCompactionWithoutThroughputLimit() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      NoLimitCompactionThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName());
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testCompaction() throws Exception {
+    long limitTime = testCompactionWithThroughputLimit();
+    long noLimitTime = testCompactionWithoutThroughputLimit();
+    LOG.info("With 1MB/sec limit, compaction used " + limitTime
+        + "ms; without limit, compaction used " + noLimitTime + "ms");
+    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
+    // is a very weak assumption.
+ assertTrue(limitTime > noLimitTime * 2); + } + + /** + * Test the tuning task of {@link PressureAwareCompactionThroughputController} + */ + @Test + public void testThroughputTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + 20L * 1024 * 1024); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + 10L * 1024 * 1024); + conf.setInt(CompactionConfiguration.MIN_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + conf.setInt( + PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, + 1000); + TEST_UTIL.startMiniCluster(1); + HConnection conn = HConnectionManager.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName.getName()); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + PressureAwareCompactionThroughputController throughputController = + (PressureAwareCompactionThroughputController) regionServer.compactSplitThread + .getCompactionThroughputController(); + assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + HTableInterface table = conn.getTable(tableName); + for (int i = 0; i < 5; i++) { + table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + } + Thread.sleep(2000); + assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + + table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON); + + table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } + + /** + * Test the logic that we calculate compaction pressure for a striped store. 
+ */ + @Test + public void testGetCompactionPressureForStripedStore() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); + conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); + conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); + conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); + TEST_UTIL.startMiniCluster(1); + HConnection conn = HConnectionManager.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName.getName()); + HStore store = (HStore) getStoreWithName(tableName); + assertEquals(0, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + HTableInterface table = conn.getTable(tableName); + for (int i = 0; i < 4; i++) { + table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + } + assertEquals(8, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(10, store.getStorefilesCount()); + assertEquals(0.5, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(12, store.getStorefilesCount()); + assertEquals(1.0, store.getCompactionPressure(), EPSILON); + + table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0])); + table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0])); + TEST_UTIL.flush(tableName); + assertEquals(14, store.getStorefilesCount()); + assertEquals(2.0, store.getCompactionPressure(), EPSILON); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 269f68a..2b51502 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -208,9 +209,10 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc); - verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), - aryEq(OPEN_KEY), 
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY)); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), + aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), + any(NoLimitCompactionThroughputController.class)); } @Test @@ -442,7 +444,7 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List paths = scr.execute(sc); + List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); assertEquals(1, paths.size()); } @@ -501,22 +503,21 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc); - verify(sc, times(1)).compact(eq(scr.getRequest()), argThat( - new ArgumentMatcher>() { - @Override - public boolean matches(Object argument) { - @SuppressWarnings("unchecked") - List other = (List)argument; - if (other.size() != boundaries.size()) return false; - for (int i = 0; i < other.size(); ++i) { - if (!Bytes.equals(other.get(i), boundaries.get(i))) return false; - } - return true; - } - }), - dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), - dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo)); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher>() { + @Override + public boolean matches(Object argument) { + @SuppressWarnings("unchecked") + List other = (List) argument; + if (other.size() != boundaries.size()) return false; + for (int i = 0; i < other.size(); ++i) { + if (!Bytes.equals(other.get(i), boundaries.get(i))) return false; + } + return true; + } + }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), + dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), + any(NoLimitCompactionThroughputController.class)); } /** @@ -537,11 +538,12 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); verify(sc, times(1)).compact(eq(scr.getRequest()), - count == null ? anyInt() : eq(count.intValue()), - size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), - dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end)); + count == null ? anyInt() : eq(count.intValue()), + size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), + dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), + any(NoLimitCompactionThroughputController.class)); } /** Verify arbitrary flush. 
  */
@@ -727,7 +729,10 @@
     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     Store store = mock(Store.class);
+    HRegionInfo info = mock(HRegionInfo.class);
+    when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getFamily()).thenReturn(col);
+    when(store.getRegionInfo()).thenReturn(info);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
         anyBoolean(), anyBoolean())).thenAnswer(writers);
-- 
1.9.1
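[Editorial note, not part of the patch] The expected values in testGetCompactionPressureForStripedStore above (0.0 at 8 store files, 0.5 at 10, 1.0 at 12 and 2.0 at 14, with 2 stripes, min files to compact 4 and blocking store files 12) are consistent with a per-stripe calculation of (files - minFiles) / (blockingFilesPerStripe - minFiles), jumping to 2.0 once the total file count exceeds the blocking limit. The sketch below reproduces those numbers under that assumption; the helper and its parameter names are illustrative and are not the HBase API.

public final class StripePressureSketch {

  /** Pressure for a striped store with evenly filled stripes, under the assumptions above. */
  static double pressure(int filesPerStripe, int stripeCount, int minFiles, int blockingFiles) {
    int totalFiles = filesPerStripe * stripeCount;
    if (totalFiles > blockingFiles) {
      return 2.0; // flushes are already blocked; report maximum urgency
    }
    double blockingPerStripe = (double) blockingFiles / stripeCount;
    return Math.max(0.0, (filesPerStripe - minFiles) / (blockingPerStripe - minFiles));
  }

  public static void main(String[] args) {
    // Config from the test: 2 stripes, min files to compact 4, blocking store files 12.
    // Prints 0.0 for 8 files, 0.5 for 10, 1.0 for 12 and 2.0 for 14, matching the assertions.
    for (int filesPerStripe : new int[] { 4, 5, 6, 7 }) {
      System.out.println((filesPerStripe * 2) + " files -> "
          + pressure(filesPerStripe, 2, 4, 12));
    }
  }
}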