Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1041275)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -53,6 +53,8 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
@@ -91,8 +93,10 @@
   // ttl in milliseconds.
   protected long ttl;
   private long majorCompactionTime;
+  private final int minFilesToCompact;
   private final int maxFilesToCompact;
   private final long minCompactSize;
+  private final long maxCompactSize;
   // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
   // With float, java will downcast your long to float for comparisons (bad)
   private double compactRatio;
@@ -119,7 +123,6 @@
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final Object compactLock = new Object();
-  private final int compactionThreshold;
   private final int blocksize;
   private final boolean blockcache;
   /** Compression algorithm for flush files and minor compaction */
@@ -177,10 +180,10 @@
     this.memstore = new MemStore(this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
-    // By default, we compact if an HStore has more than
-    // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold = Math.max(2,
-      conf.getInt("hbase.hstore.compactionThreshold", 3));
+    // By default, compact if storefile.count >= minFilesToCompact
+    this.minFilesToCompact = Math.max(2,
+      conf.getInt("hbase.hstore.compaction.min",
+        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
 
     // Check if this is in-memory store
     this.inMemory = family.isInMemory();
@@ -198,7 +201,10 @@
     this.majorCompactionTime = getNextMajorCompactTime();
 
     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
-    this.minCompactSize = this.region.memstoreFlushSize * 3 / 2; // +50% pad
+    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
+      this.region.memstoreFlushSize);
+    this.maxCompactSize
+      = conf.getLong("hbase.hstore.compaction.max.size", 0);
     this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
 
     if (Store.closeCheckInterval == 0) {
@@ -552,7 +558,7 @@
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
 
-      return this.storefiles.size() >= this.compactionThreshold;
+      return this.storefiles.size() >= this.minFilesToCompact;
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -609,129 +615,55 @@
    */
   StoreSize compact(final boolean forceMajor) throws IOException {
     boolean forceSplit = this.region.shouldSplit(false);
-    boolean majorcompaction = forceMajor;
     synchronized (compactLock) {
-      this.lastCompactSize = 0;
+      this.lastCompactSize = 0; // reset first in case compaction is aborted
 
-      // filesToCompact are sorted oldest to newest.
-      List<StoreFile> filesToCompact = this.storefiles;
-      if (filesToCompact.isEmpty()) {
-        LOG.debug(this.storeNameStr + ": no store files to compact");
-        return null;
+      // sanity checks
+      for (StoreFile sf : this.storefiles) {
+        if (sf.getPath() == null || sf.getReader() == null) {
+          boolean np = sf.getPath() == null;
+          LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
+          return null;
+        }
       }
 
-      // Check to see if we need to do a major compaction on this region.
-      // If so, change doMajorCompaction to true to skip the incremental
-      // compacting below. Only check if doMajorCompaction is not true.
-      if (!majorcompaction) {
-        majorcompaction = isMajorCompaction(filesToCompact);
+      // if the user wants to force a split, skip compaction unless necessary
+      boolean references = hasReferences(this.storefiles);
+      if (forceSplit && !forceMajor && !references) {
+        return checkSplit(forceSplit);
       }
 
-      boolean references = hasReferences(filesToCompact);
-      if (!majorcompaction && !references &&
-          (forceSplit || (filesToCompact.size() < compactionThreshold))) {
+      Collection<StoreFile> filesToCompact
+        = compactSelection(this.storefiles, forceMajor);
+
+      // empty == do not compact
+      if (filesToCompact.isEmpty()) {
+        // but do see if we need to split before returning
         return checkSplit(forceSplit);
       }
 
-      /* get store file sizes for incremental compacting selection.
-       * normal skew:
-       *
-       *         older ----> newer
-       *     _
-       *    | |   _
-       *    | |  | |   _
-       *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-       *    | |  | |  | |  | |  _  | |
-       *    | |  | |  | |  | | | | | |
-       *    | |  | |  | |  | | | | | |
-       */
-      int countOfFiles = filesToCompact.size();
-      long [] fileSizes = new long[countOfFiles];
-      long [] sumSize = new long[countOfFiles];
-      for (int i = countOfFiles-1; i >= 0; --i) {
-        StoreFile file = filesToCompact.get(i);
-        Path path = file.getPath();
-        if (path == null) {
-          LOG.error("Path is null for " + file);
-          return null;
-        }
-        StoreFile.Reader r = file.getReader();
-        if (r == null) {
-          LOG.error("StoreFile " + file + " has a null Reader");
-          return null;
-        }
-        fileSizes[i] = file.getReader().length();
-        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-        int tooFar = i + this.maxFilesToCompact - 1;
-        sumSize[i] = fileSizes[i]
-                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
-                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-      }
-
+      // sum size of all files included in compaction
       long totalSize = 0;
-      if (!majorcompaction && !references) {
-        // we're doing a minor compaction, let's see what files are applicable
-        int start = 0;
-        double r = this.compactRatio;
-
-        /* Start at the oldest file and stop when you find the first file that
-         * meets compaction criteria:
-         *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
-         *      OR
-         *   (2) within the compactRatio of sum(newer_files)
-         * Given normal skew, any newer files will also meet this criteria
-         *
-         * Additional Note:
-         * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-         * compact().  Consider the oldest files first to avoid a
-         * situation where we always compact [end-threshold,end).  Then, the
-         * last file becomes an aggregate of the previous compactions.
-         */
-        while(countOfFiles - start >= this.compactionThreshold &&
-              fileSizes[start] >
-                Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
-          ++start;
-        }
-        int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
-        totalSize = fileSizes[start]
-                  + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
-
-        // if we don't have enough files to compact, just wait
-        if (end - start < this.compactionThreshold) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipped compaction of " + this.storeNameStr
-              + " because only " + (end - start) + " file(s) of size "
-              + StringUtils.humanReadableInt(totalSize)
-              + " meet compaction criteria.");
-          }
-          return checkSplit(forceSplit);
-        }
-
-        if (0 == start && end == countOfFiles) {
-          // we decided all the files were candidates!  major compact
-          majorcompaction = true;
-        } else {
-          filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
-            end));
-        }
-      } else {
-        // all files included in this compaction
-        for (long i : fileSizes) {
-          totalSize += i;
-        }
+      for (StoreFile sf : filesToCompact) {
+        totalSize += sf.getReader().length();
       }
       this.lastCompactSize = totalSize;
 
+      // major compaction iff all StoreFiles are included
+      boolean majorcompaction
+        = (filesToCompact.size() == this.storefiles.size());
+
       // Max-sequenceID is the last key in the files we're compacting
       long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
       // Ready to go.  Have list of files to compact.
       LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
           this.storeNameStr +
-        (references? ", hasReferences=true,": " ") + " into " +
+        (hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " +
           region.getTmpDir() + ", seqid=" + maxId +
           ", totalSize=" + StringUtils.humanReadableInt(totalSize));
-      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+      StoreFile.Writer writer
+        = compactStore(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (LOG.isInfoEnabled()) {
@@ -761,7 +693,8 @@
     boolean majorcompaction = (N == count);
 
     // Ready to go.  Have list of files to compact.
-    StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
+    StoreFile.Writer writer
+      = compactStore(filesToCompact, majorcompaction, maxId);
     // Move the compaction into place.
     StoreFile sf = completeCompaction(filesToCompact, writer);
   }
@@ -820,10 +753,10 @@
     if (filesToCompact == null || filesToCompact.isEmpty() ||
         majorCompactionTime == 0) {
       return result;
-    } 
+    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = getLowestTimestamp(fs,
-        filesToCompact.get(0).getPath().getParent());
+      filesToCompact.get(0).getPath().getParent());
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
      // Major compaction time has elapsed.
@@ -842,7 +775,7 @@
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Major compaction triggered on store " + this.storeNameStr +
-            "; time since last major compaction " + (now - lowTimestamp) + "ms");
+              "; time since last major compaction " + (now - lowTimestamp) + "ms");
         }
         result = true;
         this.majorCompactionTime = getNextMajorCompactTime();
@@ -873,8 +806,150 @@
   }
 
   /**
-   * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
+   * Algorithm to choose which files to compact
    *
+   * Configuration knobs:
+   *  "hbase.hstore.compaction.ratio"
+   *    normal case: minor compact when file <= sum(smaller_files) * ratio
+   *  "hbase.hstore.compaction.min.size"
+   *    unconditionally compact individual files below this size
+   *  "hbase.hstore.compaction.max.size"
+   *    never compact individual files above this size (unless splitting)
+   *  "hbase.hstore.compaction.min"
+   *    min files needed to minor compact
+   *  "hbase.hstore.compaction.max"
+   *    max files to compact at once (avoids OOM)
+   *
+   * @param candidates candidate files, ordered from oldest to newest
+   * @param forcemajor whether to force a major compaction
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws IOException
+   */
+  List<StoreFile> compactSelection(List<StoreFile> candidates,
+      boolean forcemajor) throws IOException {
+    /* normal skew:
+     *
+     *         older ----> newer
+     *     _
+     *    | |   _
+     *    | |  | |   _
+     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+     *    | |  | |  | |  | |  _  | |
+     *    | |  | |  | |  | | | | | |
+     *    | |  | |  | |  | | | | | |
+     */
+    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
+
+    // Do not compact files above a configurable max filesize unless they are
+    // references. We MUST compact these
+    if (this.maxCompactSize > 0) {
+      final long msize = this.maxCompactSize;
+      filesToCompact.removeAll(Collections2.filter(filesToCompact,
+        new Predicate<StoreFile>() {
+          public boolean apply(StoreFile sf) {
+            // NOTE: keep all references. we must compact them
+            return sf.getReader().length() > msize && !sf.isReference();
+          }
+        }));
+    }
+
+    // major compact on user action or age (caveat: we have too many files)
+    boolean majorcompaction = forcemajor ||
+      (isMajorCompaction(filesToCompact) &&
+       filesToCompact.size() > this.maxFilesToCompact);
+
+    if (filesToCompact.isEmpty()) {
+      LOG.debug(this.storeNameStr + ": no store files to compact");
+      return filesToCompact;
+    }
+
+    if (!majorcompaction && !hasReferences(filesToCompact)) {
+      // we're doing a minor compaction, let's see what files are applicable
+      int start = 0;
+      double r = this.compactRatio;
+
+      // Sort files by size to correct when normal skew is altered by bulk load.
+      //
+      // So, technically, order is important for optimizations like the TimeStamp
+      // filter. However, realistically this isn't a problem because our normal
+      // skew always decreases in filesize over time. The only place where our
+      // skew doesn't decrease is for files that have been recently flushed.
+      // However, all those will be unconditionally compacted because they will
+      // be lower than "hbase.hstore.compaction.min.size".
+      //
+      // The sorting is to handle an interesting issue that popped up for us
+      // during migration: we're bulk loading StoreFiles of extremely variable
+      // size (are we migrating 1k users or 10M?) and they will all appear at
+      // the end of the StoreFile list. How do we determine when it is
+      // efficient to compact them? The easiest option was to sort the compact
+      // list and handle bulk files by relative size instead of making some
+      // custom compaction selection algorithm just for bulk inclusion. It
+      // seems like any other company that incrementally migrates data
+      // into HBase would hit the same issue. Nicolas.
+      //
+      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
+
+      // get store file sizes for incremental compacting selection.
+      int countOfFiles = filesToCompact.size();
+      long [] fileSizes = new long[countOfFiles];
+      long [] sumSize = new long[countOfFiles];
+      for (int i = countOfFiles-1; i >= 0; --i) {
+        StoreFile file = filesToCompact.get(i);
+        fileSizes[i] = file.getReader().length();
+        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+        int tooFar = i + this.maxFilesToCompact - 1;
+        sumSize[i] = fileSizes[i]
+                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
+                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+      }
+
+      /* Start at the oldest file and stop when you find the first file that
+       * meets compaction criteria:
+       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
+       *      OR
+       *   (2) within the compactRatio of sum(newer_files)
+       * Given normal skew, any newer files will also meet this criteria
+       *
+       * Additional Note:
+       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+       * compact().  Consider the oldest files first to avoid a
+       * situation where we always compact [end-threshold,end).  Then, the
+       * last file becomes an aggregate of the previous compactions.
+       */
+      while(countOfFiles - start >= this.minFilesToCompact &&
            fileSizes[start] >
              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
+        ++start;
+      }
+      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
+      long totalSize = fileSizes[start]
+                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
+      filesToCompact = filesToCompact.subList(start, end);
+
+      // if we don't have enough files to compact, just wait
+      if (filesToCompact.size() < this.minFilesToCompact) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipped compaction of " + this.storeNameStr
+            + ". Only " + (end - start) + " file(s) of size "
+            + StringUtils.humanReadableInt(totalSize)
+            + " have met compaction criteria.");
+        }
+        return Collections.emptyList();
+      }
+    } else {
+      // all files included in this compaction, up to max
+      if (filesToCompact.size() > this.maxFilesToCompact) {
+        int pastMax = filesToCompact.size() - this.maxFilesToCompact;
+        filesToCompact.subList(0, pastMax).clear();
+      }
+    }
+    return filesToCompact;
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles in a Store.
+   * Uses the scan infrastructure to make it easy.
+   *
    * @param filesToCompact which files to compact
    * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
    * @param maxId Readers maximum sequence id.
@@ -882,7 +957,7 @@
    *  nothing made it through the compaction.
    * @throws IOException
    */
-  private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
+  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
@@ -987,7 +1062,7 @@
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
+  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                        final StoreFile.Writer compactedFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
@@ -1521,15 +1596,15 @@
   /**
    * See if there's too much store files in this store
    * @return true if number of store files is greater than
-   *  the number defined in compactionThreshold
+   *  the number defined in minFilesToCompact
    */
   public boolean hasTooManyStoreFiles() {
-    return this.storefiles.size() > this.compactionThreshold;
+    return this.storefiles.size() > this.minFilesToCompact;
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
-      (6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
+      (7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
      (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1041275)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -26,6 +26,7 @@
 import java.nio.ByteBuffer;
 import java.text.NumberFormat;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -303,7 +304,7 @@
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    *  does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
       if (!sf.isBulkLoadResult()) {
@@ -909,6 +910,13 @@
       bloomFilterType = BloomType.NONE;
     }
 
+    /**
+     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
+     */
+    Reader() {
+      this.reader = null;
+    }
+
     public RawComparator<byte []> getComparator() {
       return reader.getComparator();
     }
@@ -1132,5 +1140,15 @@
     }
   }
 
+    /**
+     * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size)
+     */
+    static final Comparator<StoreFile> FILE_SIZE =
+      Ordering.natural().reverse().onResultOf(new Function<StoreFile, Long>() {
+        @Override
+        public Long apply(StoreFile sf) {
+          return sf.getReader().length();
+        }
+      });
   }
 }
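
The standalone sketch below is not part of the patch; the class, method, and sample numbers are invented for illustration. It isolates the ratio-based selection loop that compactSelection() runs after sorting, so the interaction of hbase.hstore.compaction.ratio, hbase.hstore.compaction.min, and hbase.hstore.compaction.min.size can be tried outside a cluster (the real code additionally handles references, bulk-loaded files, the max-size filter, and the hbase.hstore.compaction.max cap shown above).

import java.util.Arrays;

public class CompactSelectionSketch {

  /**
   * Returns the index of the oldest file to include in a minor compaction.
   * sizes[] is ordered oldest to newest and, after the sort above, largest
   * to smallest.
   */
  static int selectStart(long[] sizes, int minFilesToCompact,
      int maxFilesToCompact, long minCompactSize, double ratio) {
    int count = sizes.length;
    // sumSize[i] = sum of sizes[i .. i+maxFilesToCompact-1), i.e. file i plus
    // the newer files that could join it in a single compaction.
    long[] sumSize = new long[count + 1];
    for (int i = count - 1; i >= 0; --i) {
      int tooFar = i + maxFilesToCompact - 1;
      sumSize[i] = sizes[i] + sumSize[i + 1]
          - (tooFar < count ? sizes[tooFar] : 0);
    }
    int start = 0;
    // Skip a file while it is bigger than minCompactSize AND bigger than
    // ratio * sum(newer files in its window); stop once too few files remain.
    while (count - start >= minFilesToCompact
        && sizes[start] > Math.max(minCompactSize,
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    return start;
  }

  public static void main(String[] args) {
    // Oldest to newest, in bytes: two big old files, four small recent ones.
    long[] sizes = {2000, 400, 150, 60, 50, 40};
    int start = selectStart(sizes, 3, 10, 100, 1.2);
    // 2000 > 1.2 * 700 and 400 > 1.2 * 300, so the two oldest files are
    // skipped; files 2..5 (150, 60, 50, 40) are selected for compaction.
    System.out.println("compact files " + start + ".." + (sizes.length - 1)
        + " of " + Arrays.toString(sizes));
  }
}

With the defaults introduced by this patch (ratio 1.2F, min 3, max 10, min.size = memstoreFlushSize), any file no larger than the flush size is always eligible, so freshly flushed files are compacted promptly while large old files are left alone until enough newer data accumulates behind them.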