diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index b27679c..abda87c 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -353,6 +353,11 @@ public final class HConstants { /** Default value for cluster ID */ public static final String CLUSTER_ID_DEFAULT = "default-cluster"; + + /** Parameter name for # days to keep MVCC values during a major compaction */ + public static final String KEEP_MVCC_PERIOD = "hbase.hstore.compaction.keep.mvcc.period"; + /** At least to keep MVCC values in hfiles for 5 days */ + public static final int MIN_KEEP_MVCC_PERIOD = 5; // Always store the location of the root table's HRegion. // This HRegion is never split. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index f0f92ae..1b07594 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner { return false; } - // For the optimisation in HBASE-4346, we set the KV's memstoreTS to - // 0, if it is older than all the scanners' read points. It is possible - // that a newer KV's memstoreTS was reset to 0. But, there is an - // older KV which was not reset to 0 (because it was - // not old enough during flush). Make sure that we set it correctly now, - // so that the comparision order does not change. - if (cur.getMvccVersion() <= readPt) { - KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0); - } return true; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9e792c4..744a25b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -58,6 +58,9 @@ public abstract class Compactor { private int compactionKVMax; protected Compression.Algorithm compactionCompression; + + /** specify how many days to keep MVCC values during major compaction **/ + protected int keepMVCCPeriod; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(final Configuration conf, final Store store) { @@ -67,6 +70,8 @@ public abstract class Compactor { this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); this.compactionCompression = (this.store.getFamily() == null) ? Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression(); + this.keepMVCCPeriod = Math.max(this.conf.getInt(HConstants.KEEP_MVCC_PERIOD, + HConstants.MIN_KEEP_MVCC_PERIOD), HConstants.MIN_KEEP_MVCC_PERIOD); } /** @@ -92,19 +97,30 @@ public abstract class Compactor { public long maxMVCCReadpoint = 0; /** Max tags length**/ public int maxTagsLength = 0; + /** Min MVCC to keep during a major compaction **/ + public long minMVCCToKeep = 0; } /** * Extracts some details about the files to compact that are commonly needed by compactors. * @param filesToCompact Files. - * @param calculatePutTs Whether earliest put TS is needed. + * @param isAllFiles Whether all files are included for compaction * @return The result. */ protected FileDetails getFileDetails( - Collection filesToCompact, boolean calculatePutTs) throws IOException { + Collection filesToCompact, boolean isAllFiles) throws IOException { FileDetails fd = new FileDetails(); + long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - + (this.keepMVCCPeriod * 24 * 60 * 60 * 1000); for (StoreFile file : filesToCompact) { + if(isAllFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) { + // when isAllFiles is true, all files are compacted so we can calculate the smallest + // MVCC value to keep + if(fd.minMVCCToKeep < file.getMaxMemstoreTS()) { + fd.minMVCCToKeep = file.getMaxMemstoreTS(); + } + } long seqNum = file.getMaxSequenceId(); fd.maxSeqId = Math.max(fd.maxSeqId, seqNum); StoreFile.Reader r = file.getReader(); @@ -130,7 +146,7 @@ public abstract class Compactor { // If required, calculate the earliest put timestamp of all involved storefiles. // This is used to remove family delete marker during compaction. long earliestPutTs = 0; - if (calculatePutTs) { + if (isAllFiles) { tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS); if (tmp == null) { // There's a file with no information, must be an old one @@ -148,7 +164,7 @@ public abstract class Compactor { ", size=" + StringUtils.humanReadableInt(r.length()) + ", encoding=" + r.getHFileReader().getDataBlockEncoding() + ", seqNum=" + seqNum + - (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: "")); + (isAllFiles ? ", earliestPutTs=" + earliestPutTs: "")); } } return fd; @@ -202,10 +218,11 @@ public abstract class Compactor { * @param scanner Where to read from. * @param writer Where to write to. * @param smallestReadPoint Smallest read point. + * @param cleanMVCC When true, remove mvcc value which is <= smallestReadPoint * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(InternalScanner scanner, - CellSink writer, long smallestReadPoint) throws IOException { + CellSink writer, long smallestReadPoint, boolean cleanMVCC) throws IOException { int bytesWritten = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. @@ -218,7 +235,7 @@ public abstract class Compactor { // output to writer: for (Cell c : kvs) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { + if (cleanMVCC && kv.getMvccVersion() <= smallestReadPoint) { kv.setMvccVersion(0); } writer.append(kv); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 3e8523d..cbd29d6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor { StoreFile.Writer writer = null; List newFiles = new ArrayList(); + boolean cleanMVCC = false; try { InternalScanner scanner = null; try { @@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor { } // Create the writer even if no kv(Empty store file is also ok), // because we need record the max seq id for the store file, see HBASE-6059 + if(fd.minMVCCToKeep > 0) { + smallestReadPoint = Math.min(fd.minMVCCToKeep, smallestReadPoint); + cleanMVCC = true; + } writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); - boolean finished = performCompaction(scanner, writer, smallestReadPoint); + boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanMVCC); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 11556e5..ee957ea 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor { boolean finished = false; InternalScanner scanner = null; + boolean cleanMVCC = false; try { // Get scanner to use. ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES; @@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor { } // Create the writer factory for compactions. + if(fd.minMVCCToKeep > 0) { + smallestReadPoint = Math.min(fd.minMVCCToKeep, smallestReadPoint); + cleanMVCC = true; + } final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint; final Compression.Algorithm compression = store.getFamily().getCompactionCompression(); StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() { @@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor { // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(scanner, mw, smallestReadPoint); + finished = performCompaction(scanner, mw, smallestReadPoint, cleanMVCC); if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 3824294..70044f6 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3008,7 +3008,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } - private static String safeGetAsStr(List lst, int i) { + public static String safeGetAsStr(List lst, int i) { if (0 <= i && i < lst.size()) { return lst.get(i).toString(); } else { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java index 988d82f..450dd82 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -449,5 +448,32 @@ public class TestSeekOptimizations { } + public void assertKVListsEqual(String additionalMsg, + final List expected, + final List actual) { + final int eLen = expected.size(); + final int aLen = actual.size(); + final int minLen = Math.min(eLen, aLen); + + int i; + for (i = 0; i < minLen + && KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0; + ++i) {} + + if (additionalMsg == null) { + additionalMsg = ""; + } + if (!additionalMsg.isEmpty()) { + additionalMsg = ". " + additionalMsg; + } + + if (eLen != aLen || i != minLen) { + throw new AssertionError( + "Expected and actual KV arrays differ at position " + i + ": " + + HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " + + HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg); + } + } + }