diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java index 6723b61..9123e70 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java @@ -262,7 +262,6 @@ public abstract class AbstractHFileReader extends SchemaConfigured } protected static abstract class Scanner implements HFileScanner { - protected HFile.Reader reader; protected ByteBuffer blockBuffer; protected boolean cacheBlocks; @@ -271,30 +270,26 @@ public abstract class AbstractHFileReader extends SchemaConfigured protected int currKeyLen; protected int currValueLen; + protected int currMemstoreTSLen; + protected long currMemstoreTS; protected int blockFetches; - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, + public Scanner(final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; } @Override - public Reader getReader() { - return reader; - } - - @Override public boolean isSeeked(){ return blockBuffer != null; } @Override public String toString() { - return "HFileScanner for reader " + String.valueOf(reader); + return "HFileScanner for reader " + String.valueOf(getReader()); } protected void assertSeeked() { diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java index 69589eb..1eb316a 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java @@ -385,13 +385,13 @@ public class HFileReaderV1 extends AbstractHFileReader { * Implementation of {@link HFileScanner} interface. */ protected static class ScannerV1 extends AbstractHFileReader.Scanner { - private final HFileReaderV1 readerV1; + private final HFileReaderV1 reader; private int currBlock; public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - readerV1 = reader; + super(cacheBlocks, pread, isCompaction); + this.reader = reader; } @Override @@ -458,7 +458,7 @@ public class HFileReaderV1 extends AbstractHFileReader { blockBuffer = null; return false; } - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -478,7 +478,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return -1; // falls before the beginning of the file! :-( // Avoid re-reading the same block (that'd be dumb). 
loadBlock(b, true); @@ -504,7 +504,7 @@ public class HFileReaderV1 extends AbstractHFileReader { } } - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) { return -1; } @@ -571,7 +571,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return false; // key is before the start of the file. @@ -623,7 +623,7 @@ public class HFileReaderV1 extends AbstractHFileReader { return true; } currBlock = 0; - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -633,13 +633,13 @@ public class HFileReaderV1 extends AbstractHFileReader { private void loadBlock(int bloc, boolean rewind) throws IOException { if (blockBuffer == null) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; } else { if (bloc != currBlock) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 64d9fbc..150ee70 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -19,7 +19,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; /** * {@link HFile} reader for version 2. @@ -46,7 +49,13 @@ public class HFileReaderV2 extends AbstractHFileReader { * The size of a (key length, value length) tuple that prefixes each entry in * a data block. */ - private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + private boolean includesMemstoreTS = false; + + private boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } /** * A "sparse lock" implementation allowing to lock on a particular block @@ -115,6 +124,9 @@ public class HFileReaderV2 extends AbstractHFileReader { lastKey = fileInfo.get(FileInfo.LASTKEY); avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + includesMemstoreTS = (keyValueFormatVersion != null && + Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE); // Store all other load-on-open blocks for further consumption. 
HFileBlock b; @@ -333,10 +345,17 @@ public class HFileReaderV2 extends AbstractHFileReader { */ protected static class ScannerV2 extends AbstractHFileReader.Scanner { private HFileBlock block; + private HFileReaderV2 reader; public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); + super(cacheBlocks, pread, isCompaction); + this.reader = r; + } + + @Override + public HFileReaderV2 getReader() { + return reader; } @Override @@ -344,8 +363,12 @@ public class HFileReaderV2 extends AbstractHFileReader { if (!isSeeked()) return null; - return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setMemstoreTS(currMemstoreTS); + } + return ret; } @Override @@ -371,6 +394,8 @@ public class HFileReaderV2 extends AbstractHFileReader { blockBuffer = null; currKeyLen = 0; currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; } /** @@ -386,7 +411,7 @@ public class HFileReaderV2 extends AbstractHFileReader { try { blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE - + currKeyLen + currValueLen); + + currKeyLen + currValueLen + currMemstoreTSLen); } catch (IllegalArgumentException e) { LOG.error("Current pos = " + blockBuffer.position() + "; currKeyLen = " + currKeyLen + "; currValLen = " @@ -579,6 +604,16 @@ public class HFileReaderV2 extends AbstractHFileReader { currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; + currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() @@ -606,12 +641,24 @@ public class HFileReaderV2 extends AbstractHFileReader { private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) { int klen, vlen; + long memstoreTS = 0; + int memstoreTSLen = 0; int lastKeyValueSize = -1; do { blockBuffer.mark(); klen = blockBuffer.getInt(); vlen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + klen + vlen; + memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; @@ -633,6 +680,10 @@ public class HFileReaderV2 extends AbstractHFileReader { } currKeyLen = klen; currValueLen = vlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } return 0; // indicate exact match } @@ -644,7 +695,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } // The size of this key/value tuple, including key/value length fields. 
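The hunks above change what counts as "one entry" inside a V2 data block: besides the two length ints, the key and the value, a file written with the new format carries a trailing vlong holding the cell's memstore (mvcc) timestamp, which is why next() and blockSeek() now add currMemstoreTSLen / memstoreTSLen when advancing. A minimal sketch of the resulting entry size, using only WritableUtils (the class and helper name below are mine, not part of the patch):

import org.apache.hadoop.io.WritableUtils;

public class BlockEntrySizeSketch {
  /**
   * Size in bytes of one data-block entry: two length ints, key bytes, value bytes and,
   * for files written with KEY_VALUE_VER_WITH_MEMSTORE, a trailing vlong memstore timestamp.
   */
  static int entrySize(int klen, int vlen, long memstoreTS, boolean includesMemstoreTS) {
    int size = 2 * 4 /* KEY_VALUE_LEN_SIZE */ + klen + vlen;
    if (includesMemstoreTS) {
      size += WritableUtils.getVIntSize(memstoreTS); // the vlong takes 1 to 9 bytes
    }
    return size;
  }
}

Because the vlong is variable-length, the scanner has to decode it (Bytes.readVLong) before it knows how far to skip, which is exactly what readKeyValueLen() and blockSeek() do in the hunks above.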
- lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; blockBuffer.position(blockBuffer.position() + lastKeyValueSize); } while (blockBuffer.remaining() > 0); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 92dcec4..bc61a3e 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -37,9 +37,11 @@ import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; +import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -47,6 +49,13 @@ import org.apache.hadoop.io.Writable; public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Max memstore (mvcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + /** Inline block writers for multi-level block index and compound Blooms. */ private List inlineBlockWriters = new ArrayList(); @@ -67,6 +76,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { private List additionalLoadOnOpenData = new ArrayList(); + private final boolean includeMemstoreTS = true; + private long maxMemstoreTS = 0; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { @@ -311,8 +323,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final KeyValue kv) throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), + append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS()); } /** @@ -327,7 +340,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final byte[] key, final byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); + append(0, key, 0, key.length, value, 0, value.length); } /** @@ -342,7 +355,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * @param vlength * @throws IOException */ - private void append(final byte[] key, final int koffset, final int klength, + private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, final byte[] value, final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); @@ -355,6 +368,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); // Write length of key and value and then actual key and value bytes. + // Additionally, we may also write down the memstoreTS. 
{ DataOutputStream out = fsBlockWriter.getUserDataStream(); out.writeInt(klength); @@ -363,6 +377,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { totalValueLength += vlength; out.write(key, koffset, klength); out.write(value, voffset, vlength); + if (this.includeMemstoreTS) { + WritableUtils.writeVLong(out, memstoreTS); + } } // Are we the first key in this block? @@ -428,6 +445,11 @@ public class HFileWriterV2 extends AbstractHFileWriter { fsBlockWriter.writeHeaderAndData(outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + if (this.includeMemstoreTS) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + // File info writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO, false)); @@ -449,6 +471,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { trailer.setComparatorClass(comparator.getClass()); trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + finishClose(trailer); fsBlockWriter.releaseCompressor(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 4bd3cc1..2eeaab1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -48,12 +48,15 @@ public interface ColumnTracker { * @param length * @param ttl The timeToLive to enforce. * @param type The type of the KeyValue + * @param ignoreCount indicates if the KV needs to be excluded while counting + * (used during compactions. We only count KV's that are older than all the + * scanners' read points.) * @return The match code instance. * @throws IOException in case there is an internal consistency problem * caused by a data corruption. */ public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, - int length, long ttl, byte type) + int length, long ttl, byte type, boolean ignoreCount) throws IOException; /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index 0480270..8e25796 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -102,7 +102,7 @@ public class ExplicitColumnTracker implements ColumnTracker { */ @Override public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long timestamp, byte type) { + int length, long timestamp, byte type, boolean ignoreCount) { // delete markers should never be passed to an // *Explicit*ColumnTracker assert !KeyValue.isDelete(type); @@ -124,6 +124,8 @@ public class ExplicitColumnTracker implements ColumnTracker { // Column Matches. If it is not a duplicate key, increment the version count // and include. 
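Putting the writer and reader hunks together, the per-entry layout a V2 file now uses is: int key length, int value length, key bytes, value bytes, then (only when the KEY_VALUE_VERSION file-info entry equals KEY_VALUE_VER_WITH_MEMSTORE) a vlong memstore timestamp. A self-contained round trip of that layout, assuming only hadoop-common on the classpath; the class name is illustrative and real entries of course go through HFileBlock rather than a plain byte array:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

public class EntryRoundTripSketch {
  public static void main(String[] args) throws IOException {
    byte[] key = "row1".getBytes("UTF-8");
    byte[] value = "v1".getBytes("UTF-8");
    long memstoreTS = 42L; // the cell's mvcc write number

    // Encode the way HFileWriterV2.append() streams an entry into the block writer.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(key.length);
    out.writeInt(value.length);
    out.write(key);
    out.write(value);
    WritableUtils.writeVLong(out, memstoreTS); // only written when includeMemstoreTS is set

    // Decode in the order the V2 scanner walks a data block.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    int klen = in.readInt();
    int vlen = in.readInt();
    in.skipBytes(klen + vlen);
    System.out.println("memstoreTS = " + WritableUtils.readVLong(in)); // prints 42
  }
}

Files written before this change simply lack both the trailing vlong and the KEY_VALUE_VERSION file-info entry, so the reader falls back to the old fixed layout.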
if(ret == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTS(timestamp)) { //If duplicate, skip this Key diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 00fd086..537e173 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -212,6 +212,29 @@ public class HRegion implements HeapSize { // , Writable{ final Path regiondir; KeyValue.KVComparator comparator; + private ConcurrentHashMap scannerReadPoints; + + /* + * @return The smallest mvcc readPoint across all the scanners in this + * region. Writes older than this readPoint, are included in every + * read operation. + */ + public long getSmallestReadPoint() { + long minimumReadPoint; + // We need to ensure that while we are calculating the smallestReadPoint + // no new RegionScanners can grab a readPoint that we are unaware of. + // We achieve this by synchronizing on the scannerReadPoints object. + synchronized(scannerReadPoints) { + minimumReadPoint = mvcc.memstoreReadPoint(); + + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; + } + } + } + return minimumReadPoint; + } /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -268,8 +291,8 @@ public class HRegion implements HeapSize { // , Writable{ private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final ReadWriteConsistencyControl rwcc = - new ReadWriteConsistencyControl(); + private final MultiVersionConsistencyControl mvcc = + new MultiVersionConsistencyControl(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -371,6 +394,7 @@ public class HRegion implements HeapSize { // , Writable{ this.htableDescriptor = null; this.threadWakeFrequency = 0L; this.coprocessorHost = null; + this.scannerReadPoints = new ConcurrentHashMap(); } /** @@ -414,6 +438,7 @@ public class HRegion implements HeapSize { // , Writable{ String encodedNameStr = this.regionInfo.getEncodedName(); setHTableSpecificConf(); this.regiondir = getRegionDir(this.tableDir, encodedNameStr); + this.scannerReadPoints = new ConcurrentHashMap(); // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases @@ -495,6 +520,8 @@ public class HRegion implements HeapSize { // , Writable{ // min across all the max. long minSeqId = -1; long maxSeqId = -1; + // initialized to -1 so that we pick up MemstoreTS from column families + long maxMemstoreTS = -1; for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + c); Store store = instantiateHStore(this.tableDir, c); @@ -506,7 +533,12 @@ public class HRegion implements HeapSize { // , Writable{ if (maxSeqId == -1 || storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } + long maxStoreMemstoreTS = store.getMaxMemstoreTS(); + if (maxStoreMemstoreTS > maxMemstoreTS) { + maxMemstoreTS = maxStoreMemstoreTS; + } } + mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. 
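The mvcc.initialize(maxMemstoreTS + 1) call above is what keeps write numbers monotonic across a region reopen: the read/write points are seeded just past the largest memstore timestamp persisted in any of the region's store files, so every cell already on disk is immediately visible and every new write gets a strictly larger number. A condensed model of that seeding (the sketch class and method are mine; Store.getMaxMemstoreTS() and MultiVersionConsistencyControl are introduced by this patch):

import java.util.Collection;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Store;

public class MvccSeedSketch {
  /** Mirrors the seeding done while the region opens its stores. */
  static void seed(MultiVersionConsistencyControl mvcc, Collection<Store> stores) {
    long maxMemstoreTS = -1; // -1 when no store file carries a max-memstore-TS entry
    for (Store store : stores) {
      maxMemstoreTS = Math.max(maxMemstoreTS, store.getMaxMemstoreTS());
    }
    // Both points start at maxMemstoreTS + 1; the first new write gets a larger number.
    mvcc.initialize(maxMemstoreTS + 1);
  }
}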
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.regiondir, minSeqId, reporter, status)); @@ -715,8 +747,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - public ReadWriteConsistencyControl getRWCC() { - return rwcc; + public MultiVersionConsistencyControl getMVCC() { + return mvcc; } /** @@ -1239,7 +1271,7 @@ public class HRegion implements HeapSize { // , Writable{ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic @@ -1250,9 +1282,9 @@ public class HRegion implements HeapSize { // , Writable{ long currentMemStoreSize = 0; List storeFlushers = new ArrayList(stores.size()); try { - // Record the rwcc for all transactions in progress. - w = rwcc.beginMemstoreInsert(); - rwcc.advanceMemstore(w); + // Record the mvcc for all transactions in progress. + w = mvcc.beginMemstoreInsert(); + mvcc.advanceMemstore(w); sequenceId = (wal == null)? myseqid : wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); @@ -1269,15 +1301,15 @@ public class HRegion implements HeapSize { // , Writable{ } finally { this.updatesLock.writeLock().unlock(); } - status.setStatus("Waiting for rwcc"); - LOG.debug("Finished snapshotting, commencing waiting for rwcc"); + status.setStatus("Waiting for mvcc"); + LOG.debug("Finished snapshotting, commencing waiting for mvcc"); // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. - rwcc.waitForRead(w); + mvcc.waitForRead(w); status.setStatus("Flushing stores"); LOG.debug("Finished snapshotting, commencing flushing stores"); @@ -1666,6 +1698,8 @@ public class HRegion implements HeapSize { // , Writable{ this.put(put, lockid, put.getWriteToWAL()); } + + /** * @param put * @param lockid @@ -1793,7 +1827,7 @@ public class HRegion implements HeapSize { // , Writable{ } } - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; boolean walSyncSuccessful = false; boolean locked = false; @@ -1883,17 +1917,17 @@ public class HRegion implements HeapSize { // , Writable{ // // ------------------------------------ - // Acquire the latest rwcc number + // Acquire the latest mvcc number // ---------------------------------- - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); // ------------------------------------ // STEP 3. Write back to memstore // Write to memstore. It is ok to write to memstore // first without updating the HLog because we do not roll - // forward the memstore RWCC. The RWCC will be moved up when + // forward the memstore MVCC. The MVCC will be moved up when // the complete operation is done. These changes are not yet - // visible to scanners till we update the RWCC. The RWCC is + // visible to scanners till we update the MVCC. The MVCC is // moved only when the sync is complete. // ---------------------------------- long addedSize = 0; @@ -1951,10 +1985,10 @@ public class HRegion implements HeapSize { // , Writable{ } walSyncSuccessful = true; // ------------------------------------------------------------------ - // STEP 8. Advance rwcc. 
This will make this put visible to scanners and getters. + // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); w = null; } @@ -1982,7 +2016,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walSyncSuccessful) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) rwcc.completeMemstoreInsert(w); + if (w != null) mvcc.completeMemstoreInsert(w); if (locked) { this.updatesLock.readLock().unlock(); @@ -2254,20 +2288,20 @@ public class HRegion implements HeapSize { // , Writable{ * not check the families for validity. * * @param familyMap Map of kvs per family - * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction. - * If null, then this method internally creates a rwcc transaction. + * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. + * If null, then this method internally creates a mvcc transaction. * @return the additional memory usage of the memstore caused by the * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap, - ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) { + MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { long size = 0; - boolean freerwcc = false; + boolean freemvcc = false; try { if (localizedWriteEntry == null) { - localizedWriteEntry = rwcc.beginMemstoreInsert(); - freerwcc = true; + localizedWriteEntry = mvcc.beginMemstoreInsert(); + freemvcc = true; } for (Map.Entry> e : familyMap.entrySet()) { @@ -2281,12 +2315,13 @@ public class HRegion implements HeapSize { // , Writable{ } } } finally { - if (freerwcc) { - rwcc.completeMemstoreInsert(localizedWriteEntry); + if (freemvcc) { + mvcc.completeMemstoreInsert(localizedWriteEntry); } } - return size; - } + + return size; + } /** * Remove all the keys listed in the map from the memstore. This method is @@ -2963,6 +2998,7 @@ public class HRegion implements HeapSize { // , Writable{ } RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); + this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -2974,8 +3010,13 @@ public class HRegion implements HeapSize { // , Writable{ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - + // synchronize on scannerReadPoints so that nobody calculates + // getSmallestReadPoint, before scannerReadPoints is updated. 
+ synchronized(scannerReadPoints) { + this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); + scannerReadPoints.put(this, this.readPt); + } + List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); @@ -2984,7 +3025,8 @@ public class HRegion implements HeapSize { // , Writable{ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + StoreScanner scanner = store.getScanner(scan, entry.getValue()); + scanners.add(scanner); } this.storeHeap = new KeyValueHeap(scanners, comparator); } @@ -3015,7 +3057,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); results.clear(); @@ -3135,6 +3177,8 @@ public class HRegion implements HeapSize { // , Writable{ storeHeap.close(); storeHeap = null; } + // no need to sychronize here. + scannerReadPoints.remove(this); this.filterClosed = true; } @@ -3840,7 +3884,7 @@ public class HRegion implements HeapSize { // , Writable{ */ public Result append(Append append, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of appends atomic to reads + // TODO: Use MVCC to make this set of appends atomic to reads byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; @@ -3980,7 +4024,7 @@ public class HRegion implements HeapSize { // , Writable{ public Result increment(Increment increment, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of increments atomic to reads + // TODO: Use MVCC to make this set of increments atomic to reads byte [] row = increment.getRow(); checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); @@ -4186,7 +4230,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + 29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); @@ -4195,12 +4239,12 @@ public class HRegion implements HeapSize { // , Writable{ (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing ClassSize.ATOMIC_LONG + // memStoreSize ClassSize.ATOMIC_INTEGER + // lockIdGenerator - (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds + (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock ClassSize.ARRAYLIST + // recentFlushes - ReadWriteConsistencyControl.FIXED_SIZE // rwcc + MultiVersionConsistencyControl.FIXED_SIZE // mvcc ; @Override @@ -4209,7 +4253,7 @@ public class HRegion implements HeapSize { // , Writable{ for(Store store : this.stores.values()) { heapSize += store.heapSize(); } - // this does not take into account row locks, recent flushes, rwcc entries + // this does not take into account row locks, recent flushes, mvcc entries return heapSize; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 747a90b..5c2d72c 100644 --- 
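Scanner creation and getSmallestReadPoint() both synchronize on scannerReadPoints, so a scanner can never exist without its read point already being visible to the minimum computation; removal on close needs no lock because dropping an entry can only raise the minimum. A distilled, self-contained model of that bookkeeping (class and method names are mine; the real constructor also seeds the per-thread read point via resetThreadReadPoint):

import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;

public class ReadPointTrackerSketch {
  private final ConcurrentHashMap<Object, Long> scannerReadPoints =
      new ConcurrentHashMap<Object, Long>();
  private final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

  /** Scanner construction: take a read point and publish it atomically w.r.t. the minimum. */
  long registerScanner(Object scanner) {
    synchronized (scannerReadPoints) {
      long readPt = mvcc.memstoreReadPoint();
      scannerReadPoints.put(scanner, readPt);
      return readPt;
    }
  }

  /** Scanner close: removing an entry can only make the minimum larger. */
  void unregisterScanner(Object scanner) {
    scannerReadPoints.remove(scanner);
  }

  /** Cells with memstoreTS at or below this value are visible to every open scanner. */
  long getSmallestReadPoint() {
    synchronized (scannerReadPoints) {
      long min = mvcc.memstoreReadPoint();
      for (long readPt : scannerReadPoints.values()) {
        min = Math.min(min, readPt);
      }
      return min;
    }
  }
}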
a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -673,6 +673,10 @@ public class MemStore implements HeapSize { private Iterator kvsetIt; private Iterator snapshotIt; + // The kvset and snapshot at the time of creating this scanner + volatile KeyValueSkipListSet kvsetAtCreation; + volatile KeyValueSkipListSet snapshotAtCreation; + // Sub lists on which we're iterating private SortedSet kvTail; private SortedSet snapshotTail; @@ -703,10 +707,13 @@ public class MemStore implements HeapSize { MemStoreScanner() { super(); + + kvsetAtCreation = kvset; + snapshotAtCreation = snapshot; } protected KeyValue getNext(Iterator it) { - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); while (it.hasNext()) { KeyValue v = it.next(); @@ -734,8 +741,8 @@ public class MemStore implements HeapSize { // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvset.tailSet(key); - snapshotTail = snapshot.tailSet(key); + kvTail = kvsetAtCreation.tailSet(key); + snapshotTail = snapshotAtCreation.tailSet(key); return seekInSubLists(key); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java new file mode 100644 index 0000000..a892f4b --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -0,0 +1,213 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +public class MultiVersionConsistencyControl { + private volatile long memstoreRead = 0; + private volatile long memstoreWrite = 0; + + private final Object readWaiters = new Object(); + + // This is the pending queue of writes. + private final LinkedList writeQueue = + new LinkedList(); + + private static final ThreadLocal perThreadReadPoint = + new ThreadLocal() { + @Override + protected + Long initialValue() { + return Long.MAX_VALUE; + } + }; + + /** + * Default constructor. 
Initializes the memstoreRead/Write points to 0. + */ + public MultiVersionConsistencyControl() { + this.memstoreRead = this.memstoreWrite = 0; + } + + /** + * Initializes the memstoreRead/Write points appropriately. + * @param startPoint + */ + public void initialize(long startPoint) { + synchronized (writeQueue) { + if (this.memstoreWrite != this.memstoreRead) { + throw new RuntimeException("Already used this mvcc. Too late to initialize"); + } + + this.memstoreRead = this.memstoreWrite = startPoint; + } + } + + /** + * Get this thread's read point. Used primarily by the memstore scanner to + * know which values to skip (ie: have not been completed/committed to + * memstore). + */ + public static long getThreadReadPoint() { + return perThreadReadPoint.get(); + } + + /** + * Set the thread read point to the given value. The thread MVCC + * is used by the Memstore scanner so it knows which values to skip. + * Give it a value of 0 if you want everything. + */ + public static void setThreadReadPoint(long readPoint) { + perThreadReadPoint.set(readPoint); + } + + /** + * Set the thread MVCC read point to whatever the current read point is in + * this particular instance of MVCC. Returns the new thread read point value. + */ + public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) { + perThreadReadPoint.set(mvcc.memstoreReadPoint()); + return getThreadReadPoint(); + } + + /** + * Set the thread MVCC read point to 0 (include everything). + */ + public static void resetThreadReadPoint() { + perThreadReadPoint.set(0L); + } + + public WriteEntry beginMemstoreInsert() { + synchronized (writeQueue) { + long nextWriteNumber = ++memstoreWrite; + WriteEntry e = new WriteEntry(nextWriteNumber); + writeQueue.add(e); + return e; + } + } + + public void completeMemstoreInsert(WriteEntry e) { + advanceMemstore(e); + waitForRead(e); + } + + boolean advanceMemstore(WriteEntry e) { + synchronized (writeQueue) { + e.markCompleted(); + + long nextReadValue = -1; + boolean ranOnce=false; + while (!writeQueue.isEmpty()) { + ranOnce=true; + WriteEntry queueFirst = writeQueue.getFirst(); + + if (nextReadValue > 0) { + if (nextReadValue+1 != queueFirst.getWriteNumber()) { + throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + + nextReadValue + " next: " + queueFirst.getWriteNumber()); + } + } + + if (queueFirst.isCompleted()) { + nextReadValue = queueFirst.getWriteNumber(); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (!ranOnce) { + throw new RuntimeException("never was a first"); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + memstoreRead = nextReadValue; + readWaiters.notifyAll(); + } + } + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; + } + } + + /** + * Wait for the global readPoint to advance upto + * the specified transaction number. + */ + public void waitForRead(WriteEntry e) { + boolean interrupted = false; + synchronized (readWaiters) { + while (memstoreRead < e.getWriteNumber()) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. 
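advanceMemstore() above only moves the global read point over a fully completed prefix of the write queue, so a write that finishes early stays invisible until every older write has finished too. A small demonstration of that ordering, placed in the regionserver package because advanceMemstore() is package-private (the demo class itself is not part of the patch; production writers call completeMemstoreInsert(), which additionally blocks in waitForRead() until the write is visible):

package org.apache.hadoop.hbase.regionserver;

public class MvccOrderingDemo {
  public static void main(String[] args) {
    MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

    MultiVersionConsistencyControl.WriteEntry w1 = mvcc.beginMemstoreInsert(); // write #1
    MultiVersionConsistencyControl.WriteEntry w2 = mvcc.beginMemstoreInsert(); // write #2

    mvcc.advanceMemstore(w2);                     // the newer write finishes first...
    System.out.println(mvcc.memstoreReadPoint()); // ...prints 0: w1 is still pending

    mvcc.advanceMemstore(w1);                     // once the older write completes,
    System.out.println(mvcc.memstoreReadPoint()); // both become visible: prints 2
  }
}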
+ interrupted = true; + } + } + } + if (interrupted) Thread.currentThread().interrupt(); + } + + public long memstoreReadPoint() { + return memstoreRead; + } + + + public static class WriteEntry { + private long writeNumber; + private boolean completed = false; + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + } + + public static final long FIXED_SIZE = ClassSize.align( + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + + 2 * ClassSize.REFERENCE); + +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java deleted file mode 100644 index e68d986..0000000 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.LinkedList; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; - -/** - * Manages the read/write consistency within memstore. This provides - * an interface for readers to determine what entries to ignore, and - * a mechanism for writers to obtain new write numbers, then "commit" - * the new writes for readers to read (thus forming atomic transactions). - */ -public class ReadWriteConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - - private final Object readWaiters = new Object(); - - // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); - - private static final ThreadLocal perThreadReadPoint = - new ThreadLocal(); - - /** - * Get this thread's read point. Used primarily by the memstore scanner to - * know which values to skip (ie: have not been completed/committed to - * memstore). - */ - public static long getThreadReadPoint() { - return perThreadReadPoint.get(); - } - - /** - * Set the thread read point to the given value. The thread RWCC - * is used by the Memstore scanner so it knows which values to skip. - * Give it a value of 0 if you want everything. - */ - public static void setThreadReadPoint(long readPoint) { - perThreadReadPoint.set(readPoint); - } - - /** - * Set the thread RWCC read point to whatever the current read point is in - * this particular instance of RWCC. Returns the new thread read point value. 
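Taken together, the class gives writers a begin/complete bracket and readers a per-thread read point; the flush path shown earlier uses beginMemstoreInsert() plus advanceMemstore() followed by waitForRead() purely as a barrier, so nothing still in flight can end up half-written in an HFile. A compact sketch of the two sides as HRegion uses them (class and method names are mine; the real put path also rolls back the memstore edits if the WAL sync fails before completing the write entry):

import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;

public class MvccUsageSketch {
  private final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

  /** Writer side, mirroring the batch put path: visibility only after the WAL is synced. */
  void atomicWrite(Runnable applyToMemstore, Runnable syncWal) {
    WriteEntry w = mvcc.beginMemstoreInsert();
    try {
      applyToMemstore.run(); // cells carry w's write number, invisible to readers so far
      syncWal.run();         // durability before visibility
    } finally {
      mvcc.completeMemstoreInsert(w); // advance + wait: readers may now see the edit
    }
  }

  /** Reader side, mirroring RegionScannerImpl: pin one read point for the whole scan. */
  long openScanner() {
    // Cells whose memstoreTS exceeds this value are skipped by the scanners underneath.
    return MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
  }
}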
- */ - public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { - perThreadReadPoint.set(rwcc.memstoreReadPoint()); - return getThreadReadPoint(); - } - - /** - * Set the thread RWCC read point to 0 (include everything). - */ - public static void resetThreadReadPoint() { - perThreadReadPoint.set(0L); - } - - public WriteEntry beginMemstoreInsert() { - synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); - writeQueue.add(e); - return e; - } - } - - public void completeMemstoreInsert(WriteEntry e) { - advanceMemstore(e); - waitForRead(e); - } - - boolean advanceMemstore(WriteEntry e) { - synchronized (writeQueue) { - e.markCompleted(); - - long nextReadValue = -1; - boolean ranOnce=false; - while (!writeQueue.isEmpty()) { - ranOnce=true; - WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); - writeQueue.removeFirst(); - } else { - break; - } - } - - if (!ranOnce) { - throw new RuntimeException("never was a first"); - } - - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - } - if (memstoreRead >= e.getWriteNumber()) { - return true; - } - return false; - } - } - - /** - * Wait for the global readPoint to advance upto - * the specified transaction number. - */ - public void waitForRead(WriteEntry e) { - boolean interrupted = false; - synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { - try { - readWaiters.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; - } - } - } - if (interrupted) Thread.currentThread().interrupt(); - } - - public long memstoreReadPoint() { - return memstoreRead; - } - - - public static class WriteEntry { - private long writeNumber; - private boolean completed = false; - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } - } - - public static final long FIXED_SIZE = ClassSize.align( - ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + - 2 * ClassSize.REFERENCE); - -} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index acea2f2..f8a7b28 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -92,6 +92,9 @@ public class ScanQueryMatcher { */ private final long earliestPutTs; + /** readPoint over which the KVs are unconditionally included */ + protected long maxReadPointToTrackVersions; + /** * This variable shows whether there is an null column in the query. There * always exists a null column in the wildcard column query. 
@@ -110,6 +113,7 @@ public class ScanQueryMatcher { */ public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, NavigableSet columns, StoreScanner.ScanType scanType, + long readPointToUse, long earliestPutTs) { this.tr = scan.getTimeRange(); this.rowComparator = scanInfo.getComparator().getRawComparator(); @@ -119,6 +123,7 @@ public class ScanQueryMatcher { scanInfo.getFamily()); this.filter = scan.getFilter(); this.earliestPutTs = earliestPutTs; + this.maxReadPointToTrackVersions = readPointToUse; /* how to deal with deletes */ // keep deleted cells: if compaction or raw scan @@ -153,6 +158,7 @@ public class ScanQueryMatcher { ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo, NavigableSet columns) { this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN, + Long.MAX_VALUE, /* max Readpoint to track versions */ HConstants.LATEST_TIMESTAMP); } @@ -315,7 +321,7 @@ public class ScanQueryMatcher { } MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, - timestamp, type); + timestamp, type, kv.getMemstoreTS() > maxReadPointToTrackVersions); /* * According to current implementation, colChecker can only be * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index ec8ed7f..1f1115d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -64,17 +64,20 @@ public class ScanWildcardColumnTracker implements ColumnTracker { */ @Override public MatchCode checkColumn(byte[] bytes, int offset, int length, - long timestamp, byte type) throws IOException { + long timestamp, byte type, boolean ignoreCount) throws IOException { if (columnBuffer == null) { // first iteration. resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; // do not count a delete marker as another version return checkVersion(type, timestamp); } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTSAndType(timestamp, type)) { return ScanQueryMatcher.MatchCode.SKIP; @@ -88,6 +91,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { if (cmp > 0) { // switched columns, lets do something.x resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(type, timestamp); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fec5547..52056f2 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -238,6 +238,13 @@ public class Store extends SchemaConfigured implements HeapSize { } /** + * @return The maximum memstoreTS in all store files. + */ + public long getMaxMemstoreTS() { + return StoreFile.getMaxMemstoreTSInList(this.getStorefiles()); + } + + /** * @param tabledir * @param encodedName Encoded region name. 
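The new ignoreCount flag is how compactions keep MVCC and version counting from interfering: a cell whose memstoreTS is above maxReadPointToTrackVersions may still be invisible to some open scanner, so it is always written out but must not use up a version slot, otherwise an older version that a live scanner can still see could be counted out and dropped. A condensed view of the guard the trackers now apply (the helper class is mine; MatchCode and KeyValue.getMemstoreTS() are from the codebase):

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;

public class VersionCountingSketch {
  /** Condensed form of the checkColumn() contract added in this patch. */
  static MatchCode checkColumn(KeyValue kv, long maxReadPointToTrackVersions) {
    boolean ignoreCount = kv.getMemstoreTS() > maxReadPointToTrackVersions;
    if (ignoreCount) {
      // Keep the cell, but do not let it push older, still-visible versions over the limit.
      return MatchCode.INCLUDE;
    }
    // ...the normal duplicate-timestamp and max-versions checks follow here...
    return MatchCode.INCLUDE;
  }
}

User scans pass Long.MAX_VALUE as the read point, so the flag only ever fires for flush and compaction scanners.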
* @param family @@ -417,10 +424,15 @@ public class Store extends SchemaConfigured implements HeapSize { ArrayList newFiles = new ArrayList(storefiles); newFiles.add(sf); this.storefiles = sortAndClone(newFiles); - notifyChangedReadersObservers(); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + notifyChangedReadersObservers(); LOG.info("Successfully loaded store file " + srcPath + " into store " + this + " (new location: " + dstPath + ")"); } @@ -507,6 +519,9 @@ public class Store extends SchemaConfigured implements HeapSize { MonitoredTask status) throws IOException { StoreFile.Writer writer; + String fileName; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = region.getSmallestReadPoint(); long flushed = 0; Path pathName; // Don't flush if there are no entries. @@ -520,7 +535,8 @@ public class Store extends SchemaConfigured implements HeapSize { // treat this as a minor compaction. InternalScanner scanner = new StoreScanner(this, scan, Collections .singletonList(new CollectionBackedScanner(set, this.comparator)), - ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP); + ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem @@ -538,6 +554,14 @@ public class Store extends SchemaConfigured implements HeapSize { hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to disk. + if (kv.getMemstoreTS() <= smallestReadPoint) { + // let us not change the original KV. It could be in the memstore + // changing its memstoreTS could affect other threads/scanners. + kv = kv.shallowCopy(); + kv.setMemstoreTS(0); + } writer.append(kv); flushed += this.memstore.heapSizeChange(kv, true); } @@ -656,15 +680,21 @@ public class Store extends SchemaConfigured implements HeapSize { ArrayList newList = new ArrayList(storefiles); newList.add(sf); storefiles = sortAndClone(newList); - this.memstore.clearSnapshot(set); - - // Tell listeners of the change in readers. - notifyChangedReadersObservers(); - return needsCompaction(); + this.memstore.clearSnapshot(set); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + + // Tell listeners of the change in readers. 
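Zeroing the timestamp at flush time is a pure size optimisation: once a cell's memstoreTS is at or below the smallest read point, every current and future reader is allowed to see it, so the number carries no information and a vlong of 0 costs a single byte on disk. The flush path must not touch the original KeyValue, which is still referenced by the memstore and possibly by concurrent scanners, hence the shallowCopy(); the compaction loop further down does the same comparison but sets the field in place, because those cells were just materialised from store files. A small sketch of the flush-side handling (the helper names are mine):

import org.apache.hadoop.hbase.KeyValue;

public class FlushZeroingSketch {
  /** Mirrors the per-cell handling on the Store flush path shown above. */
  static KeyValue prepareForDisk(KeyValue kv, long smallestReadPoint) {
    if (kv.getMemstoreTS() <= smallestReadPoint) {
      kv = kv.shallowCopy();  // never mutate the instance still held by the memstore
      kv.setMemstoreTS(0);    // universally visible: the mvcc number is no longer needed
    }
    return kv;
  }
}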
+ notifyChangedReadersObservers(); + + return needsCompaction(); } /* @@ -677,6 +707,35 @@ public class Store extends SchemaConfigured implements HeapSize { } } + protected List getScanners(boolean cacheBlocks, + boolean isGet, + boolean isCompaction, + ScanQueryMatcher matcher) throws IOException { + List storeFiles; + List memStoreScanners; + this.lock.readLock().lock(); + try { + storeFiles = this.getStorefiles(); + memStoreScanners = this.memstore.getScanners(); + } finally { + this.lock.readLock().unlock(); + } + + // First the store file scanners + + // TODO this used to get the store files in descending order, + // but now we get them in ascending order, which I think is + // actually more correct, since memstore get put at the end. + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher); + List scanners = + new ArrayList(sfScanners.size()+1); + scanners.addAll(sfScanners); + // Then the memstore scanners + scanners.addAll(memStoreScanners); + return scanners; + } + /* * @param o Observer who wants to know about changes in set of Readers */ @@ -1224,6 +1283,9 @@ public class Store extends SchemaConfigured implements HeapSize { // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = region.getSmallestReadPoint(); + MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); try { InternalScanner scanner = null; try { @@ -1232,7 +1294,7 @@ public class Store extends SchemaConfigured implements HeapSize { /* include deletes, unless we are doing a major compaction */ scanner = new StoreScanner(this, scan, scanners, majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, - earliestPutTs); + smallestReadPoint, earliestPutTs); if (region.getCoprocessorHost() != null) { InternalScanner cpScanner = region.getCoprocessorHost().preCompact( this, scanner); @@ -1259,6 +1321,9 @@ public class Store extends SchemaConfigured implements HeapSize { if (writer != null) { // output to writer: for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } writer.append(kv); // update progress per key ++progress.currentCompactedKVs; @@ -1361,8 +1426,8 @@ public class Store extends SchemaConfigured implements HeapSize { this.family.getBloomFilterType()); result.createReader(); } - this.lock.writeLock().lock(); try { + this.lock.writeLock().lock(); try { // Change this.storefiles so it reflects new state but do not // delete old store files until we have sent out notification of @@ -1378,34 +1443,40 @@ public class Store extends SchemaConfigured implements HeapSize { } this.storefiles = sortAndClone(newStoreFiles); + } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. + this.lock.writeLock().unlock(); + } - // Tell observers that list of StoreFiles has changed. - notifyChangedReadersObservers(); - // Finally, delete old store files. - for (StoreFile hsf: compactedFiles) { - hsf.deleteReader(); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files in " + this.storeNameStr + - ". 
Compacted file is " + (result == null? "none": result.toString()) + - ". Files replaced " + compactedFiles.toString() + - " some of which may have been already removed", e); + // Tell observers that list of StoreFiles has changed. + notifyChangedReadersObservers(); + // Finally, delete old store files. + for (StoreFile hsf: compactedFiles) { + hsf.deleteReader(); } - // 4. Compute new store size - this.storeSize = 0L; - this.totalUncompressedBytes = 0L; - for (StoreFile hsf : this.storefiles) { - StoreFile.Reader r = hsf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + hsf + " has a null Reader"); - continue; - } - this.storeSize += r.length(); - this.totalUncompressedBytes += r.getTotalUncompressedBytes(); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files in " + this.storeNameStr + + ". Compacted file is " + (result == null? "none": result.toString()) + + ". Files replaced " + compactedFiles.toString() + + " some of which may have been already removed", e); + } + + // 4. Compute new store size + this.storeSize = 0L; + this.totalUncompressedBytes = 0L; + for (StoreFile hsf : this.storefiles) { + StoreFile.Reader r = hsf.getReader(); + if (r == null) { + LOG.warn("StoreFile " + hsf + " has a null Reader"); + continue; } - } finally { - this.lock.writeLock().unlock(); + this.storeSize += r.length(); + this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } return result; } @@ -1707,7 +1778,7 @@ public class Store extends SchemaConfigured implements HeapSize { * Return a scanner for both the memstore and the HStore files * @throws IOException */ - public KeyValueScanner getScanner(Scan scan, + public StoreScanner getScanner(Scan scan, final NavigableSet targetCols) throws IOException { lock.readLock().lock(); try { @@ -1871,7 +1942,7 @@ public class Store extends SchemaConfigured implements HeapSize { throws IOException { this.lock.readLock().lock(); try { - // TODO: Make this operation atomic w/ RWCC + // TODO: Make this operation atomic w/ MVCC return this.memstore.upsert(kvs); } finally { this.lock.readLock().unlock(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 8fa63b8..13c46a4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; +import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.util.BloomFilter; @@ -156,6 +157,18 @@ public class StoreFile { // Set when we obtain a Reader. private long sequenceid = -1; + // max of the MemstoreTS in the KV's in this store + // Set when we obtain a Reader. + private long maxMemstoreTS = -1; + + public long getMaxMemstoreTS() { + return maxMemstoreTS; + } + + public void setMaxMemstoreTS(long maxMemstoreTS) { + this.maxMemstoreTS = maxMemstoreTS; + } + // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. 
private AtomicBoolean majorCompaction = null; @@ -343,6 +356,24 @@ public class StoreFile { } /** + * Return the largest memstoreTS found across all storefiles in + * the given list. Store files that were created by a mapreduce + * bulk load are ignored, as they do not correspond to any specific + * put operation, and thus do not have a memstoreTS associated with them. + * @return 0 if no non-bulk-load files are provided or, this is Store that + * does not yet have any store files. + */ + public static long getMaxMemstoreTSInList(Collection sfs) { + long max = 0; + for (StoreFile sf : sfs) { + if (!sf.isBulkLoadResult()) { + max = Math.max(max, sf.getMaxMemstoreTS()); + } + } + return max; + } + + /** * Return the highest sequence ID found across all storefiles in * the given list. Store files that were created by a mapreduce * bulk load are ignored, as they do not correspond to any edit @@ -491,6 +522,11 @@ public class StoreFile { } this.reader.setSequenceID(this.sequenceid); + b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + if (b != null) { + this.maxMemstoreTS = Bytes.toLong(b); + } + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); @@ -1119,7 +1155,7 @@ public class StoreFile { boolean isCompaction) { return new StoreFileScanner(this, getScanner(cacheBlocks, pread, - isCompaction)); + isCompaction), !isCompaction); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 13d0c8c..4bb53dc 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -51,6 +51,8 @@ class StoreFileScanner implements KeyValueScanner { private boolean realSeekDone; private boolean delayedReseek; private KeyValue delayedSeekKV; + + private boolean enforceMVCC = false; //The variable, realSeekDone, may cheat on store file scanner for the // multi-column bloom-filter optimization. @@ -65,9 +67,10 @@ class StoreFileScanner implements KeyValueScanner { * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) { this.reader = reader; this.hfs = hfs; + this.enforceMVCC = useMVCC; } /** @@ -122,11 +125,13 @@ class StoreFileScanner implements KeyValueScanner { public KeyValue next() throws IOException { KeyValue retKey = cur; + try { // only seek if we aren't at the end. cur == null implies 'end'. 
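These StoreFile hunks are the read-side counterpart of the MAX_MEMSTORE_TS_KEY file-info entry written in HFileWriterV2.close(): each reader remembers the largest mvcc number its file contains, and the region aggregates those values to seed MVCC after a reopen. The aggregation is essentially getMaxMemstoreTSInList() above, restated here as a stand-alone helper (names other than the StoreFile methods are mine), with bulk-loaded files skipped because their cells never received a memstore timestamp:

import java.util.Collection;
import org.apache.hadoop.hbase.regionserver.StoreFile;

public class MaxMemstoreTsSketch {
  /** Largest persisted mvcc number across a set of store files; 0 if there is none. */
  static long maxPersistedMemstoreTS(Collection<StoreFile> files) {
    long max = 0;
    for (StoreFile sf : files) {
      if (!sf.isBulkLoadResult()) {        // bulk-loaded cells carry no memstoreTS
        max = Math.max(max, sf.getMaxMemstoreTS());
      }
    }
    return max;
  }
}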
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 13d0c8c..4bb53dc 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -51,6 +51,8 @@ class StoreFileScanner implements KeyValueScanner {
   private boolean realSeekDone;
   private boolean delayedReseek;
   private KeyValue delayedSeekKV;
+
+  private boolean enforceMVCC = false;

   //The variable, realSeekDone, may cheat on store file scanner for the
   // multi-column bloom-filter optimization.
@@ -65,9 +67,10 @@ class StoreFileScanner implements KeyValueScanner {
    * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
    * @param hfs HFile scanner
    */
-  public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) {
+  public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) {
     this.reader = reader;
     this.hfs = hfs;
+    this.enforceMVCC = useMVCC;
   }

   /**
@@ -122,11 +125,13 @@ class StoreFileScanner implements KeyValueScanner {
   public KeyValue next() throws IOException {
     KeyValue retKey = cur;
+
     try {
       // only seek if we aren't at the end. cur == null implies 'end'.
       if (cur != null) {
         hfs.next();
         cur = hfs.getKeyValue();
+        skipKVsNewerThanReadpoint();
       }
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
@@ -136,6 +141,7 @@ class StoreFileScanner implements KeyValueScanner {
   public boolean seek(KeyValue key) throws IOException {
     seekCount.incrementAndGet();
+
     try {
       try {
         if(!seekAtOrAfter(hfs, key)) {
@@ -145,7 +151,8 @@ class StoreFileScanner implements KeyValueScanner {
         this.isReseekable = true;
         cur = hfs.getKeyValue();
-        return true;
+
+        return skipKVsNewerThanReadpoint();
       } finally {
         realSeekDone = true;
       }
@@ -156,6 +163,7 @@ class StoreFileScanner implements KeyValueScanner {
   public boolean reseek(KeyValue key) throws IOException {
     seekCount.incrementAndGet();
+
     try {
       try {
         if (!reseekAtOrAfter(hfs, key)) {
@@ -163,7 +171,8 @@ class StoreFileScanner implements KeyValueScanner {
           return false;
         }
         cur = hfs.getKeyValue();
-        return true;
+
+        return skipKVsNewerThanReadpoint();
       } finally {
         realSeekDone = true;
       }
@@ -172,6 +181,35 @@ class StoreFileScanner implements KeyValueScanner {
     }
   }

+  protected boolean skipKVsNewerThanReadpoint() throws IOException {
+    long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
+
+    // We want to ignore all key-values that are newer than our current
+    // readPoint
+    while(enforceMVCC
+        && cur != null
+        && (cur.getMemstoreTS() > readPoint)) {
+      hfs.next();
+      cur = hfs.getKeyValue();
+    }
+
+    if (cur == null) {
+      close();
+      return false;
+    }
+
+    // For the optimisation in HBASE-4346, we set the KV's memstoreTS to
+    // 0, if it is older than all the scanners' read points. It is possible
+    // that a newer KV's memstoreTS was reset to 0. But, there is an
+    // older KV which was not reset to 0 (because it was
+    // not old enough during flush). Make sure that we set it correctly now,
+    // so that the comparison order does not change.
+    if (cur.getMemstoreTS() <= readPoint) {
+      cur.setMemstoreTS(0);
+    }
+    return true;
+  }
+
   public void close() {
     // Nothing to close on HFileScanner?
     cur = null;
@@ -329,5 +367,4 @@ class StoreFileScanner implements KeyValueScanner {
   static final long getSeekCount() {
     return seekCount.get();
   }
-
 }
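skipKVsNewerThanReadpoint() above is the heart of the MVCC read path: KVs whose memstoreTS is above the scanner's read point are skipped, and KVs at or below it have their memstoreTS collapsed to 0 so comparison order stays stable. A standalone sketch of that filtering rule over a plain list of (value, memstoreTS) pairs — an illustration of the idea, not the HBase scanner itself:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Standalone sketch of the read-point filter: entries written after the
// scanner's read point are skipped, and entries at or below the read point
// have their write number collapsed to 0.
public class ReadPointFilterSketch {
  static class Cell {
    final String value;
    long memstoreTS;
    Cell(String value, long memstoreTS) { this.value = value; this.memstoreTS = memstoreTS; }
  }

  static List<Cell> visibleCells(Iterator<Cell> cells, long readPoint) {
    List<Cell> visible = new ArrayList<Cell>();
    while (cells.hasNext()) {
      Cell c = cells.next();
      if (c.memstoreTS > readPoint) {
        continue;               // newer than our read point: not visible yet
      }
      c.memstoreTS = 0;         // old enough for every reader: normalize to 0
      visible.add(c);
    }
    return visible;
  }

  public static void main(String[] args) {
    List<Cell> data = new ArrayList<Cell>();
    data.add(new Cell("committed", 3));
    data.add(new Cell("in-flight", 8));
    // A scanner that started at read point 5 sees only the first cell.
    System.out.println(visibleCells(data.iterator(), 5).size());  // prints 1
  }
}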
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 6512a54..f34ecfd 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -88,14 +88,14 @@ class StoreScanner extends NonLazyKeyValueScanner
   * @throws IOException
   */
  StoreScanner(Store store, Scan scan, final NavigableSet columns)
-                              throws IOException {
+      throws IOException {
    this(store, scan.getCacheBlocks(), scan, columns);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException(
          "Cannot specify any column for a raw scan");
    }
    matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
-        ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP);
+        ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP);

    // Pass columns to try to filter out unnecessary StoreFiles.
    List scanners = getScanners(scan, columns);
@@ -127,13 +127,15 @@ class StoreScanner extends NonLazyKeyValueScanner
   * @param store who we scan
   * @param scan the spec
   * @param scanners ancilliary scanners
+   * @param smallestReadPoint the readPoint that we should use for tracking versions
+   * @param retainDeletesInOutput should we retain deletes after compaction?
   */
  StoreScanner(Store store, Scan scan, List scanners, ScanType scanType,
-      long earliestPutTs) throws IOException {
+      long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, false, scan, null);
    matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
-        earliestPutTs);
+        smallestReadPoint, earliestPutTs);

    // Seek all scanners to the initial key
    for(KeyValueScanner scanner : scanners) {
@@ -150,7 +152,7 @@ class StoreScanner extends NonLazyKeyValueScanner
      final List scanners) throws IOException {
    this(null, scan.getCacheBlocks(), scan, columns);
    this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
-        HConstants.LATEST_TIMESTAMP);
+        Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP);

    // Seek all scanners to the initial key
    for (KeyValueScanner scanner : scanners) {
@@ -163,20 +165,7 @@ class StoreScanner extends NonLazyKeyValueScanner
   * @return List of scanners ordered properly.
   */
  private List getScanners() throws IOException {
-    // First the store file scanners
-
-    // TODO this used to get the store files in descending order,
-    // but now we get them in ascending order, which I think is
-    // actually more correct, since memstore get put at the end.
-    List sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet,
-          false);
-    List scanners =
-      new ArrayList(sfScanners.size()+1);
-    scanners.addAll(sfScanners);
-    // Then the memstore scanners
-    scanners.addAll(this.store.memstore.getScanners());
-    return scanners;
+    return this.store.getScanners(cacheBlocks, isGet, false, null);
  }

  /*
@@ -194,25 +183,27 @@ class StoreScanner extends NonLazyKeyValueScanner
      memOnly = false;
      filesOnly = false;
    }
-    List scanners = new LinkedList();
-    // First the store file scanners
-    if (memOnly == false) {
-      List sfScanners = StoreFileScanner
-        .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks,
-            isGet, false, this.matcher);
-
-      // include only those scan files which pass all filters
-      for (StoreFileScanner sfs : sfScanners) {
-        if (sfs.shouldSeek(scan, columns)) {
-          scanners.add(sfs);
+    List allStoreScanners =
+      this.store.getScanners(cacheBlocks, isGet, false, this.matcher);
+
+    List scanners =
+      new ArrayList(allStoreScanners.size());
+
+    // include only those scan files which pass all filters
+    for (KeyValueScanner kvs : allStoreScanners) {
+      if (kvs instanceof StoreFileScanner) {
+        if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns)) {
+          scanners.add(kvs);
+        }
+      }
+      else {
+        // kvs is a MemStoreScanner
+        if (filesOnly == false && this.store.memstore.shouldSeek(scan)) {
+          scanners.add(kvs);
        }
      }
    }
-
-    // Then the memstore scanners
-    if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
-      scanners.addAll(this.store.memstore.getScanners());
-    }
+
    return scanners;
  }
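The reworked getScanners(scan, columns) above fetches one combined list from the Store and filters it in a single pass: file-backed scanners must pass shouldSeek(scan, columns) unless the scan is memstore-only, and the memstore scanner is kept only when the scan is not files-only. A simplified sketch of that selection loop over a generic source interface (Source/isFileBacked/shouldSeek are illustrative names, not HBase APIs):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the scanner-selection loop: one pass over the combined
// list, with different filters for file-backed and memstore-backed scanners.
public class ScannerSelectionSketch {
  interface Source { boolean isFileBacked(); boolean shouldSeek(); }

  static List<Source> select(List<Source> all, boolean memOnly, boolean filesOnly) {
    List<Source> chosen = new ArrayList<Source>(all.size());
    for (Source s : all) {
      if (s.isFileBacked()) {
        if (!memOnly && s.shouldSeek()) {
          chosen.add(s);          // store-file scanner passing its filters
        }
      } else {
        if (!filesOnly && s.shouldSeek()) {
          chosen.add(s);          // the memstore scanner
        }
      }
    }
    return chosen;
  }

  public static void main(String[] args) {
    Source file = new Source() {
      public boolean isFileBacked() { return true; }
      public boolean shouldSeek() { return true; }
    };
    Source mem = new Source() {
      public boolean isFileBacked() { return false; }
      public boolean shouldSeek() { return true; }
    };
    // files-only scan: the memstore scanner is dropped.
    System.out.println(select(Arrays.asList(file, mem), false, true).size());  // prints 1
  }
}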
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 3de109b..d1b7647 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.security.User;
@@ -1525,7 +1525,7 @@ public class HBaseTestingUtility {
   */
  public static List getFromStoreFile(Store store, Get get) throws IOException {
-    ReadWriteConsistencyControl.resetThreadReadPoint();
+    MultiVersionConsistencyControl.resetThreadReadPoint();
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
        scan.getFamilyMap().get(store.getFamily().getName()));
diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 0ec7687..b479e5f 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -252,6 +252,12 @@ public class TestAcidGuarantees {
      writers.add(writer);
      ctx.addThread(writer);
    }
+    // Add a flusher
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      public void doAnAction() throws Exception {
+        util.flush();
+      }
+    });

    List getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
@@ -288,7 +294,6 @@ public class TestAcidGuarantees {
  }

  @Test
-  @Ignore("Currently not passing - see HBASE-2856")
  public void testGetAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
@@ -299,7 +304,6 @@ public class TestAcidGuarantees {
  }

  @Test
-  @Ignore("Currently not passing - see HBASE-2670")
  public void testScanAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
@@ -310,7 +314,6 @@ public class TestAcidGuarantees {
  }

  @Test
-  @Ignore("Currently not passing - see HBASE-2670")
  public void testMixedAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
@@ -324,7 +327,7 @@ public class TestAcidGuarantees {
    Configuration c = HBaseConfiguration.create();
    TestAcidGuarantees test = new TestAcidGuarantees();
    test.setConf(c);
-    test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+    test.runTestAtomicity(5000, 50, 2, 2, 3);
  }

  private void setConf(Configuration c) {
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 812f11f..58fd155 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -185,9 +185,10 @@ public class TestCacheOnWrite {
    }

    LOG.info("Block count by type: " + blockCountByType);
+    String countByType = blockCountByType.toString();
    assertEquals(
-        "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
-        blockCountByType.toString());
+        "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+        countByType);

    reader.close();
  }
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index b144ce8..371195f 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile;

 import static org.junit.Assert.*;

+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,8 +37,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -116,10 +123,36 @@ public class TestHFileWriterV2 {
    HFileBlock.FSReader blockReader =
        new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
+    // Comparator class name is stored in the trailer in version 2.
+    RawComparator comparator = trailer.createComparator();
+    HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+        trailer.getNumDataIndexLevels());
+    HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+        Bytes.BYTES_RAWCOMPARATOR, 1);
+
+    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
+        trailer.getLoadOnOpenDataOffset(),
+        fileSize - trailer.getTrailerSize());
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    dataBlockIndexReader.readMultiLevelIndexRoot(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+
+    // Meta index.
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+    // File info
+    FileInfo fileInfo = new FileInfo();
+    fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);

    // Counters for the number of key/value pairs and the number of blocks
    int entriesRead = 0;
    int blocksRead = 0;
+    long memstoreTS = 0;

    // Scan blocks the way the reader would scan them
    fsdis.seek(0);
@@ -138,6 +171,15 @@ public class TestHFileWriterV2 {
      byte[] value = new byte[valueLen];
      buf.get(value);

+      if (includeMemstoreTS) {
+        ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
+            buf.arrayOffset() + buf.position(), buf.remaining());
+        DataInputStream data_input = new DataInputStream(byte_input);
+
+        memstoreTS = WritableUtils.readVLong(data_input);
+        buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
+      }
+
      // A brute-force check to see that all keys and values are correct.
      assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
      assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);
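When the KEY_VALUE_VERSION file-info flag is present, the test above reads an extra vlong — the memstoreTS — after each (key, value) pair and advances the buffer by WritableUtils.getVIntSize(). A self-contained round-trip of that encoding, which also shows why TestStoreFile (further below) budgets one extra byte per KeyValue for a memstoreTS of 0:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Round-trip of a vlong-encoded memstoreTS appended after a key/value pair.
public class MemstoreTsVLongDemo {
  public static void main(String[] args) throws IOException {
    long memstoreTS = 42L;

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    WritableUtils.writeVLong(new DataOutputStream(bytes), memstoreTS);

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    long decoded = WritableUtils.readVLong(in);

    System.out.println(decoded);                       // 42
    System.out.println(bytes.size());                  // 1 byte, same as getVIntSize(42)
    System.out.println(WritableUtils.getVIntSize(0));  // 1: a zero memstoreTS still adds one byte
  }
}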
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
index e95044f..f279f09 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
@@ -57,7 +57,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
    //"Match"
    for(byte [] col : scannerColumns){
      result.add(exp.checkColumn(col, 0, col.length, ++timestamp,
-          KeyValue.Type.Put.getCode()));
+          KeyValue.Type.Put.getCode(), false));
    }

    assertEquals(expected.size(), result.size());
@@ -83,9 +83,9 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
    List expected = new ArrayList();
    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1
    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);  // col3
    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);  // col5
    int maxVersions = 1;

    //Create "Scanner"
@@ -169,13 +169,15 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
        Long.MAX_VALUE);
    for (int i = 0; i < 100000; i+=2) {
      byte [] col = Bytes.toBytes("col"+i);
-      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
+      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
+          false);
    }
    explicit.update();

    for (int i = 1; i < 100000; i+=2) {
      byte [] col = Bytes.toBytes("col"+i);
-      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
+      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
+          false);
    }
  }

diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 3ca7aac..bb0e500 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -121,7 +121,7 @@ public class TestFSErrorsExposed {
    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
        cacheConf, BloomType.NONE);
    List scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true);
+        Collections.singletonList(sf), false, true, false);
    KeyValueScanner scanner = scanners.get(0);

    FaultyInputStream inStream = fs.inStreams.get(0).get();
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a8a548c..5daa02b 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1444,12 +1444,12 @@ public class TestHRegion extends HBaseTestCase {
    scan.addFamily(fam2);
    scan.addFamily(fam4);
    is = (RegionScannerImpl) region.getScanner(scan);
-    ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+    MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
    assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size());

    scan = new Scan();
    is = (RegionScannerImpl) region.getScanner(scan);
-    ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+    MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
    assertEquals(families.length -1,
        ((RegionScannerImpl)is).storeHeap.getHeap().size());
  }

diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 925b385..be75cb1 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -55,12 +55,12 @@ public class TestMemStore extends TestCase {
  private static final byte [] CONTENTS = Bytes.toBytes("contents");
  private static final byte [] BASIC = Bytes.toBytes("basic");
  private static final String CONTENTSTR = "contentstr";
-  private ReadWriteConsistencyControl rwcc;
+  private MultiVersionConsistencyControl mvcc;

  @Override
  public void setUp() throws Exception {
    super.setUp();
-    this.rwcc = new ReadWriteConsistencyControl();
+    this.mvcc = new MultiVersionConsistencyControl();
    this.memstore = new MemStore();
  }

@@ -86,7 +86,7 @@ public class TestMemStore extends TestCase {
    List memstorescanners = this.memstore.getScanners();
    Scan scan = new Scan();
    List result = new ArrayList();
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
        this.memstore.comparator);
    ScanType scanType = ScanType.USER_SCAN;
@@ -108,7 +108,7 @@ public class TestMemStore extends TestCase {
      scanner.close();
    }

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    memstorescanners = this.memstore.getScanners();
    // Now assert can count same number even if a snapshot mid-scan.
    s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@@ -198,7 +198,7 @@ public class TestMemStore extends TestCase {

  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
      throws IOException {
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    List memstorescanners = this.memstore.getScanners();
    assertEquals(1, memstorescanners.size());
    final KeyValueScanner scanner = memstorescanners.get(0);
@@ -233,35 +233,35 @@ public class TestMemStore extends TestCase {
    final byte[] q2 = Bytes.toBytes("q2");
    final byte[] v = Bytes.toBytes("value");

-    ReadWriteConsistencyControl.WriteEntry w =
-        rwcc.beginMemstoreInsert();
+    MultiVersionConsistencyControl.WriteEntry w =
+        mvcc.beginMemstoreInsert();

    KeyValue kv1 = new KeyValue(row, f, q1, v);
    kv1.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv1);

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    KeyValueScanner s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{});

-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv1});

-    w = rwcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsert();
    KeyValue kv2 = new KeyValue(row, f, q2, v);
    kv2.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv2);

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv1});

-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv1, kv2});
  }
@@ -281,8 +281,8 @@ public class TestMemStore extends TestCase {
    final byte[] v2 = Bytes.toBytes("value2");

    // INSERT 1: Write both columns val1
-    ReadWriteConsistencyControl.WriteEntry w =
-        rwcc.beginMemstoreInsert();
+    MultiVersionConsistencyControl.WriteEntry w =
+        mvcc.beginMemstoreInsert();

    KeyValue kv11 = new KeyValue(row, f, q1, v1);
    kv11.setMemstoreTS(w.getWriteNumber());
@@ -291,15 +291,15 @@ public class TestMemStore extends TestCase {
    KeyValue kv12 = new KeyValue(row, f, q2, v1);
    kv12.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv12);
-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

    // BEFORE STARTING INSERT 2, SEE FIRST KVS
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    KeyValueScanner s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv11, kv12});

    // START INSERT 2: Write both columns val2
-    w = rwcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsert();
    KeyValue kv21 = new KeyValue(row, f, q1, v2);
    kv21.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv21);
@@ -309,17 +309,17 @@ public class TestMemStore extends TestCase {
    memstore.add(kv22);

    // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv11, kv12});

    // COMPLETE INSERT 2
-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

    // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
    // See HBASE-1485 for discussion about what we should do with
    // the duplicate-TS inserts
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
  }
@@ -336,8 +336,8 @@ public class TestMemStore extends TestCase {
    final byte[] q2 = Bytes.toBytes("q2");
    final byte[] v1 = Bytes.toBytes("value1");

    // INSERT 1: Write both columns val1
-    ReadWriteConsistencyControl.WriteEntry w =
-        rwcc.beginMemstoreInsert();
+    MultiVersionConsistencyControl.WriteEntry w =
+        mvcc.beginMemstoreInsert();

    KeyValue kv11 = new KeyValue(row, f, q1, v1);
    kv11.setMemstoreTS(w.getWriteNumber());
@@ -346,30 +346,30 @@ public class TestMemStore extends TestCase {
    KeyValue kv12 = new KeyValue(row, f, q2, v1);
    kv12.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv12);
-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

    // BEFORE STARTING INSERT 2, SEE FIRST KVS
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    KeyValueScanner s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv11, kv12});

    // START DELETE: Insert delete for one of the columns
-    w = rwcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsert();
    KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
        KeyValue.Type.DeleteColumn);
    kvDel.setMemstoreTS(w.getWriteNumber());
    memstore.add(kvDel);

    // BEFORE COMPLETING DELETE, SEE FIRST KVS
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv11, kv12});

    // COMPLETE DELETE
-    rwcc.completeMemstoreInsert(w);
+    mvcc.completeMemstoreInsert(w);

    // NOW WE SHOULD SEE DELETE
-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    s = this.memstore.getScanners().get(0);
    assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
  }
@@ -383,7 +383,7 @@ public class TestMemStore extends TestCase {
    final byte[] f = Bytes.toBytes("family");
    final byte[] q1 = Bytes.toBytes("q1");

-    final ReadWriteConsistencyControl rwcc;
+    final MultiVersionConsistencyControl mvcc;
    final MemStore memstore;
    AtomicReference caughtException;

@@ -391,10 +391,10 @@ public class TestMemStore extends TestCase {

    public ReadOwnWritesTester(int id,
                               MemStore memstore,
-                               ReadWriteConsistencyControl rwcc,
+                               MultiVersionConsistencyControl mvcc,
                               AtomicReference caughtException) {
-      this.rwcc = rwcc;
+      this.mvcc = mvcc;
      this.memstore = memstore;
      this.caughtException = caughtException;
      row = Bytes.toBytes(id);
@@ -410,8 +410,8 @@ public class TestMemStore extends TestCase {

    private void internalRun() throws IOException {
      for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
-        ReadWriteConsistencyControl.WriteEntry w =
-          rwcc.beginMemstoreInsert();
+        MultiVersionConsistencyControl.WriteEntry w =
+          mvcc.beginMemstoreInsert();

        // Insert the sequence value (i)
        byte[] v = Bytes.toBytes(i);
@@ -419,10 +419,10 @@ public class TestMemStore extends TestCase {
        KeyValue kv = new KeyValue(row, f, q1, i, v);
        kv.setMemstoreTS(w.getWriteNumber());
        memstore.add(kv);
-        rwcc.completeMemstoreInsert(w);
+        mvcc.completeMemstoreInsert(w);

        // Assert that we can read back
-        ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+        MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
        KeyValueScanner s = this.memstore.getScanners().get(0);
        s.seek(kv);
@@ -443,7 +443,7 @@ public class TestMemStore extends TestCase {
    AtomicReference caught = new AtomicReference();

    for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+      threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught);
      threads[i].start();
    }

@@ -531,7 +531,7 @@ public class TestMemStore extends TestCase {
   * @throws InterruptedException
   */
  public void testGetNextRow() throws Exception {
-    ReadWriteConsistencyControl.resetThreadReadPoint();
+    MultiVersionConsistencyControl.resetThreadReadPoint();
    addRows(this.memstore);
    // Add more versions to make it a little more interesting.
    Thread.sleep(1);
@@ -947,7 +947,7 @@ public class TestMemStore extends TestCase {
  }

  public static void main(String [] args) throws IOException {
-    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+    MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
    MemStore ms = new MemStore();

    long n1 = System.nanoTime();
@@ -957,7 +957,7 @@ public class TestMemStore extends TestCase {

    System.out.println("foo");

-    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    for (int i = 0 ; i < 50 ; i++)
      doScan(ms, i);
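The TestMemStore changes above all follow the same protocol: tag each KeyValue with the write number from beginMemstoreInsert(), and only after completeMemstoreInsert() (and a fresh read point) does the KV become visible to scanners. A toy model of that visibility rule — deliberately simplified, not the MultiVersionConsistencyControl implementation, and assuming writes complete in order:

import java.util.concurrent.atomic.AtomicLong;

// Toy model of the begin/complete protocol exercised by TestMemStore above:
// a write is tagged with a write number and only becomes visible once the
// read point has advanced past it.
public class MvccToyModel {
  private final AtomicLong nextWriteNumber = new AtomicLong(0);
  private volatile long readPoint = 0;

  long beginMemstoreInsert() {            // analogous to mvcc.beginMemstoreInsert()
    return nextWriteNumber.incrementAndGet();
  }

  void completeMemstoreInsert(long writeNumber) {
    // The real MVCC only advances once all earlier writes finish; this toy
    // model assumes writes complete in order.
    readPoint = writeNumber;
  }

  boolean isVisible(long kvMemstoreTS) {  // what a scanner's read point allows
    return kvMemstoreTS <= readPoint;
  }

  public static void main(String[] args) {
    MvccToyModel mvcc = new MvccToyModel();
    long w = mvcc.beginMemstoreInsert();
    System.out.println(mvcc.isVisible(w));   // false: insert not completed yet
    mvcc.completeMemstoreInsert(w);
    System.out.println(mvcc.isVisible(w));   // true: reader can now see the KV
  }
}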
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
index ea6d6a5..5c09a7e 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
@@ -31,12 +31,12 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TestReadWriteConsistencyControl extends TestCase {
  static class Writer implements Runnable {
    final AtomicBoolean finished;
-    final ReadWriteConsistencyControl rwcc;
+    final MultiVersionConsistencyControl mvcc;
    final AtomicBoolean status;

-    Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+    Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
      this.finished = finished;
-      this.rwcc = rwcc;
+      this.mvcc = mvcc;
      this.status = status;
    }
    private Random rnd = new Random();
@@ -44,7 +44,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
    public void run() {
      while (!finished.get()) {
-        ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+        MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
        // System.out.println("Begin write: " + e.getWriteNumber());
        // 10 usec - 500usec (including 0)
        int sleepTime = rnd.nextInt(500);
@@ -56,7 +56,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
        } catch (InterruptedException e1) {
        }
        try {
-          rwcc.completeMemstoreInsert(e);
+          mvcc.completeMemstoreInsert(e);
        } catch (RuntimeException ex) {
          // got failure
          System.out.println(ex.toString());
@@ -70,7 +70,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
  }

  public void testParallelism() throws Exception {
-    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+    final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

    final AtomicBoolean finished = new AtomicBoolean(false);

@@ -79,9 +79,9 @@ public class TestReadWriteConsistencyControl extends TestCase {
    final AtomicLong failedAt = new AtomicLong();
    Runnable reader = new Runnable() {
      public void run() {
-        long prev = rwcc.memstoreReadPoint();
+        long prev = mvcc.memstoreReadPoint();
        while (!finished.get()) {
-          long newPrev = rwcc.memstoreReadPoint();
+          long newPrev = mvcc.memstoreReadPoint();
          if (newPrev < prev) {
            // serious problem.
            System.out.println("Reader got out of order, prev: " +
@@ -103,7 +103,7 @@ public class TestReadWriteConsistencyControl extends TestCase {

    for (int i = 0 ; i < n ; ++i ) {
      statuses[i] = new AtomicBoolean(true);
-      writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
      writers[i].start();
    }

    readThread.start();
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index ef95cfa..ec306cb 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -56,7 +56,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
    for(byte [] qualifier : qualifiers) {
      ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0,
-          qualifier.length, 1, KeyValue.Type.Put.getCode());
+          qualifier.length, 1, KeyValue.Type.Put.getCode(), false);
      actual.add(mc);
    }

@@ -89,7 +89,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
    long timestamp = 0;
    for(byte [] qualifier : qualifiers) {
      MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length,
-          ++timestamp, KeyValue.Type.Put.getCode());
+          ++timestamp, KeyValue.Type.Put.getCode(), false);
      actual.add(mc);
    }

@@ -113,7 +113,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
    try {
      for(byte [] qualifier : qualifiers) {
        tracker.checkColumn(qualifier, 0, qualifier.length, 1,
-            KeyValue.Type.Put.getCode());
+            KeyValue.Type.Put.getCode(), false);
      }
    } catch (Exception e) {
      ok = true;
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index fee22ce..2bc0c59 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -818,7 +818,8 @@ public class TestStoreFile extends HBaseTestCase {
    for (int i=numKVs;i>0;i--) {
      KeyValue kv = new KeyValue(b, b, b, i, b);
      kvs.add(kv);
-      totalSize += kv.getLength();
+      // kv has memstoreTS 0, which takes 1 byte to store.
+      totalSize += kv.getLength() + 1;
    }
    int blockSize = totalSize / numBlocks;
    StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,