Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (revision 1174515)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (working copy)
@@ -766,7 +766,7 @@
     for (int i=numKVs;i>0;i--) {
       KeyValue kv = new KeyValue(b, b, b, i, b);
       kvs.add(kv);
-      totalSize += kv.getLength();
+      totalSize += kv.getLength() + Bytes.SIZEOF_LONG;
     }
     int blockSize = totalSize / numBlocks;
     StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (revision 1174515)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (working copy)
@@ -177,9 +177,10 @@
     }
 
     LOG.info("Block count by type: " + blockCountByType);
+    String countByType = blockCountByType.toString();
     assertEquals(
-        "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
-        blockCountByType.toString());
+        "{DATA=1459, LEAF_INDEX=183, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=25}",
+        countByType);
 
     reader.close();
   }
Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (revision 1174515)
+++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (working copy)
@@ -36,7 +36,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.junit.Before;
 import org.junit.Test;
@@ -115,10 +117,35 @@
     HFileBlock.FSReader blockReader =
         new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
+    // Comparator class name is stored in the trailer in version 2.
+    RawComparator<byte []> comparator = trailer.createComparator();
+    HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+        trailer.getNumDataIndexLevels(), blockReader);
+    HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+        Bytes.BYTES_RAWCOMPARATOR, 1);
+    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
+        trailer.getLoadOnOpenDataOffset(),
+        fileSize - trailer.getTrailerSize());
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    dataBlockIndexReader.readMultiLevelIndexRoot(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+
+    // Meta index.
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+    // File info
+    FileInfo fileInfo = new FileInfo();
+    fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+    boolean includeMemstoreTS = (fileInfo.get(HFileWriterV2.MAX_MEMSTORE_KEY) != null);
+
     // Counters for the number of key/value pairs and the number of blocks
     int entriesRead = 0;
     int blocksRead = 0;
+    long memstoreTS = 0;
 
     // Scan blocks the way the reader would scan them
     fsdis.seek(0);
@@ -128,6 +155,7 @@
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuffer buf = block.getBufferWithoutHeader();
       while (buf.hasRemaining()) {
+        if (includeMemstoreTS) memstoreTS = buf.getLong();
         int keyLen = buf.getInt();
         int valueLen = buf.getInt();
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (working copy)
@@ -24,6 +24,9 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
 /**
  * Manages the read/write consistency within memstore. This provides
  * an interface for readers to determine what entries to ignore, and
@@ -43,6 +46,13 @@
   private static final ThreadLocal<Long> perThreadReadPoint =
       new ThreadLocal<Long>();
 
+  public ReadWriteConsistencyControl() {
+    this(0);
+  }
+
+  public ReadWriteConsistencyControl(long start) {
+    this.memstoreRead = this.memstoreWrite = start;
+  }
   /**
    * Get this thread's read point. Used primarily by the memstore scanner to
    * know which values to skip (ie: have not been completed/committed to
@@ -52,6 +62,18 @@
     return perThreadReadPoint.get();
   }
 
+  public void initialize(long startPoint) {
+    synchronized (writeQueue) {
+      if (this.memstoreWrite != this.memstoreRead)
+        throw new RuntimeException("Already used this rwcc. Too late to initialize");
+
+      if (this.memstoreWrite > startPoint)
+        throw new RuntimeException("Cannot decrease RWCC timestamp");
+
+      this.memstoreRead = this.memstoreWrite = startPoint;
+    }
+  }
+
   /**
    * Set the thread read point to the given value. The thread RWCC
    * is used by the Memstore scanner so it knows which values to skip.
@@ -137,6 +159,7 @@
       }
     }
     if (interrupted) Thread.currentThread().interrupt();
+
   }
 
   public long memstoreReadPoint() {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -397,6 +397,8 @@
     // Load in all the HStores. Get maximum seqid.
     long maxSeqId = -1;
+    // initialized to -1 so that we pick up MemstoreTS from column families
+    long maxMemstoreTS = -1;
     for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
       status.setStatus("Instantiating store for column family " + c);
       Store store = instantiateHStore(this.tableDir, c);
@@ -405,7 +407,12 @@
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
+      long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+      if (maxStoreMemstoreTS > maxMemstoreTS) {
+        maxMemstoreTS = maxStoreMemstoreTS;
+      }
     }
+    rwcc.initialize(maxMemstoreTS + 1);
     // Recover any edits if available.
     maxSeqId = replayRecoveredEditsIfAny(
         this.regiondir, maxSeqId, reporter, status);
@@ -2061,7 +2068,7 @@
     long size = 0;
     try {
       w = rwcc.beginMemstoreInsert();
-      
+
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
         List<KeyValue> edits = e.getValue();
@@ -2075,6 +2082,7 @@
     } finally {
       rwcc.completeMemstoreInsert(w);
     }
+
     return size;
   }
 
@@ -2623,6 +2631,7 @@
     }
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
       //DebugPrint.println("HRegionScanner.<init>");
+
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -232,6 +232,13 @@
   }
 
   /**
+   * @return The maximum memstoreTS in all store files.
+   */
+  public long getMaxMemstoreTS() {
+    return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
+  }
+
+  /**
    * @param tabledir
    * @param encodedName Encoded region name.
    * @param family
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy)
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -153,6 +154,18 @@
   // Set when we obtain a Reader.
   private long sequenceid = -1;
 
+  // max of the MemstoreTS in the KV's in this store
+  // Set when we obtain a Reader.
+  private long maxMemstoreTS = -1;
+
+  public long getMaxMemstoreTS() {
+    return maxMemstoreTS;
+  }
+
+  public void setMaxMemstoreTS(long maxMemstoreTS) {
+    this.maxMemstoreTS = maxMemstoreTS;
+  }
+
   // If true, this file was product of a major compaction. Its then set
   // whenever you get a Reader.
   private AtomicBoolean majorCompaction = null;
@@ -332,6 +345,24 @@
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    *         does not yet have any store files.
    */
+  public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
+    long max = 0;
+    for (StoreFile sf : sfs) {
+      if (!sf.isBulkLoadResult()) {
+        max = Math.max(max, sf.getMaxMemstoreTS());
+      }
+    }
+    return max;
+  }
+
+  /**
+   * Return the highest sequence ID found across all storefiles in
+   * the given list. Store files that were created by a mapreduce
+   * bulk load are ignored, as they do not correspond to any edit
+   * log items.
+   * @return 0 if no non-bulk-load files are provided or, this is Store that
+   *         does not yet have any store files.
+   */
   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
@@ -520,6 +551,11 @@
     }
     this.reader.setSequenceID(this.sequenceid);
 
+    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_KEY);
+    if (b != null) {
+      this.maxMemstoreTS = Bytes.toLong(b);
+    }
+
     b = metadataMap.get(MAJOR_COMPACTION_KEY);
     if (b != null) {
       boolean mc = Bytes.toBoolean(b);
Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy)
@@ -234,9 +234,13 @@
    * @param bytes byte array
    */
   public KeyValue(final byte [] bytes) {
-    this(bytes, 0);
+    this(false, bytes, 0);
   }
 
+  public KeyValue(boolean includesMemTS, final byte [] bytes) {
+    this(includesMemTS, bytes, 0);
+  }
+
   /**
    * Creates a KeyValue from the specified byte array and offset.
    * Presumes <code>bytes</code> content starting at <code>offset</code> is
@@ -245,8 +249,22 @@
    * @param offset offset to start of KeyValue
    */
   public KeyValue(final byte [] bytes, final int offset) {
-    this(bytes, offset, getLength(bytes, offset));
+    this(false, bytes, offset);
   }
+
+  /**
+   * Creates a KeyValue from the specified byte array and offset.
+   * Presumes <code>bytes</code> content starting at <code>offset</code> is
+   * formatted as a KeyValue blob.
+   * @param bytes byte array
+   * @param offset offset to start of KeyValue
+   */
+  public KeyValue(boolean includesMemTS, final byte [] bytes, final int offset) {
+    this(bytes, offset + (includesMemTS ? Bytes.SIZEOF_LONG : 0), getLength(bytes, offset + (includesMemTS ? Bytes.SIZEOF_LONG : 0)));
+    if (includesMemTS) {
+      this.setMemstoreTS(Bytes.toLong(bytes, offset));
+    }
+  }
 
   /**
    * Creates a KeyValue from the specified byte array, starting at offset, and
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy)
@@ -369,13 +369,13 @@
    * Implementation of {@link HFileScanner} interface.
    */
   protected static class ScannerV1 extends AbstractHFileReader.Scanner {
-    private final HFileReaderV1 readerV1;
+    private final HFileReaderV1 reader;
     private int currBlock;
 
     public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(reader, cacheBlocks, pread, isCompaction);
-      readerV1 = reader;
+      super(cacheBlocks, pread, isCompaction);
+      this.reader = reader;
     }
 
     @Override
@@ -442,7 +442,7 @@
         blockBuffer = null;
         return false;
       }
-      blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+      blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
           isCompaction);
       currKeyLen = blockBuffer.getInt();
       currValueLen = blockBuffer.getInt();
@@ -462,7 +462,7 @@
 
     @Override
     public int seekTo(byte[] key, int offset, int length) throws IOException {
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0) return -1; // falls before the beginning of the file! :-(
       // Avoid re-reading the same block (that'd be dumb).
       loadBlock(b, true);
@@ -488,7 +488,7 @@
         }
       }
 
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0) {
         return -1;
       }
@@ -555,7 +555,7 @@
     @Override
     public boolean seekBefore(byte[] key, int offset, int length)
         throws IOException {
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0)
         return false; // key is before the start of the file.
@@ -607,7 +607,7 @@
         return true;
       }
       currBlock = 0;
-      blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+      blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
          isCompaction);
       currKeyLen = blockBuffer.getInt();
       currValueLen = blockBuffer.getInt();
@@ -617,13 +617,13 @@
 
     private void loadBlock(int bloc, boolean rewind) throws IOException {
       if (blockBuffer == null) {
-        blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+        blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
            isCompaction);
         currBlock = bloc;
         blockFetches++;
       } else {
         if (bloc != currBlock) {
-          blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+          blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
              isCompaction);
           currBlock = bloc;
           blockFetches++;
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy)
@@ -46,8 +46,21 @@
  * The size of a (key length, value length) tuple that prefixes each entry in
  * a data block.
  */
-  private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+  private boolean includeMemstoreTS = false;
+
+  private boolean includeMemstoreTS() {
+    return includeMemstoreTS;
+  }
+
+  private int keyValueLenSize() {
+    //KEY_VALUE_LEN_SIZE
+    if (this.includeMemstoreTS)
+      return 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
+    else
+      return 2 * Bytes.SIZEOF_INT;
+  }
+
  /**
   * A "sparse lock" implementation allowing to lock on a particular block
   * identified by offset. The purpose of this is to avoid two clients loading
@@ -81,6 +94,8 @@
     super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
         evictOnClose);
 
+    LOG.info("HFileReaderV2 trying to read from " + path);
+
     trailer.expectVersion(2);
     fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo, fileSize);
@@ -115,6 +130,7 @@
     lastKey = fileInfo.get(FileInfo.LASTKEY);
     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+    includeMemstoreTS = (fileInfo.get(HFileWriterV2.MAX_MEMSTORE_KEY) != null);
 
     // Store all other load-on-open blocks for further consumption.
     HFileBlock b;
@@ -324,19 +340,29 @@
    */
   protected static class ScannerV2 extends AbstractHFileReader.Scanner {
     private HFileBlock block;
+    private HFileReaderV2 reader;
 
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(r, cacheBlocks, pread, isCompaction);
+      super(cacheBlocks, pread, isCompaction);
+      this.reader = r;
     }
 
     @Override
+    public HFileReaderV2 getReader() {
+      return reader;
+    }
+
+    @Override
     public KeyValue getKeyValue() {
       if (!isSeeked())
         return null;
-      return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position());
+      KeyValue ret = new KeyValue(this.reader.includeMemstoreTS(),
+          blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position());
+
+      return ret;
     }
 
     @Override
@@ -345,7 +371,7 @@
       return ByteBuffer.wrap(
           blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
+              + this.reader.keyValueLenSize(), currKeyLen).slice();
     }
 
     @Override
@@ -354,7 +380,7 @@
       return ByteBuffer.wrap(
           blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
+              + this.reader.keyValueLenSize() + currKeyLen, currValueLen).slice();
     }
 
     private void setNonSeekedState() {
@@ -376,7 +402,7 @@
       assertSeeked();
 
       try {
-        blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
+        blockBuffer.position(blockBuffer.position() + this.reader.keyValueLenSize()
            + currKeyLen + currValueLen);
       } catch (IllegalArgumentException e) {
         LOG.error("Current pos = " + blockBuffer.position()
@@ -499,7 +525,7 @@
     private int seekTo(byte[] key, int offset, int length, boolean rewind)
         throws IOException {
       HFileBlock seekToBlock =
-          ((HFileReaderV2) reader).getDataBlockIndexReader().seekToDataBlock(
+          reader.getDataBlockIndexReader().seekToDataBlock(
              key, offset, length, block);
       if (seekToBlock == null) {
         // This happens if the key e.g. falls before the beginning of the file.
@@ -565,6 +591,9 @@
     private final void readKeyValueLen() {
       blockBuffer.mark();
+      if (reader.includeMemstoreTS()) {
+        currMemstoreTS = blockBuffer.getLong();
+      }
       currKeyLen = blockBuffer.getInt();
       currValueLen = blockBuffer.getInt();
       blockBuffer.reset();
@@ -595,15 +624,19 @@
     private int blockSeek(byte[] key, int offset, int length,
         boolean seekBefore) {
       int klen, vlen;
+      long memstoreTS = 0;
       int lastKeyValueSize = -1;
       do {
         blockBuffer.mark();
+        if (this.reader.includeMemstoreTS()) {
+          memstoreTS = blockBuffer.getLong();
+        }
         klen = blockBuffer.getInt();
         vlen = blockBuffer.getInt();
         blockBuffer.reset();
         int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
-            + KEY_VALUE_LEN_SIZE;
+            + this.reader.keyValueLenSize();
         int comp = reader.getComparator().compare(key, offset, length,
             blockBuffer.array(), keyOffset, klen);
@@ -620,6 +653,9 @@
             readKeyValueLen();
             return 1; // non exact match.
           }
+          if (this.reader.includeMemstoreTS()) {
+            currMemstoreTS = memstoreTS;
+          }
           currKeyLen = klen;
           currValueLen = vlen;
           return 0; // indicate exact match
@@ -633,7 +669,7 @@
         }
 
         // The size of this key/value tuple, including key/value length fields.
-        lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
+        lastKeyValueSize = klen + vlen + this.reader.keyValueLenSize();
         blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
       } while (blockBuffer.remaining() > 0);
@@ -654,6 +690,9 @@
       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
       // It is safe to manipulate this buffer because we own the buffer object.
       buffer.rewind();
+      if (this.reader.includeMemstoreTS()) {
+        buffer.getLong(); // skip memstoreTS
+      }
       int klen = buffer.getInt();
       buffer.getInt();
       ByteBuffer keyBuff = buffer.slice();
@@ -665,9 +704,8 @@
     @Override
     public boolean seekBefore(byte[] key, int offset, int length)
         throws IOException {
-      HFileReaderV2 reader2 = (HFileReaderV2) reader;
       HFileBlock seekToBlock =
-          reader2.getDataBlockIndexReader().seekToDataBlock(
+          reader.getDataBlockIndexReader().seekToDataBlock(
              key, offset, length, block);
       if (seekToBlock == null) {
         return false;
@@ -686,7 +724,7 @@
       // It is important that we compute and pass onDiskSize to the block
       // reader so that it does not have to read the header separately to
       // figure out the size.
-      seekToBlock = reader2.fsBlockReader.readBlockData(previousBlockOffset,
+      seekToBlock = reader.fsBlockReader.readBlockData(previousBlockOffset,
           seekToBlock.getOffset() - previousBlockOffset, -1, pread);
       // TODO shortcut: seek forward in this block to the last key of the
@@ -700,13 +738,13 @@
     public String getKeyString() {
       return Bytes.toStringBinary(blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen);
+              + this.reader.keyValueLenSize(), currKeyLen);
     }
 
     @Override
     public String getValueString() {
       return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          + blockBuffer.position() + this.reader.keyValueLenSize() + currKeyLen,
           currValueLen);
     }
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy)
@@ -26,6 +26,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -42,7 +45,12 @@
  * Writes HFile format version 2.
  */
 public class HFileWriterV2 extends AbstractHFileWriter {
+  private static final Log LOG =
+      LogFactory.getLog(HFileWriterV2.class.getName());
+  /** Max memstore id in FileInfo */
+  public static final byte [] MAX_MEMSTORE_KEY = Bytes.toBytes("MAX_MEMSTORE_KEY");
+
   /** Inline block writers for multi-level block index and compound Blooms. */
   private List<InlineBlockWriter> inlineBlockWriters =
       new ArrayList<InlineBlockWriter>();
@@ -63,6 +71,9 @@
   private List<BlockWritable> additionalLoadOnOpenData =
       new ArrayList<BlockWritable>();
 
+  private final boolean includeMemstoreTS = true;
+  private long maxMemstoreTS = 0;
+
   static class WriterFactoryV2 extends HFile.WriterFactory {
     WriterFactoryV2(Configuration conf) { super(conf); }
@@ -129,6 +140,7 @@
     super(conf, createOutputStream(conf, fs, path), path,
         blockSize, compressAlgo, comparator);
     finishInit(conf);
+    LOG.info("HFileWriterV2 going to write to " + path);
   }
 
   /** Constructor that takes a stream.
    */
@@ -148,6 +160,7 @@
       throws IOException {
     super(conf, outputStream, null, blockSize, compress, comparator);
     finishInit(conf);
+    LOG.info("HFileWriterV2 going to write to " + path);
   }
 
   /** Additional initialization steps */
@@ -283,8 +296,9 @@
    */
   @Override
   public void append(final KeyValue kv) throws IOException {
-    append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+    append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
         kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS());
   }
 
   /**
@@ -299,7 +313,7 @@
    */
   @Override
   public void append(final byte[] key, final byte[] value) throws IOException {
-    append(key, 0, key.length, value, 0, value.length);
+    append(0, key, 0, key.length, value, 0, value.length);
   }
 
   /**
@@ -314,7 +328,7 @@
    * @param vlength
    * @throws IOException
    */
-  private void append(final byte[] key, final int koffset, final int klength,
+  private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
       final byte[] value, final int voffset, final int vlength)
       throws IOException {
     boolean dupKey = checkKey(key, koffset, klength);
@@ -329,6 +343,8 @@
     // Write length of key and value and then actual key and value bytes.
     {
       DataOutputStream out = fsBlockWriter.getUserDataStream();
+      if (this.includeMemstoreTS)
+        out.writeLong(memstoreTS);
       out.writeInt(klength);
       totalKeyLength += klength;
       out.writeInt(vlength);
@@ -398,6 +414,8 @@
             BlockType.ROOT_INDEX, false), "meta");
     fsBlockWriter.writeHeaderAndData(outputStream);
+    appendFileInfo(MAX_MEMSTORE_KEY, Bytes.toBytes(maxMemstoreTS));
+
     // File info
     writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
         false));
@@ -415,6 +433,7 @@
     trailer.setLastDataBlockOffset(lastDataBlockOffset);
     trailer.setComparatorClass(comparator.getClass());
     trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
+
     finishClose(trailer);
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (working copy)
@@ -305,7 +305,6 @@
   }
 
   protected static abstract class Scanner implements HFileScanner {
-    protected HFile.Reader reader;
     protected ByteBuffer blockBuffer;
 
     protected boolean cacheBlocks;
@@ -314,30 +313,25 @@
     protected int currKeyLen;
     protected int currValueLen;
+    protected long currMemstoreTS;
 
     protected int blockFetches;
 
-    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+    public Scanner(final boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      this.reader = reader;
       this.cacheBlocks = cacheBlocks;
       this.pread = pread;
       this.isCompaction = isCompaction;
     }
 
     @Override
-    public Reader getReader() {
-      return reader;
-    }
-
-    @Override
     public boolean isSeeked(){
       return blockBuffer != null;
     }
 
     @Override
     public String toString() {
-      return "HFileScanner for reader " + String.valueOf(reader);
+      return "HFileScanner for reader " + String.valueOf(getReader());
     }
 
     protected void assertSeeked() {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy)
@@ -59,6 +59,12 @@
   /** Row the query is on */
   protected byte [] row;
 
+  /** Should we ignore KV's with a newer RWCC timestamp **/
+  private boolean ignoreNewerKVs = false;
+  public void ignoreNewerKVs() {
+    this.ignoreNewerKVs = true;
+  }
+
   /**
    * Constructs a ScanQueryMatcher for a Scan.
    * @param scan
@@ -166,6 +172,12 @@
       return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
+    // The compaction thread has no readPoint set. For other operations, we
+    // will ignore updates that are done after the read operation has started.
+    if (this.ignoreNewerKVs &&
+        kv.getMemstoreTS() > ReadWriteConsistencyControl.getThreadReadPoint())
+      return MatchCode.SKIP;
+
     byte type = kv.getType();
     if (isDelete(type)) {
       if (tr.withinOrAfterTimeRange(timestamp)) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1174515)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy)
@@ -66,6 +66,7 @@
         columns, store.ttl, store.comparator.getRawComparator(),
         store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
         false);
+    matcher.ignoreNewerKVs();
 
     this.isGet = scan.isGetScan();
     // pass columns = try to filter out unnecessary ScanFiles
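
Reviewer note (illustration only, not part of the patch): with this change each record in an HFile v2 data block may be prefixed by the KeyValue's 8-byte memstoreTS (a long written before the existing key-length/value-length ints), and readers detect the prefix by the presence of the MAX_MEMSTORE_KEY entry in the file info, as TestHFileWriterV2 and HFileReaderV2 do above. The sketch below is a minimal, self-contained walk of that record layout using plain java.nio; the class name and the includeMemstoreTS flag are assumptions made for the example, not code from the patch.

import java.nio.ByteBuffer;

/** Illustration only: iterates the records of one uncompressed HFile v2 data block. */
public class DataBlockLayoutSketch {

  /** Prints each record, honoring the optional 8-byte memstoreTS prefix. */
  static void dumpBlock(ByteBuffer buf, boolean includeMemstoreTS) {
    while (buf.hasRemaining()) {
      long memstoreTS = 0;
      if (includeMemstoreTS) {
        // New in this patch: the RWCC write number, present only when the
        // file carries MAX_MEMSTORE_KEY in its file info.
        memstoreTS = buf.getLong();
      }
      int keyLen = buf.getInt();    // unchanged: key length
      int valueLen = buf.getInt();  // unchanged: value length
      byte[] key = new byte[keyLen];
      byte[] value = new byte[valueLen];
      buf.get(key);
      buf.get(value);
      System.out.println("memstoreTS=" + memstoreTS
          + " keyLen=" + keyLen + " valueLen=" + valueLen);
    }
  }
}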