Index: conf/hbase-default.xml =================================================================== --- conf/hbase-default.xml (revision 690123) +++ conf/hbase-default.xml (working copy) @@ -245,6 +245,18 @@ + hbase.hstore.compaction.max + 10 + Max number of HStoreFiles to compact at one time while doing a minor compaction. + + + + hbase.hregion.majorcompaction + 86400000 + The time (in milliseconds) between major compactions of all HStoreFiles into one. Default: 1 day (86400000 ms). + + + hbase.regionserver.thread.splitcompactcheckfrequency 20000 How often a region server runs the split/compaction check. Index: src/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 690123) +++ src/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -86,7 +86,8 @@ final FileSystem fs; private final HBaseConfiguration conf; protected long ttl; - + private long majorCompactionTime; + private int maxFilesToCompact; private final long desiredMaxFileSize; private volatile long storeSize; @@ -187,6 +188,8 @@ } this.desiredMaxFileSize = maxFileSize; + this.majorCompactionTime = conf.getLong("hbase.hregion.majorcompaction", 86400000); + this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); this.storeSize = 0L; if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { @@ -708,7 +711,29 @@ } return false; } - + + /* + * Gets lowest timestamp from files in a dir + * + * @param fs + * @param dir + * @throws IOException + */ + private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException { + FileStatus[] stats = fs.listStatus(dir); + if (stats == null || stats.length == 0) { + return 0l; + } + long lowTimestamp = Long.MAX_VALUE; + for (int i = 0; i < stats.length; i++) { + long timestamp = stats[i].getModificationTime(); + if (timestamp < lowTimestamp){ + lowTimestamp = timestamp; + } + } + return 
lowTimestamp; + } + /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. @@ -725,12 +750,12 @@ * We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. * - * @param force True to force a compaction regardless of thresholds (Needed + * @param majorCompaction True to force a major compaction regardless of thresholds (Needed * by merge). * @return mid key if a split is needed, null otherwise * @throws IOException */ - StoreSize compact(final boolean force) throws IOException { + StoreSize compact(boolean majorCompaction) throws IOException { synchronized (compactLock) { long maxId = -1; int nrows = -1; @@ -740,12 +765,28 @@ return null; } filesToCompact = new ArrayList(this.storefiles.values()); - + // The max-sequenceID in any of the to-be-compacted TreeMaps is the // last key of storefiles. maxId = this.storefiles.lastKey().longValue(); } - if (!force && !hasReferences(filesToCompact) && + // check whether a major compaction is due on this store; if so, set + // majorCompaction to true to skip the incremental file selection below. + // only checked when majorCompaction is false; when already true a major compaction happens anyway + if (!majorCompaction){ + Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getName()); + long lowTimestamp = getLowestTimestamp(fs, mapdir); + if (LOG.isDebugEnabled() && lowTimestamp > 0l) { + LOG.debug("Hours since last major compaction: " + (System.currentTimeMillis()-lowTimestamp)/3600000); + } + if (lowTimestamp < (System.currentTimeMillis() - majorCompactionTime) && lowTimestamp > 0l){ + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store: " + this.storeNameStr); + } + majorCompaction = true; + } + } + if (!majorCompaction && !hasReferences(filesToCompact) && filesToCompact.size() < compactionThreshold) { return checkSplit(); } @@ -771,13 
+812,13 @@ fileSizes[i] = len; totalSize += len; } - if (!force && !hasReferences(filesToCompact)) { + if (!majorCompaction && !hasReferences(filesToCompact)) { // Here we select files for incremental compaction. // The rule is: if the largest(oldest) one is more than twice the // size of the second, skip the largest, and continue to next..., // until we meet the compactionThreshold limit. for (point = 0; point < compactionThreshold - 1; point++) { - if (fileSizes[point] < fileSizes[point + 1] * 2) { + if (fileSizes[point] < fileSizes[point + 1] * 2 && (countOfFiles - point) <= maxFilesToCompact) { break; } skipped += fileSizes[point]; @@ -842,7 +883,7 @@ this.compression, this.family.isBloomfilter(), nrows); writer.setIndexInterval(family.getMapFileIndexInterval()); try { - compactHStoreFiles(writer, readers); + compactHStoreFiles(writer, readers, majorCompaction); } finally { writer.close(); } @@ -869,7 +910,7 @@ * us to throw out deleted values or obsolete versions. */ private void compactHStoreFiles(final MapFile.Writer compactedOut, - final List readers) + final List readers, final boolean majorCompaction) throws IOException { MapFile.Reader[] rdrs = readers.toArray(new MapFile.Reader[readers.size()]); try { @@ -927,7 +968,10 @@ timesSeen = 0; } - if (timesSeen <= family.getMaxVersions()) { + // added majorCompaction here to make sure all versions make it to + // the major compaction so we do not remove the wrong last versions + // this affected HBASE-826 + if (timesSeen <= family.getMaxVersions() || !majorCompaction) { // Keep old versions until we have maxVersions worth. // Then just skip them. 
if (sk.getRow().length != 0 && sk.getColumn().length != 0) { Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 690123) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -881,12 +881,12 @@ * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. * - * @param force True to force a compaction regardless of thresholds (Needed + * @param majorCompaction True to force a major compaction regardless of thresholds (Needed * by merge). * @return mid key if split is needed * @throws IOException */ - private byte [] compactStores(final boolean force) throws IOException { + private byte [] compactStores(final boolean majorCompaction) throws IOException { splitsAndClosesLock.readLock().lock(); try { byte [] midKey = null; @@ -909,7 +909,7 @@ doRegionCompactionPrep(); long maxSize = -1; for (HStore store: stores.values()) { - final HStore.StoreSize size = store.compact(majorCompaction); + final HStore.StoreSize size = store.compact(majorCompaction); if (size != null && size.getSize() > maxSize) { maxSize = size.getSize(); midKey = size.getKey();