Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(revision 1459433)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(working copy)
@@ -347,7 +347,7 @@
     HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd, null);
     HStore store = new HStore(region, columnDescriptor, conf);
 
-    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false);
+    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false, true);
 
     StatisticsPrinter statsPrinter = new StatisticsPrinter();
     statsPrinter.startThread();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java	(revision 1459433)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java	(working copy)
@@ -198,7 +198,7 @@
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
     StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, false);
+        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true);
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(revision 1459433)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(working copy)
@@ -178,7 +178,7 @@
     init(getName(), conf, hcd);
 
     // Test createWriterInTmp()
-    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false);
+    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true);
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -827,7 +827,7 @@
    */
   private StoreFile.Writer createWriterInTmp(long maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
   }
 
   /*
@@ -837,7 +837,7 @@
    * @return Writer for a new StoreFile in the tmp dir.
    */
   public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction)
+    Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -857,6 +857,7 @@
             .withChecksumType(checksumType)
             .withBytesPerChecksum(bytesPerChecksum)
             .withCompression(compression)
+            .includeMVCCReadpoint(includeMVCCReadpoint)
             .build();
     return w;
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -143,10 +143,11 @@
    * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
+   * @param includeMVCCReadpoint whether we should write out the MVCC readpoint
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction) throws IOException;
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+    boolean isCompaction, boolean includeMVCCReadpoint) throws IOException;
 
   // Compaction oriented methods
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -530,6 +530,7 @@
     private Path filePath;
     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
+    private boolean includeMVCCReadpoint = true;
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs, int blockSize) {
@@ -615,6 +616,15 @@
     }
 
     /**
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @return this (for chained invocation)
+     */
+    public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
+    /**
      * Create a store file writer. Client is responsible for closing file when
     * done. If metadata, add BEFORE closing using
      * {@link Writer#appendMetadata}.
@@ -648,7 +658,7 @@
       }
       return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
           conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
-          bytesPerChecksum);
+          bytesPerChecksum, includeMVCCReadpoint);
     }
   }
 
@@ -752,6 +762,7 @@
      *          for Bloom filter size in {@link HFile} format version 1.
     * @param checksumType the checksum type
      * @param bytesPerChecksum the number of bytes per checksum value
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
      * @throws IOException problem writing to FS
      */
     private Writer(FileSystem fs, Path path, int blocksize,
@@ -759,8 +770,8 @@
         HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys,
-        final ChecksumType checksumType, final int bytesPerChecksum)
-        throws IOException {
+        final ChecksumType checksumType, final int bytesPerChecksum,
+        final boolean includeMVCCReadpoint) throws IOException {
       this.dataBlockEncoder = dataBlockEncoder != null ?
           dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
       writer = HFile.getWriterFactory(conf, cacheConf)
@@ -771,6 +782,7 @@
           .withComparator(comparator.getRawComparator())
           .withChecksumType(checksumType)
           .withBytesPerChecksum(bytesPerChecksum)
+          .includeMVCCReadpoint(includeMVCCReadpoint)
           .create();
 
       this.kvComparator = comparator;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java	(working copy)
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.CellOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
@@ -111,6 +113,8 @@
     public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     /** The last key in the files we're compacting. */
     public long maxSeqId = 0;
+    /** Latest memstore read point found in any of the involved files */
+    public long maxMVCCReadpoint = 0;
   }
 
   protected FileDetails getFileDetails(
@@ -130,11 +134,17 @@
       long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
           ? r.getFilterEntries() : r.getEntries();
       fd.maxKeyCount += keyCount;
+      // calculate the latest MVCC readpoint in any of the involved store files
+      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
+      byte tmp[] = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+      if (tmp != null) {
+        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
+      }
       // If required, calculate the earliest put timestamp of all involved storefiles.
       // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
       if (calculatePutTs) {
-        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
         if (tmp == null) {
           // There's a file with no information, must be an old one
           // assume we have very old puts
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java	(working copy)
@@ -20,36 +20,22 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Compact passed set of files.
 * Create an instance and then call {@link #compact(CompactionRequest)}
  */
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
-  private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
-
   public DefaultCompactor(final Configuration conf, final Store store) {
     super(conf, store);
   }
@@ -84,7 +70,8 @@
       }
       // Create the writer even if no kv(Empty store file is also ok),
       // because we need record the max seq id for the store file, see HBASE-6059
-      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
+      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
+        fd.maxMVCCReadpoint >= smallestReadPoint);
       boolean finished = performCompaction(scanner, writer, smallestReadPoint);
       if (!finished) {
         abortWriter(writer);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(working copy)
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
+import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -338,6 +339,7 @@
     protected KeyComparator comparator;
     protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
+    protected boolean includeMVCCReadpoint = true;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -398,6 +400,15 @@
       return this;
     }
 
+    /**
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @return this (for chained invocation)
+     */
+    public WriterFactory includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -407,7 +418,7 @@
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
       }
       return createWriter(fs, path, ostream, blockSize,
-          compression, encoder, comparator, checksumType, bytesPerChecksum);
+          compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
 
     protected abstract Writer createWriter(FileSystem fs, Path path,
@@ -415,7 +426,7 @@
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress, HFileDataBlockEncoder dataBlockEncoder,
         KeyComparator comparator, ChecksumType checksumType,
-        int bytesPerChecksum) throws IOException;
+        int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException;
   }
 
   /** The configuration key for HFile version to use for new files */
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java	(revision 1459433)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java	(working copy)
@@ -85,7 +85,7 @@
   private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
   private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
 
-  private final boolean includeMemstoreTS = true;
+  private final boolean includeMemstoreTS;
   private long maxMemstoreTS = 0;
 
   static class WriterFactoryV2 extends HFile.WriterFactory {
@@ -98,9 +98,9 @@
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator, final ChecksumType checksumType,
-        final int bytesPerChecksum) throws IOException {
-      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
-          compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
+        final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
+      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize, compress,
+          blockEncoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
   }
 
@@ -109,12 +109,13 @@
       FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
       Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator, final ChecksumType checksumType,
-      final int bytesPerChecksum) throws IOException {
+      final int bytesPerChecksum, final boolean includeMVCCReadpoint) throws IOException {
     super(cacheConf,
         ostream == null ? createOutputStream(conf, fs, path) : ostream,
         path, blockSize, compressAlgo, blockEncoder, comparator);
     this.checksumType = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
+    this.includeMemstoreTS = includeMVCCReadpoint;
     finishInit(conf);
   }
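
Note (not part of the patch): the behavioral core of the change is the predicate DefaultCompactor now passes as the new fourth argument to createWriterInTmp: the compacted file keeps per-KeyValue MVCC readpoints only if some input file contains an entry at or beyond the smallest read point still held by an open scanner. A minimal standalone sketch of that decision follows; the class and method names here are illustrative, not HBase API.

public final class MvccReadpointDecision {

  /**
   * Mirrors the expression fd.maxMVCCReadpoint >= smallestReadPoint from DefaultCompactor above.
   *
   * @param maxMVCCReadpoint  largest memstore timestamp found in any input file
   *                          (read from MAX_MEMSTORE_TS_KEY in the file info, per the Compactor hunk)
   * @param smallestReadPoint oldest MVCC read point an active scanner may still be using
   * @return whether the compaction writer should persist the MVCC readpoint for each KV
   */
  static boolean includeMVCCReadpoint(long maxMVCCReadpoint, long smallestReadPoint) {
    return maxMVCCReadpoint >= smallestReadPoint;
  }

  public static void main(String[] args) {
    // Every reader is already past all entries in the inputs: readpoints can be omitted.
    System.out.println(includeMVCCReadpoint(100L, 250L)); // false
    // Some entry is at or beyond the smallest read point: readpoints must be kept.
    System.out.println(includeMVCCReadpoint(300L, 250L)); // true
  }
}

The flush and test call sites above keep the previous behavior by passing true, so only compaction output can drop the per-KV readpoints.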