Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(revision 1458980)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(working copy)
@@ -347,7 +347,7 @@
     HRegion region = new HRegion(outputDir, null, fs, conf,
         regionInfo, htd, null);
     HStore store = new HStore(region, columnDescriptor, conf);
-    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false);
+    StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false, true);
 
     StatisticsPrinter statsPrinter = new StatisticsPrinter();
     statsPrinter.startThread();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java	(revision 1458980)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java	(working copy)
@@ -198,7 +198,7 @@
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
     StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, false);
+        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true);
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(revision 1458980)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(working copy)
@@ -178,7 +178,7 @@
     init(getName(), conf, hcd);
 
     // Test createWriterInTmp()
-    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false);
+    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true);
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -827,7 +827,7 @@
    */
   private StoreFile.Writer createWriterInTmp(long maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
   }
 
   /*
@@ -837,7 +837,7 @@
    * @return Writer for a new StoreFile in the tmp dir.
    */
   public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction)
+    Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -857,6 +857,7 @@
             .withChecksumType(checksumType)
             .withBytesPerChecksum(bytesPerChecksum)
             .withCompression(compression)
+            .includeMVCCReadpoint(includeMVCCReadpoint)
             .build();
     return w;
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -145,8 +145,8 @@
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction) throws IOException;
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+    boolean isCompaction, boolean includeMVCCReadpoints) throws IOException;
 
   // Compaction oriented methods
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -530,6 +530,7 @@
     private Path filePath;
     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
+    private boolean includeMVCCReadpoint = true;
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs, int blockSize) {
@@ -615,6 +616,15 @@
     }
 
     /**
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @return this (for chained invocation)
+     */
+    public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
+    /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
      * {@link Writer#appendMetadata}.
@@ -759,8 +769,8 @@
         HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys,
-        final ChecksumType checksumType, final int bytesPerChecksum)
-        throws IOException {
+        final ChecksumType checksumType, final int bytesPerChecksum,
+        final boolean includeMVCCReadpoint) throws IOException {
       this.dataBlockEncoder = dataBlockEncoder != null ?
           dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
       writer = HFile.getWriterFactory(conf, cacheConf)
@@ -771,6 +781,7 @@
           .withComparator(comparator.getRawComparator())
           .withChecksumType(checksumType)
           .withBytesPerChecksum(bytesPerChecksum)
+          .includeMVCCReadpoint(includeMVCCReadpoint)
           .create();
 
       this.kvComparator = comparator;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java	(working copy)
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.CellOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
@@ -111,6 +113,8 @@
     public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     /** The last key in the files we're compacting. */
     public long maxSeqId = 0;
+    /** Latest memstore read point found in any of the involved files */
+    public long maxMemstoreReadPoint = 0;
   }
 
   protected FileDetails getFileDetails(
@@ -130,11 +134,17 @@
       long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
       fd.maxKeyCount += keyCount;
+      // calculate the latest MVCC readpoint in any of the involved store files
+      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
+      byte tmp[] = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+      if (tmp != null) {
+        fd.maxMemstoreReadPoint = Math.max(fd.maxMemstoreReadPoint, Bytes.toLong(tmp));
+      }
       // If required, calculate the earliest put timestamp of all involved storefiles.
       // This is used to remove family delete marker during compaction.
       long earliestPutTs = 0;
       if (calculatePutTs) {
-        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
         if (tmp == null) {
           // There's a file with no information, must be an old one
           // assume we have very old puts
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java	(working copy)
@@ -84,7 +84,8 @@
       }
       // Create the writer even if no kv(Empty store file is also ok),
       // because we need record the max seq id for the store file, see HBASE-6059
-      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
+      writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
+        fd.maxMemstoreReadPoint >= smallestReadPoint);
       boolean finished = performCompaction(scanner, writer, smallestReadPoint);
       if (!finished) {
         abortWriter(writer);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java	(working copy)
@@ -54,6 +54,7 @@
   private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
 
   private boolean includesMemstoreTS = false;
+  private boolean decodeMemstoreTS = false;
 
   private boolean shouldIncludeMemstoreTS() {
     return includesMemstoreTS;
@@ -147,6 +148,9 @@
         Bytes.toInt(keyValueFormatVersion) ==
             HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
     fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
+    if (includesMemstoreTS) {
+      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
+    }
 
     // Read data block encoding algorithm name from file info.
     dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
@@ -769,15 +773,20 @@
       currValueLen = blockBuffer.getInt();
       blockBuffer.reset();
       if (this.reader.shouldIncludeMemstoreTS()) {
-        try {
-          int memstoreTSOffset = blockBuffer.arrayOffset()
-              + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
-              + currValueLen;
-          currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
-              memstoreTSOffset);
-          currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
-        } catch (Exception e) {
-          throw new RuntimeException("Error reading memstore timestamp", e);
+        if (this.reader.decodeMemstoreTS) {
+          try {
+            int memstoreTSOffset = blockBuffer.arrayOffset()
+                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
+                + currValueLen;
+            currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
+                memstoreTSOffset);
+            currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading memstore timestamp", e);
+          }
+        } else {
+          currMemstoreTS = 0;
+          currMemstoreTSLen = 1;
         }
       }
 
@@ -816,14 +825,19 @@
         vlen = blockBuffer.getInt();
         blockBuffer.reset();
         if (this.reader.shouldIncludeMemstoreTS()) {
-          try {
-            int memstoreTSOffset = blockBuffer.arrayOffset()
-                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
-            memstoreTS = Bytes.readVLong(blockBuffer.array(),
-                memstoreTSOffset);
-            memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading memstore timestamp", e);
+          if (this.reader.decodeMemstoreTS) {
+            try {
+              int memstoreTSOffset = blockBuffer.arrayOffset()
+                  + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
+              memstoreTS = Bytes.readVLong(blockBuffer.array(),
+                  memstoreTSOffset);
+              memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+            } catch (Exception e) {
+              throw new RuntimeException("Error reading memstore timestamp", e);
+            }
+          } else {
+            memstoreTS = 0;
+            memstoreTSLen = 1;
           }
         }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java	(working copy)
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
+import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -338,6 +339,7 @@
     protected KeyComparator comparator;
     protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
+    protected boolean includeMVCCReadpoint = true;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -398,6 +400,15 @@
       return this;
     }
 
+    /**
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @return this (for chained invocation)
+     */
+    public WriterFactory includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -407,7 +418,7 @@
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
       }
       return createWriter(fs, path, ostream, blockSize,
-          compression, encoder, comparator, checksumType, bytesPerChecksum);
+          compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
 
     protected abstract Writer createWriter(FileSystem fs, Path path,
@@ -415,7 +426,7 @@
         Compression.Algorithm compress,
         HFileDataBlockEncoder dataBlockEncoder,
         KeyComparator comparator, ChecksumType checksumType,
-        int bytesPerChecksum) throws IOException;
+        int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException;
   }
 
   /** The configuration key for HFile version to use for new files */
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java	(revision 1458980)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java	(working copy)
@@ -85,7 +85,7 @@
   private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
   private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
 
-  private final boolean includeMemstoreTS = true;
+  private final boolean includeMemstoreTS;
   private long maxMemstoreTS = 0;
 
   static class WriterFactoryV2 extends HFile.WriterFactory {
@@ -98,9 +98,9 @@
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator, final ChecksumType checksumType,
-        final int bytesPerChecksum) throws IOException {
-      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
-          compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
+        final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
+      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize, compress,
+          blockEncoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
   }
 
@@ -109,12 +109,13 @@
       FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
       Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator, final ChecksumType checksumType,
-      final int bytesPerChecksum) throws IOException {
+      final int bytesPerChecksum, final boolean includeMVCCReadpoint) throws IOException {
     super(cacheConf,
         ostream == null ? createOutputStream(conf, fs, path) : ostream,
         path, blockSize, compressAlgo, blockEncoder, comparator);
    this.checksumType = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
+    this.includeMemstoreTS = includeMVCCReadpoint;
    finishInit(conf);
  }
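
A minimal usage sketch follows; it is not part of the patch. It exercises the new includeMVCCReadpoint flag through HFile.WriterFactory, assuming the existing withPath builder method, a local output path, and a sample KeyValue chosen purely for illustration. Inside HBase the flag is driven by DefaultCompactor, which passes fd.maxMemstoreReadPoint >= smallestReadPoint as shown above; passing false lets the writer omit the per-KeyValue mvcc readpoint vlong when every readpoint would be written as zero anyway.

// Illustrative sketch only -- not part of the patch. The output path and the
// sample KeyValue are assumptions; the builder calls mirror the factory changed above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Bytes;

public class MvccReadpointFlagSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/no-mvcc-example.hfile"); // hypothetical location

    // A compaction decides this per file set: if no involved store file carries a
    // memstore read point at or above the smallest read point, every readpoint
    // would be written as 0, so storing one per KeyValue is pure overhead.
    boolean includeMVCCReadpoint = false;

    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .includeMVCCReadpoint(includeMVCCReadpoint)
        .create();
    try {
      // A single KeyValue is enough to produce a valid file for the demo.
      writer.append(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes("value")));
    } finally {
      writer.close();
    }
  }
}

On the read side this pairs with the HFileReaderV2 change above: MAX_MEMSTORE_TS_KEY equal to 0 means decodeMemstoreTS stays false, so the scanner skips the vlong decode and returns 0 directly.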