Index: conf/hbase-default.xml
===================================================================
--- conf/hbase-default.xml (revision 690123)
+++ conf/hbase-default.xml (working copy)
@@ -245,6 +245,18 @@
+  <property>
+    <name>hbase.hstore.compaction.max</name>
+    <value>10</value>
+    <description>Max number of HStoreFiles to compact at one time during
+    a minor compaction.</description>
+  </property>
+  <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>86400000</value>
+    <description>The time (in milliseconds) between major compactions of
+    all HStoreFiles in a region.  Default: 1 day.</description>
+  </property>
   <property>
     <name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
     <value>20000</value>
     <description>How often a region server runs the split/compaction check.</description>
Index: src/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 690123)
+++ src/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -86,7 +86,8 @@
final FileSystem fs;
private final HBaseConfiguration conf;
protected long ttl;
-
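+  // Milliseconds between major compactions of this store's files, and the
+  // cap on how many store files one minor compaction may rewrite.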
+ private long majorCompactionTime;
+ private int maxFilesToCompact;
private final long desiredMaxFileSize;
private volatile long storeSize;
@@ -187,6 +188,8 @@
}
this.desiredMaxFileSize = maxFileSize;
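+    // Defaults mirror hbase-default.xml: major compaction every 24 hours,
+    // at most ten files per minor compaction.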
+ this.majorCompactionTime = conf.getLong("hbase.hregion.majorcompaction", 86400000);
+ this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
this.storeSize = 0L;
if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
@@ -708,7 +711,29 @@
}
return false;
}
-
+
+  /*
+   * Gets the lowest timestamp from the files in a dir.
+   *
+   * @param fs filesystem to scan
+   * @param dir directory of store files
+   * @return the lowest modification timestamp, or 0 if the dir has no files
+   * @throws IOException
+   */
+  private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
+    // listStatus returns null when the directory does not exist.
+    FileStatus[] stats = fs.listStatus(dir);
+    if (stats == null || stats.length == 0) {
+      return 0L;
+    }
+    long lowTimestamp = Long.MAX_VALUE;
+    for (int i = 0; i < stats.length; i++) {
+      long timestamp = stats[i].getModificationTime();
+      if (timestamp < lowTimestamp) {
+        lowTimestamp = timestamp;
+      }
+    }
+    return lowTimestamp;
+  }
+
/**
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
@@ -725,12 +750,12 @@
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
-   * @param force True to force a compaction regardless of thresholds (Needed
-   * by merge).
+   * @param majorCompaction True to force a major compaction regardless of
+   * thresholds (needed by merge).
* @return mid key if a split is needed, null otherwise
* @throws IOException
*/
- StoreSize compact(final boolean force) throws IOException {
+ StoreSize compact(boolean majorCompaction) throws IOException {
synchronized (compactLock) {
long maxId = -1;
int nrows = -1;
@@ -740,12 +765,28 @@
return null;
}
      filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
-
+
// The max-sequenceID in any of the to-be-compacted TreeMaps is the
// last key of storefiles.
maxId = this.storefiles.lastKey().longValue();
}
-    if (!force && !hasReferences(filesToCompact) &&
+    // Check whether this store is due for a major compaction; if so, set
+    // majorCompaction to true so the incremental file selection below is
+    // skipped.  No check is needed when majorCompaction is already true.
+    if (!majorCompaction) {
+      Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(),
+        family.getName());
+      // The oldest file's modification time approximates the last major compaction.
+      long lowTimestamp = getLowestTimestamp(fs, mapdir);
+      if (LOG.isDebugEnabled() && lowTimestamp > 0L) {
+        LOG.debug("Hours since last major compaction: " +
+          (System.currentTimeMillis() - lowTimestamp) / 3600000);
+      }
+      if (lowTimestamp > 0L &&
+          lowTimestamp < (System.currentTimeMillis() - majorCompactionTime)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store: " + this.storeNameStr);
+        }
+        majorCompaction = true;
+      }
+    }
+    if (!majorCompaction && !hasReferences(filesToCompact) &&
filesToCompact.size() < compactionThreshold) {
return checkSplit();
}
@@ -771,13 +812,13 @@
fileSizes[i] = len;
totalSize += len;
}
- if (!force && !hasReferences(filesToCompact)) {
+ if (!majorCompaction && !hasReferences(filesToCompact)) {
// Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the
// size of the second, skip the largest, and continue to next...,
// until we meet the compactionThreshold limit.
for (point = 0; point < compactionThreshold - 1; point++) {
-        if (fileSizes[point] < fileSizes[point + 1] * 2) {
+        // Stop skipping files once the size rule holds and no more than
+        // maxFilesToCompact files would remain for this compaction.
+        if (fileSizes[point] < fileSizes[point + 1] * 2 &&
+            (countOfFiles - point) <= maxFilesToCompact) {
break;
}
skipped += fileSizes[point];
@@ -842,7 +883,7 @@
this.compression, this.family.isBloomfilter(), nrows);
writer.setIndexInterval(family.getMapFileIndexInterval());
try {
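+        // Pass the major-compaction flag down so version trimming only
+        // happens when every store file participates in the rewrite.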
- compactHStoreFiles(writer, readers);
+ compactHStoreFiles(writer, readers, majorCompaction);
} finally {
writer.close();
}
@@ -869,7 +910,7 @@
* us to throw out deleted values or obsolete versions.
*/
private void compactHStoreFiles(final MapFile.Writer compactedOut,
-      final List<MapFile.Reader> readers)
+      final List<MapFile.Reader> readers, final boolean majorCompaction)
throws IOException {
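+    // Merge the sorted readers into the compacted writer, dropping deleted
+    // values and, when majorCompaction is true, versions beyond the family
+    // maximum (see the timesSeen check below).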
MapFile.Reader[] rdrs = readers.toArray(new MapFile.Reader[readers.size()]);
try {
@@ -927,7 +968,10 @@
timesSeen = 0;
}
-          if (timesSeen <= family.getMaxVersions()) {
+          // Only trim excess versions during a major compaction: a minor
+          // compaction does not include every store file, so dropping
+          // versions there could discard the wrong ones.  This affected
+          // HBASE-826.
+          if (timesSeen <= family.getMaxVersions() || !majorCompaction) {
// Keep old versions until we have maxVersions worth.
// Then just skip them.
if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 690123)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -881,12 +881,12 @@
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
-   * @param force True to force a compaction regardless of thresholds (Needed
-   * by merge).
+   * @param majorCompaction True to force a major compaction regardless of
+   * thresholds (needed by merge).
* @return mid key if split is needed
* @throws IOException
*/
- private byte [] compactStores(final boolean force) throws IOException {
+ private byte [] compactStores(final boolean majorCompaction) throws IOException {
splitsAndClosesLock.readLock().lock();
try {
byte [] midKey = null;
@@ -909,7 +909,7 @@
doRegionCompactionPrep();
long maxSize = -1;
for (HStore store: stores.values()) {
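+        // Each store checks its own file ages and may upgrade this request
+        // to a major compaction (see HStore.compact).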
- final HStore.StoreSize size = store.compact(force);
+ final HStore.StoreSize size = store.compact(majorCompaction);
if (size != null && size.getSize() > maxSize) {
maxSize = size.getSize();
midKey = size.getKey();
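
Note (reviewer sketch, not part of the patch): the age check the HStore hunk
adds reduces to the predicate below. MajorCompactionCheck is a hypothetical
name; lowTimestamp is the oldest store file's modification time from
getLowestTimestamp, and periodMs is hbase.hregion.majorcompaction.

    final class MajorCompactionCheck {
      // A store is due when its oldest file predates the configured period;
      // lowTimestamp == 0 means no store files were found.
      static boolean isDue(long lowTimestamp, long periodMs) {
        return lowTimestamp > 0L &&
            lowTimestamp < System.currentTimeMillis() - periodMs;
      }
    }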