Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -323,6 +323,7 @@
     protected KeyComparator comparator;
     protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
+    protected boolean includeMVCCReadpoint = true;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -383,6 +384,11 @@
       return this;
     }
 
+    public WriterFactory includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -391,8 +397,8 @@
       if (path != null) {
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
       }
-      return createWriter(fs, path, ostream, blockSize,
-          compression, encoder, comparator, checksumType, bytesPerChecksum);
+      return createWriter(fs, path, ostream, blockSize, compression, encoder, comparator,
+          checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
 
     protected abstract Writer createWriter(FileSystem fs, Path path,
@@ -400,7 +406,7 @@
         Compression.Algorithm compress, HFileDataBlockEncoder dataBlockEncoder,
         KeyComparator comparator, ChecksumType checksumType,
-        int bytesPerChecksum) throws IOException;
+        int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException;
   }
 
   /** The configuration key for HFile version to use for new files */
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy)
@@ -54,6 +54,7 @@
   private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
 
   private boolean includesMemstoreTS = false;
+  private boolean decodeMemstoreTS = false;
 
   private boolean shouldIncludeMemstoreTS() {
     return includesMemstoreTS;
@@ -145,6 +146,9 @@
           Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
       fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
+      if (includesMemstoreTS) {
+        decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
+      }
 
       // Read data block encoding algorithm name from file info.
       dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
@@ -785,15 +789,20 @@
       currValueLen = blockBuffer.getInt();
       blockBuffer.reset();
       if (this.reader.shouldIncludeMemstoreTS()) {
-        try {
-          int memstoreTSOffset = blockBuffer.arrayOffset()
-              + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
-              + currValueLen;
-          currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
-              memstoreTSOffset);
-          currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
-        } catch (Exception e) {
-          throw new RuntimeException("Error reading memstore timestamp", e);
+        if (this.reader.decodeMemstoreTS) {
+          try {
+            int memstoreTSOffset = blockBuffer.arrayOffset()
+                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
+                + currValueLen;
+            currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
+                memstoreTSOffset);
+            currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading memstore timestamp", e);
+          }
+        } else {
+          currMemstoreTS = 0;
+          currMemstoreTSLen = 1;
         }
       }
 
@@ -832,14 +841,18 @@
       vlen = blockBuffer.getInt();
       blockBuffer.reset();
       if (this.reader.shouldIncludeMemstoreTS()) {
-        try {
-          int memstoreTSOffset = blockBuffer.arrayOffset()
-              + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
-          memstoreTS = Bytes.readVLong(blockBuffer.array(),
-              memstoreTSOffset);
-          memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
-        } catch (Exception e) {
-          throw new RuntimeException("Error reading memstore timestamp", e);
+        if (this.reader.decodeMemstoreTS) {
+          try {
+            int memstoreTSOffset = blockBuffer.arrayOffset()
+                + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
+            memstoreTS = Bytes.readVLong(blockBuffer.array(),
+                memstoreTSOffset);
+            memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading memstore timestamp", e);
+          }
+        } else {
+          memstoreTSLen = 1;
         }
       }
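
Note on the HFileReaderV2 change above: decodeMemstoreTS is derived from MAX_MEMSTORE_TS_KEY, so when the largest memstoreTS recorded in the file is 0 the scanner can skip the per-KeyValue vlong decode and assume a length of 1. That works because a vlong of 0 always encodes to a single byte. A minimal standalone sketch of that property (illustrative only, not part of the patch; needs hadoop-common on the classpath):

    import org.apache.hadoop.io.WritableUtils;

    public class MemstoreTsLenSketch {
      public static void main(String[] args) {
        long memstoreTS = 0L;
        // getVIntSize reports the encoded length of a vlong; 0 encodes to one byte,
        // which is why the reader can set currMemstoreTSLen = 1 without touching the block.
        int encodedLen = WritableUtils.getVIntSize(memstoreTS);
        System.out.println(encodedLen); // prints 1
      }
    }
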
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (working copy)
@@ -92,7 +92,7 @@
         FSDataOutputStream ostream, int blockSize,
         Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
         KeyComparator comparator, final ChecksumType checksumType,
-        final int bytesPerChecksum) throws IOException {
+        final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
       // version 1 does not implement checksums
       return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
           compressAlgo, dataBlockEncoder, comparator);
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy)
@@ -86,7 +86,7 @@
   private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
   private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
 
-  private final boolean includeMemstoreTS = true;
+  private final boolean includeMemstoreTS;
   private long maxMemstoreTS = 0;
   private int minorVersion = HFileReaderV2.MAX_MINOR_VERSION;
 
@@ -101,9 +101,9 @@
         FSDataOutputStream ostream, int blockSize,
         Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
         final KeyComparator comparator, final ChecksumType checksumType,
-        final int bytesPerChecksum) throws IOException {
-      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
-          compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
+        final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
+      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize, compress,
+          blockEncoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
     }
   }
 
@@ -112,13 +112,14 @@
       FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
       Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
       final KeyComparator comparator, final ChecksumType checksumType,
-      final int bytesPerChecksum) throws IOException {
+      final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
     super(cacheConf,
         ostream == null ? createOutputStream(conf, fs, path) : ostream,
         path, blockSize, compressAlgo, blockEncoder, comparator);
     SchemaMetrics.configureGlobally(conf);
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
+    this.includeMemstoreTS = includeMVCCReadpoint;
     if (!conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, false)) {
       this.minorVersion = 0;
     }
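
With the hard-coded includeMemstoreTS = true removed, the flag now flows from the WriterFactory into HFileWriterV2's constructor. A hedged sketch of how a caller could opt out of writing MVCC readpoints, assuming the existing 0.94-era entry points HFile.getWriterFactory(conf, cacheConf), WriterFactory.withPath(fs, path) and new CacheConfig(conf); the wrapper class is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class NoMvccWriterSketch {
      // Opens an HFile writer that omits per-KV MVCC readpoints (new builder method above).
      public static HFile.Writer open(Configuration conf, FileSystem fs, Path path)
          throws IOException {
        return HFile.getWriterFactory(conf, new CacheConfig(conf))
            .withPath(fs, path)
            .includeMVCCReadpoint(false)
            .create();
      }
    }
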
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (working copy)
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -84,6 +86,7 @@
     // Also calculate earliest put timestamp if major compaction
     int maxKeyCount = 0;
     long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    long maxMVCCReadpoint = 0;
 
     // pull out the interesting things from the CR for ease later
     final Store store = request.getStore();
@@ -102,11 +105,17 @@
           .getBloomFilterType()) ?
           r.getFilterEntries() : r.getEntries();
       maxKeyCount += keyCount;
+      // Calculate the maximum MVCC readpoint used in any of the involved files
+      Map fileInfo = r.loadFileInfo();
+      byte[] tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+      if (tmp != null) {
+        maxMVCCReadpoint = Math.max(maxMVCCReadpoint, Bytes.toLong(tmp));
+      }
       // For major compactions calculate the earliest put timestamp
       // of all involved storefiles. This is used to remove
       // family delete marker during the compaction.
       if (majorCompaction) {
-        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
         if (tmp == null) {
           // There's a file with no information, must be an old one
           // assume we have very old puts
@@ -183,7 +192,8 @@
       do {
         hasMore = scanner.next(kvs, compactionKVMax);
         if (writer == null && !kvs.isEmpty()) {
-          writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+          writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
+              maxMVCCReadpoint >= smallestReadPoint);
         }
         if (writer != null) {
           // output to writer:
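
The compactor keeps MVCC readpoints in the output file only when at least one input file carries a readpoint that an open scanner might still need (maxMVCCReadpoint >= smallestReadPoint). A standalone sketch of that decision, with the input list and method names invented for illustration:

    import java.util.List;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MvccKeepDecisionSketch {
      /** True if the compacted file must keep per-KV MVCC readpoints. */
      public static boolean keepReadpoints(List<byte[]> perFileMaxMemstoreTS,
          long smallestReadPoint) {
        long maxMVCCReadpoint = 0;
        // perFileMaxMemstoreTS stands in for the MAX_MEMSTORE_TS_KEY value of each input file.
        for (byte[] tmp : perFileMaxMemstoreTS) {
          if (tmp != null) {
            maxMVCCReadpoint = Math.max(maxMVCCReadpoint, Bytes.toLong(tmp));
          }
        }
        // Same predicate as the one passed to createWriterInTmp above: drop the readpoints
        // only when every input file's readpoints are older than what any scanner can see.
        return maxMVCCReadpoint >= smallestReadPoint;
      }
    }
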
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -901,7 +901,7 @@
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
       throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
   }
 
   /*
@@ -911,7 +911,7 @@
    * @return Writer for a new StoreFile in the tmp dir.
    */
   public StoreFile.Writer createWriterInTmp(int maxKeyCount,
-      Compression.Algorithm compression, boolean isCompaction)
+      Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
       throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -931,6 +931,7 @@
             .withChecksumType(checksumType)
             .withBytesPerChecksum(bytesPerChecksum)
             .withCompression(compression)
+            .includeMVCCReadpoint(includeMVCCReadpoint)
             .build();
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1459154)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy)
@@ -43,7 +43,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
@@ -741,6 +740,7 @@
     private Path filePath;
     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
+    private boolean includeMVCCReadpoint = true;
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs, int blockSize) {
@@ -826,6 +826,15 @@
     }
 
     /**
+     * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+     * @return this (for chained invocation)
+     */
+    public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+      this.includeMVCCReadpoint = includeMVCCReadpoint;
+      return this;
+    }
+
+    /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
      * {@link Writer#appendMetadata}.
@@ -859,7 +868,7 @@
       }
       return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
           conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
-          bytesPerChecksum);
+          bytesPerChecksum, includeMVCCReadpoint);
     }
   }
 
@@ -998,6 +1007,7 @@
    *        for Bloom filter size in {@link HFile} format version 1.
    * @param checksumType the checksum type
    * @param bytesPerChecksum the number of bytes per checksum value
+   * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
    * @throws IOException problem writing to FS
    */
   private Writer(FileSystem fs, Path path, int blocksize,
@@ -1005,7 +1015,7 @@
       HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
       CacheConfig cacheConf,
       final KVComparator comparator, BloomType bloomType, long maxKeys,
-      final ChecksumType checksumType, final int bytesPerChecksum)
+      final ChecksumType checksumType, final int bytesPerChecksum, boolean includeMVCCReadpoint)
       throws IOException {
     this.dataBlockEncoder = dataBlockEncoder != null ?
         dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
@@ -1017,6 +1027,7 @@
           .withComparator(comparator.getRawComparator())
           .withChecksumType(checksumType)
           .withBytesPerChecksum(bytesPerChecksum)
+          .includeMVCCReadpoint(includeMVCCReadpoint)
          .create();
 
     this.kvComparator = comparator;
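
Store.createWriterInTmp forwards the flag into StoreFile.WriterBuilder, so flushes (which pass true) always keep the readpoint while compactions may drop it. A hedged sketch of the builder usage added above, assuming the existing withFilePath(Path) builder method; the wrapper class is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class CompactionWriterSketch {
      // Builds a store file writer, optionally omitting per-KV MVCC readpoints.
      public static StoreFile.Writer open(Configuration conf, FileSystem fs, Path filePath,
          int blockSize, boolean includeMVCCReadpoint) throws IOException {
        return new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
            .withFilePath(filePath)
            .includeMVCCReadpoint(includeMVCCReadpoint) // new option added by this patch
            .build();
      }
    }
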
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java (revision 1459154)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java (working copy)
@@ -181,7 +181,7 @@
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
     StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        Compression.Algorithm.NONE, false);
+        Compression.Algorithm.NONE, false, true);
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write