diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7b8d74a..9ad798f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -106,6 +106,8 @@ import com.google.common.collect.Lists; */ @InterfaceAudience.Private public class HStore implements Store { + public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7; + static final Log LOG = LogFactory.getLog(HStore.class); protected final MemStore memstore; @@ -123,7 +125,6 @@ public class HStore implements Store { volatile boolean forceMajor = false; /* how many bytes to write between status checks */ static int closeCheckInterval = 0; - private final int blockingStoreFileCount; private volatile long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; private final Object flushLock = new Object(); @@ -212,8 +213,7 @@ public class HStore implements Store { // Setting up cache configuration for this family this.cacheConf = new CacheConfig(conf, family); - this.blockingStoreFileCount = - conf.getInt("hbase.hstore.blockingStoreFiles", 7); + this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); @@ -1234,22 +1234,13 @@ public class HStore implements Store { CompactionRequest ret = null; this.lock.readLock().lock(); try { + List<StoreFile> candidates = Lists.newArrayList(storefiles); synchronized (filesCompacting) { - // candidates = all storefiles not already in compaction queue - List<StoreFile> candidates = Lists.newArrayList(storefiles); - if (!filesCompacting.isEmpty()) { - // exclude all files older than the newest file we're currently - // compacting. 
this allows us to preserve contiguity (HBASE-2856) - StoreFile last = filesCompacting.get(filesCompacting.size() - 1); - int idx = candidates.indexOf(last); - Preconditions.checkArgument(idx != -1); - candidates.subList(0, idx + 1).clear(); - } - + // First we need to pre-select compaction, and then pre-compact selection! + candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting); boolean override = false; if (region.getCoprocessorHost() != null) { - override = region.getCoprocessorHost().preCompactSelection( - this, candidates); + override = region.getCoprocessorHost().preCompactSelection(this, candidates); } CompactSelection filesToCompact; if (override) { @@ -1807,12 +1798,14 @@ public class HStore implements Store { @Override public int getCompactPriority(int priority) { - // If this is a user-requested compaction, leave this at the highest priority - if(priority == Store.PRIORITY_USER) { - return Store.PRIORITY_USER; - } else { - return this.blockingStoreFileCount - this.storefiles.size(); + // If this is a user-requested compaction, leave this at the user priority + if (priority != Store.PRIORITY_USER) { + priority = this.compactionPolicy.getSystemCompactionPriority(this.storefiles); + if (priority == Store.PRIORITY_USER) { + ++priority; // System compactions cannot have user priority, make less important. 
+ } } + return priority; } @Override @@ -1923,7 +1916,7 @@ public class HStore implements Store { @Override public boolean needsCompaction() { - return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size()); + return compactionPolicy.needsCompaction(storefiles, filesCompacting); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 283ef3a..ca7ce44 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -85,7 +85,7 @@ class MemStoreFlusher implements FlushRequester { "hbase.regionserver.global.memstore.upperLimit"; private static final String LOWER_KEY = "hbase.regionserver.global.memstore.lowerLimit"; - private long blockingStoreFilesNumber; + private int blockingStoreFileCount; private long blockingWaitTime; private final Counter updatesBlockedMsHighWater = new Counter(); @@ -112,8 +112,8 @@ class MemStoreFlusher implements FlushRequester { "because supplied " + LOWER_KEY + " was > " + UPPER_KEY); } this.globalMemStoreLimitLowMark = lower; - this.blockingStoreFilesNumber = - conf.getInt("hbase.hstore.blockingStoreFiles", 7); + this.blockingStoreFileCount = + conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); @@ -482,7 +482,7 @@ class MemStoreFlusher implements FlushRequester { private boolean isTooManyStoreFiles(HRegion region) { for (Store hstore : region.stores.values()) { - if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { + if (hstore.getStorefilesCount() > this.blockingStoreFileCount) { return true; } } diff --git 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 4d43bab..83ddfaf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; /** @@ -63,6 +64,7 @@ public class CompactionConfiguration { boolean shouldDeleteExpired; long majorCompactionPeriod; float majorCompactionJitter; + int blockingStoreFileCount; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -93,6 +95,8 @@ public class CompactionConfiguration { shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true); majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F); + blockingStoreFileCount = + conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); LOG.info("Compaction configuration " + this.toString()); } @@ -117,6 +121,13 @@ public class CompactionConfiguration { } /** + * @return store file count that will cause the memstore of this store to be blocked. 
+ */ int getBlockingStorefileCount() { return this.blockingStoreFileCount; } + + /** * @return lower bound below which compaction is selected without ratio test */ long getMinCompactSize() { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java index 63e5a59..f233d73 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -53,6 +54,16 @@ public abstract class CompactionPolicy extends Configured { HStore store; /** + * This is called before coprocessor preCompactSelection and should filter the candidates + * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time. + * @param candidateFiles candidate files, ordered from oldest to newest + * @param filesCompacting files currently compacting + * @return the list of files that can theoretically be compacted. + */ + public abstract List<StoreFile> preSelectCompaction( + List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting); + + /** * @param candidateFiles candidate files, ordered from oldest to newest * @return subset copy of candidate list that meets compaction criteria * @throws java.io.IOException */ @@ -62,6 +73,16 @@ public abstract class CompactionPolicy extends Configured { final boolean forceMajor) throws IOException; /** + * @param storeFiles Store files in the store. + * @return The system compaction priority of the store, based on storeFiles. 
+ * The priority range is as such - the smaller values are higher priority; + * 1 is user priority; only very important, blocking compactions should use + * values lower than that. With default settings, depending on the number of + * store files, the non-blocking priority will be in 2-6 range. + */ + public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles); + + /** * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. */ @@ -75,10 +96,12 @@ public abstract class CompactionPolicy extends Configured { public abstract boolean throttleCompaction(long compactionSize); /** - * @param numCandidates Number of candidate store files + * @param storeFiles Current store files. + * @param filesCompacting files currently compacting. * @return whether a compactionSelection is possible */ - public abstract boolean needsCompaction(int numCandidates); + public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles, + final List<StoreFile> filesCompacting); /** * Inform the policy that some configuration has been change, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java index ddac105..b831787 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collection; import java.util.GregorianCalendar; import java.util.List; import java.util.Random; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; @@ -52,6 +54,26 @@ public class DefaultCompactionPolicy extends CompactionPolicy { compactor = new DefaultCompactor(this); } + @Override + public List<StoreFile> preSelectCompaction( + List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) { + // candidates = all storefiles not already in compaction queue + if (!filesCompacting.isEmpty()) { + // exclude all files older than the newest file we're currently + // compacting. this allows us to preserve contiguity (HBASE-2856) + StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + int idx = candidateFiles.indexOf(last); + Preconditions.checkArgument(idx != -1); + candidateFiles.subList(0, idx + 1).clear(); + } + return candidateFiles; + } + + @Override + public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) { + return this.comConf.getBlockingStorefileCount() - storeFiles.size(); + } + /** * @param candidateFiles candidate files, ordered from oldest to newest * @return subset copy of candidate list that meets compaction criteria @@ -366,11 +388,10 @@ public class DefaultCompactionPolicy extends CompactionPolicy { return compactionSize > comConf.getThrottlePoint(); } - /** - * @param numCandidates Number of candidate store files - * @return whether a compactionSelection is possible - */ - public boolean needsCompaction(int numCandidates) { + @Override + public boolean needsCompaction(final Collection<StoreFile> storeFiles, + final List<StoreFile> filesCompacting) { + int numCandidates = storeFiles.size() - filesCompacting.size(); return numCandidates > comConf.getMinFilesToCompact(); }