From c670f4e6f18fe08dbda3e4a016025fc36bc92ab1 Mon Sep 17 00:00:00 2001 From: Amitanand Aiyer Date: Mon, 17 Oct 2011 15:08:32 -0700 Subject: [PATCH 1/8] version 6 of the diff posted on the opensource review board. --- .../hadoop/hbase/io/hfile/AbstractHFileReader.java | 13 +--- .../hadoop/hbase/io/hfile/HFileReaderV1.java | 20 +++--- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 69 ++++++++++++++++++-- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 32 ++++++++- .../apache/hadoop/hbase/regionserver/HRegion.java | 57 ++++++++++++++-- .../regionserver/ReadWriteConsistencyControl.java | 29 ++++++++ .../hbase/regionserver/ScanQueryMatcher.java | 13 ++++ .../apache/hadoop/hbase/regionserver/Store.java | 21 ++++++- .../hadoop/hbase/regionserver/StoreFile.java | 36 ++++++++++ .../hadoop/hbase/regionserver/StoreScanner.java | 9 +++ .../apache/hadoop/hbase/TestAcidGuarantees.java | 11 ++- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 5 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 40 +++++++++++ .../hadoop/hbase/regionserver/TestStoreFile.java | 3 +- 14 files changed, 316 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java index 90ef2de..fd109e8 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java @@ -290,7 +290,6 @@ public abstract class AbstractHFileReader implements HFile.Reader { } protected static abstract class Scanner implements HFileScanner { - protected HFile.Reader reader; protected ByteBuffer blockBuffer; protected boolean cacheBlocks; @@ -299,30 +298,26 @@ public abstract class AbstractHFileReader implements HFile.Reader { protected int currKeyLen; protected int currValueLen; + protected int currMemstoreTSLen; + protected long currMemstoreTS; protected int blockFetches; - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, + public Scanner(final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; } @Override - public Reader getReader() { - return reader; - } - - @Override public boolean isSeeked(){ return blockBuffer != null; } @Override public String toString() { - return "HFileScanner for reader " + String.valueOf(reader); + return "HFileScanner for reader " + String.valueOf(getReader()); } protected void assertSeeked() { diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java index 2bb93fa..d9067af 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java @@ -374,13 +374,13 @@ public class HFileReaderV1 extends AbstractHFileReader { * Implementation of {@link HFileScanner} interface. 
*/ protected static class ScannerV1 extends AbstractHFileReader.Scanner { - private final HFileReaderV1 readerV1; + private final HFileReaderV1 reader; private int currBlock; public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - readerV1 = reader; + super(cacheBlocks, pread, isCompaction); + this.reader = reader; } @Override @@ -447,7 +447,7 @@ public class HFileReaderV1 extends AbstractHFileReader { blockBuffer = null; return false; } - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -467,7 +467,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return -1; // falls before the beginning of the file! :-( // Avoid re-reading the same block (that'd be dumb). loadBlock(b, true); @@ -493,7 +493,7 @@ public class HFileReaderV1 extends AbstractHFileReader { } } - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) { return -1; } @@ -560,7 +560,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return false; // key is before the start of the file. @@ -612,7 +612,7 @@ public class HFileReaderV1 extends AbstractHFileReader { return true; } currBlock = 0; - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -622,13 +622,13 @@ public class HFileReaderV1 extends AbstractHFileReader { private void loadBlock(int bloc, boolean rewind) throws IOException { if (blockBuffer == null) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; } else { if (bloc != currBlock) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 6db9abc..f408f0e 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -19,7 +19,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; /** * {@link HFile} reader for version 2. 
@@ -45,7 +48,13 @@ public class HFileReaderV2 extends AbstractHFileReader { * The size of a (key length, value length) tuple that prefixes each entry in * a data block. */ - private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + private boolean includesMemstoreTS = false; + + private boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } /** * A "sparse lock" implementation allowing to lock on a particular block @@ -114,6 +123,9 @@ public class HFileReaderV2 extends AbstractHFileReader { lastKey = fileInfo.get(FileInfo.LASTKEY); avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + includesMemstoreTS = (keyValueFormatVersion != null && + Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE); // Store all other load-on-open blocks for further consumption. HFileBlock b; @@ -314,10 +326,17 @@ public class HFileReaderV2 extends AbstractHFileReader { */ protected static class ScannerV2 extends AbstractHFileReader.Scanner { private HFileBlock block; + private HFileReaderV2 reader; public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); + super(cacheBlocks, pread, isCompaction); + this.reader = r; + } + + @Override + public HFileReaderV2 getReader() { + return reader; } @Override @@ -325,8 +344,12 @@ public class HFileReaderV2 extends AbstractHFileReader { if (!isSeeked()) return null; - return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position()); + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setMemstoreTS(currMemstoreTS); + } + return ret; } @Override @@ -352,6 +375,8 @@ public class HFileReaderV2 extends AbstractHFileReader { blockBuffer = null; currKeyLen = 0; currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; } /** @@ -367,7 +392,7 @@ public class HFileReaderV2 extends AbstractHFileReader { try { blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE - + currKeyLen + currValueLen); + + currKeyLen + currValueLen + currMemstoreTSLen); } catch (IllegalArgumentException e) { LOG.error("Current pos = " + blockBuffer.position() + "; currKeyLen = " + currKeyLen + "; currValLen = " @@ -560,6 +585,19 @@ public class HFileReaderV2 extends AbstractHFileReader { currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array()); + byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen); + DataInputStream data_input = new DataInputStream(byte_input); + + currMemstoreTS = WritableUtils.readVLong(data_input); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. 
" + e); + } + } if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() @@ -587,12 +625,27 @@ public class HFileReaderV2 extends AbstractHFileReader { private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) { int klen, vlen; + long memstoreTS = 0; + int memstoreTSLen = 0; int lastKeyValueSize = -1; do { blockBuffer.mark(); klen = blockBuffer.getInt(); vlen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array()); + byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + klen + vlen); + DataInputStream data_input = new DataInputStream(byte_input); + + memstoreTS = WritableUtils.readVLong(data_input); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; @@ -614,6 +667,10 @@ public class HFileReaderV2 extends AbstractHFileReader { } currKeyLen = klen; currValueLen = vlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } return 0; // indicate exact match } @@ -625,7 +682,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } // The size of this key/value tuple, including key/value length fields. - lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; blockBuffer.position(blockBuffer.position() + lastKeyValueSize); } while (blockBuffer.remaining() > 0); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 069eedf..a53f9c3 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -46,6 +47,13 @@ import org.apache.hadoop.io.Writable; public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Max memstore (rwcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + /** Inline block writers for multi-level block index and compound Blooms. 
*/ private List inlineBlockWriters = new ArrayList(); @@ -66,6 +74,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { private List additionalLoadOnOpenData = new ArrayList(); + private final boolean includeMemstoreTS = true; + private long maxMemstoreTS = 0; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { @@ -297,8 +308,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final KeyValue kv) throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), + append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS()); } /** @@ -313,7 +325,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final byte[] key, final byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); + append(0, key, 0, key.length, value, 0, value.length); } /** @@ -328,7 +340,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * @param vlength * @throws IOException */ - private void append(final byte[] key, final int koffset, final int klength, + private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, final byte[] value, final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); @@ -341,6 +353,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); // Write length of key and value and then actual key and value bytes. + // Additionally, we may also write down the memstoreTS. { DataOutputStream out = fsBlockWriter.getUserDataStream(); out.writeInt(klength); @@ -349,6 +362,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { totalValueLength += vlength; out.write(key, koffset, klength); out.write(value, voffset, vlength); + if (this.includeMemstoreTS) { + WritableUtils.writeVLong(out, memstoreTS); + } } // Are we the first key in this block? 
@@ -364,6 +380,10 @@ public class HFileWriterV2 extends AbstractHFileWriter { entryCount++; } + public static int getEncodedLength(long value) { + return WritableUtils.getVIntSize(value); + } + @Override public void close() throws IOException { if (outputStream == null) { @@ -412,6 +432,11 @@ public class HFileWriterV2 extends AbstractHFileWriter { BlockType.ROOT_INDEX, false), "meta"); fsBlockWriter.writeHeaderAndData(outputStream); + if (this.includeMemstoreTS) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + // File info writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO, false)); @@ -430,6 +455,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { trailer.setComparatorClass(comparator.getClass()); trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + finishClose(trailer); fsBlockWriter.releaseCompressor(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fcd071a..5f4bc20 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -205,6 +205,29 @@ public class HRegion implements HeapSize { // , Writable{ final Path regiondir; KeyValue.KVComparator comparator; + private ConcurrentHashMap scannerReadPoints; + + /* + * @return The smallest rwcc readPoint across all the scanners in this + * region. Writes older than this readPoint, are included in every + * read operation. + */ + public long getSmallestReadPoint() { + long minimumReadPoint; + // We need to ensure that while we are calculating the smallestReadPoint + // no new RegionScanners can grab a readPoint that we are unaware of. + // We achieve this by synchronizing on the scannerReadPoints object. + synchronized(scannerReadPoints) { + minimumReadPoint = rwcc.memstoreReadPoint(); + + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; + } + } + } + return minimumReadPoint; + } /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -291,6 +314,7 @@ public class HRegion implements HeapSize { // , Writable{ this.htableDescriptor = null; this.threadWakeFrequency = 0L; this.coprocessorHost = null; + this.scannerReadPoints = new ConcurrentHashMap(); } /** @@ -334,6 +358,7 @@ public class HRegion implements HeapSize { // , Writable{ String encodedNameStr = this.regionInfo.getEncodedName(); setHTableSpecificConf(); this.regiondir = getRegionDir(this.tableDir, encodedNameStr); + this.scannerReadPoints = new ConcurrentHashMap(); // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases @@ -399,6 +424,8 @@ public class HRegion implements HeapSize { // , Writable{ // Load in all the HStores. Get maximum seqid. 
long maxSeqId = -1; + // initialized to -1 so that we pick up MemstoreTS from column families + long maxMemstoreTS = -1; for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + c); Store store = instantiateHStore(this.tableDir, c); @@ -407,7 +434,12 @@ public class HRegion implements HeapSize { // , Writable{ if (storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } + long maxStoreMemstoreTS = store.getMaxMemstoreTS(); + if (maxStoreMemstoreTS > maxMemstoreTS) { + maxMemstoreTS = maxStoreMemstoreTS; + } } + rwcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = replayRecoveredEditsIfAny( this.regiondir, maxSeqId, reporter, status); @@ -1549,6 +1581,8 @@ public class HRegion implements HeapSize { // , Writable{ this.put(put, lockid, put.getWriteToWAL()); } + + /** * @param put * @param lockid @@ -2082,7 +2116,7 @@ public class HRegion implements HeapSize { // , Writable{ long size = 0; try { w = rwcc.beginMemstoreInsert(); - + for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List edits = e.getValue(); @@ -2096,6 +2130,7 @@ public class HRegion implements HeapSize { // , Writable{ } finally { rwcc.completeMemstoreInsert(w); } + return size; } @@ -2644,6 +2679,7 @@ public class HRegion implements HeapSize { // , Writable{ } RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); + this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -2655,8 +2691,13 @@ public class HRegion implements HeapSize { // , Writable{ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - + // synchronize on scannerReadPoints so that nobody calculates + // getSmallestReadPoint, before scannerReadPoints is updated. + synchronized(scannerReadPoints) { + this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + scannerReadPoints.put(this, this.readPt); + } + List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); @@ -2665,7 +2706,9 @@ public class HRegion implements HeapSize { // , Writable{ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + StoreScanner scanner = store.getScanner(scan, entry.getValue()); + scanner.useRWCC(true); + scanners.add(scanner); } this.storeHeap = new KeyValueHeap(scanners, comparator); } @@ -2816,6 +2859,8 @@ public class HRegion implements HeapSize { // , Writable{ storeHeap.close(); storeHeap = null; } + // no need to sychronize here. 
+ scannerReadPoints.remove(this); this.filterClosed = true; } @@ -3867,7 +3912,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + 29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); @@ -3876,7 +3921,7 @@ public class HRegion implements HeapSize { // , Writable{ (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing ClassSize.ATOMIC_LONG + // memStoreSize ClassSize.ATOMIC_INTEGER + // lockIdGenerator - (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds + (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index 8ec53d3..53173d7 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -24,6 +24,9 @@ import java.util.LinkedList; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + /** * Manages the read/write consistency within memstore. This provides * an interface for readers to determine what entries to ignore, and @@ -44,6 +47,31 @@ public class ReadWriteConsistencyControl { new ThreadLocal(); /** + * Default constructor. Initializes the memstoreRead/Write points to 0. + */ + public ReadWriteConsistencyControl() { + this.memstoreRead = this.memstoreWrite = 0; + } + + /** + * Initializes the memstoreRead/Write points appropriately. + * @param startPoint + */ + public void initialize(long startPoint) { + synchronized (writeQueue) { + if (this.memstoreWrite != this.memstoreRead) { + throw new RuntimeException("Already used this rwcc. Too late to initialize"); + } + + if (this.memstoreWrite > startPoint) { + throw new RuntimeException("Cannot decrease RWCC timestamp"); + } + + this.memstoreRead = this.memstoreWrite = startPoint; + } + } + + /** * Get this thread's read point. Used primarily by the memstore scanner to * know which values to skip (ie: have not been completed/committed to * memstore). @@ -137,6 +165,7 @@ public class ReadWriteConsistencyControl { } } if (interrupted) Thread.currentThread().interrupt(); + } public long memstoreReadPoint() { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index f86f1fe..03331df 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -60,6 +60,12 @@ public class ScanQueryMatcher { /** Row the query is on */ protected byte [] row; + /** Should we ignore KV's with a newer RWCC timestamp **/ + private boolean enforceRWCC = false; + public void useRWCC(boolean flag) { + this.enforceRWCC = flag; + } + /** * Constructs a ScanQueryMatcher for a Scan. 
* @param scan @@ -170,6 +176,13 @@ public class ScanQueryMatcher { return columns.getNextRowOrNextColumn(bytes, offset, qualLength); } + // The compaction thread has no readPoint set. For other operations, we + // will ignore updates that are done after the read operation has started. + if (this.enforceRWCC && + kv.getMemstoreTS() > ReadWriteConsistencyControl.getThreadReadPoint()) { + return MatchCode.SKIP; + } + byte type = kv.getType(); if (isDelete(type)) { if (tr.withinOrAfterTimeRange(timestamp)) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 7761c42..a926d38 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -227,6 +227,13 @@ public class Store implements HeapSize { } /** + * @return The maximum memstoreTS in all store files. + */ + public long getMaxMemstoreTS() { + return StoreFile.getMaxMemstoreTSInList(this.getStorefiles()); + } + + /** * @param tabledir * @param encodedName Encoded region name. * @param family @@ -471,6 +478,8 @@ public class Store implements HeapSize { throws IOException { StoreFile.Writer writer; String fileName; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = region.getSmallestReadPoint(); long flushed = 0; // Don't flush if there are no entries. if (set.size() == 0) { @@ -501,6 +510,11 @@ public class Store implements HeapSize { hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to disk. + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } writer.append(kv); flushed += this.memstore.heapSizeChange(kv, true); } @@ -1127,6 +1141,8 @@ public class Store implements HeapSize { // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. 
+ long smallestReadPoint = region.getSmallestReadPoint(); try { InternalScanner scanner = null; try { @@ -1160,6 +1176,9 @@ public class Store implements HeapSize { if (writer != null) { // output to writer: for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } writer.append(kv); // update progress per key ++progress.currentCompactedKVs; @@ -1608,7 +1627,7 @@ public class Store implements HeapSize { * Return a scanner for both the memstore and the HStore files * @throws IOException */ - public KeyValueScanner getScanner(Scan scan, + public StoreScanner getScanner(Scan scan, final NavigableSet targetCols) throws IOException { lock.readLock().lock(); try { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index b21de77..fb17387 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; +import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; @@ -141,6 +142,18 @@ public class StoreFile { // Set when we obtain a Reader. private long sequenceid = -1; + // max of the MemstoreTS in the KV's in this store + // Set when we obtain a Reader. + private long maxMemstoreTS = -1; + + public long getMaxMemstoreTS() { + return maxMemstoreTS; + } + + public void setMaxMemstoreTS(long maxMemstoreTS) { + this.maxMemstoreTS = maxMemstoreTS; + } + // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; @@ -315,6 +328,24 @@ public class StoreFile { } /** + * Return the largest memstoreTS found across all storefiles in + * the given list. Store files that were created by a mapreduce + * bulk load are ignored, as they do not correspond to any specific + * put operation, and thus do not have a memstoreTS associated with them. + * @return 0 if no non-bulk-load files are provided or, this is Store that + * does not yet have any store files. + */ + public static long getMaxMemstoreTSInList(Collection sfs) { + long max = 0; + for (StoreFile sf : sfs) { + if (!sf.isBulkLoadResult()) { + max = Math.max(max, sf.getMaxMemstoreTS()); + } + } + return max; + } + + /** * Return the highest sequence ID found across all storefiles in * the given list. 
Store files that were created by a mapreduce * bulk load are ignored, as they do not correspond to any edit @@ -463,6 +494,11 @@ public class StoreFile { } this.reader.setSequenceID(this.sequenceid); + b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + if (b != null) { + this.maxMemstoreTS = Bytes.toLong(b); + } + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index f5b5c4c..46ed90d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -157,6 +157,15 @@ class StoreScanner extends NonLazyKeyValueScanner heap = new KeyValueHeap(scanners, comparator); } + /** + * Advise the StoreScanner if it should enforce the RWCC mechanism + * for ignoring newer KVs or not. + * @param flag + */ + public void useRWCC(boolean flag) { + matcher.useRWCC(flag); + } + /* * @return List of scanners ordered properly. */ diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 4ac6e09..bb87e36 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -250,6 +250,12 @@ public class TestAcidGuarantees { writers.add(writer); ctx.addThread(writer); } + // Add a flusher + ctx.addThread(new RepeatingTestThread(ctx) { + public void doAnAction() throws Exception { + util.flush(); + } + }); List getters = Lists.newArrayList(); for (int i = 0; i < numGetters; i++) { @@ -286,7 +292,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2856") public void testGetAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -297,7 +302,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testScanAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -308,7 +312,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testMixedAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -322,7 +325,7 @@ public class TestAcidGuarantees { Configuration c = HBaseConfiguration.create(); TestAcidGuarantees test = new TestAcidGuarantees(); test.setConf(c); - test.runTestAtomicity(5*60*1000, 5, 2, 2, 3); + test.runTestAtomicity(5000, 50, 2, 2, 3); } private void setConf(Configuration c) { diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index ef16382..9e3a14a 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -192,9 +192,10 @@ public class TestCacheOnWrite { } LOG.info("Block count by type: " + blockCountByType); + String countByType = blockCountByType.toString(); assertEquals( - "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", - blockCountByType.toString()); + "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", + countByType); reader.close(); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 78a7cd6..ed020a3 100644 --- 
a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.*; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -36,8 +38,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; @@ -115,10 +120,36 @@ public class TestHFileWriterV2 { HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize); + // Comparator class name is stored in the trailer in version 2. + RawComparator comparator = trailer.createComparator(); + HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, + trailer.getNumDataIndexLevels()); + HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( + Bytes.BYTES_RAWCOMPARATOR, 1); + + HFileBlock.BlockIterator blockIter = blockReader.blockRange( + trailer.getLoadOnOpenDataOffset(), + fileSize - trailer.getTrailerSize()); + // Data index. We also read statistics about the block index written after + // the root level. + dataBlockIndexReader.readMultiLevelIndexRoot( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getDataIndexCount()); + + // Meta index. + metaBlockIndexReader.readRootIndex( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getMetaIndexCount()); + // File info + FileInfo fileInfo = new FileInfo(); + fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0); // Counters for the number of key/value pairs and the number of blocks int entriesRead = 0; int blocksRead = 0; + long memstoreTS = 0; // Scan blocks the way the reader would scan them fsdis.seek(0); @@ -137,6 +168,15 @@ public class TestHFileWriterV2 { byte[] value = new byte[valueLen]; buf.get(value); + if (includeMemstoreTS) { + ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(), + buf.arrayOffset() + buf.position(), buf.remaining()); + DataInputStream data_input = new DataInputStream(byte_input); + + memstoreTS = WritableUtils.readVLong(data_input); + buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS)); + } + // A brute-force check to see that all keys and values are correct. 
assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0); assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 44d2c9d..0a68029 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -773,7 +773,8 @@ public class TestStoreFile extends HBaseTestCase { for (int i=numKVs;i>0;i--) { KeyValue kv = new KeyValue(b, b, b, i, b); kvs.add(kv); - totalSize += kv.getLength(); + // kv has memstoreTS 0, which takes 1 byte to store. + totalSize += kv.getLength() + 1; } int blockSize = totalSize / numBlocks; StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize, -- 1.7.4.1
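
The hunks above change the record layout inside HFile v2 data blocks: HFileWriterV2.append() still writes (key length, value length, key bytes, value bytes) and, when the new KEY_VALUE_VERSION file-info entry equals KEY_VALUE_VER_WITH_MEMSTORE, appends the cell's memstore timestamp as a variable-length long; ScannerV2 reads it back with WritableUtils.readVLong and steps over it using WritableUtils.getVIntSize. Below is a minimal, self-contained sketch of that round trip; the class and method names (MemstoreTSRecordSketch, writeCell, readCellMemstoreTS) are hypothetical and the code is illustrative only, not code from the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Illustrative sketch only: round-trips one cell in the v2 layout used by this patch.
public class MemstoreTSRecordSketch {

  // Writes (klen, vlen, key, value[, vlong memstoreTS]), like HFileWriterV2.append().
  static byte[] writeCell(byte[] key, byte[] value, long memstoreTS,
      boolean includeMemstoreTS) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(key.length);
    out.writeInt(value.length);
    out.write(key);
    out.write(value);
    if (includeMemstoreTS) {
      // The timestamp is vint-encoded, so a KV whose memstoreTS is 0 costs one extra byte.
      WritableUtils.writeVLong(out, memstoreTS);
    }
    return bytes.toByteArray();
  }

  // Reads the record back the way ScannerV2 does, returning the memstore timestamp.
  static long readCellMemstoreTS(byte[] record, boolean includeMemstoreTS) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
    int klen = in.readInt();
    int vlen = in.readInt();
    in.skipBytes(klen + vlen);
    if (!includeMemstoreTS) {
      return 0;
    }
    long memstoreTS = WritableUtils.readVLong(in);
    // The scanner also tracks the encoded size so it can step over the field:
    int memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
    assert record.length == 2 * 4 + klen + vlen + memstoreTSLen; // two ints for the length fields
    return memstoreTS;
  }

  public static void main(String[] args) throws IOException {
    byte[] record = writeCell("row/cf:q".getBytes(), "value".getBytes(), 42L, true);
    System.out.println("memstoreTS read back: " + readCellMemstoreTS(record, true));
  }
}

Because the field is vint-encoded, a cell whose memstoreTS has been reset to 0 before writing adds only a single byte per KeyValue, which is what the TestStoreFile change above (totalSize += kv.getLength() + 1) accounts for; the reader only attempts to decode the field when the KEY_VALUE_VERSION file-info key says it is present, so older files remain readable.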
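
The region-server side of the patch works in two halves: RegionScannerImpl records its read point in the new scannerReadPoints map (synchronizing on the map so HRegion.getSmallestReadPoint() never misses a freshly opened scanner), and ScanQueryMatcher skips any KeyValue whose memstoreTS is newer than the calling scanner's read point when useRWCC(true) is set. Flushes and compactions then call getSmallestReadPoint() and write out a timestamp of 0 for every KV at or below it, since no live or future scanner could tell the difference. The sketch below compresses that decision logic into one class under hypothetical names (ReadPointSketch, registerScanner, shouldSkip, timestampToPersist); the real logic is spread across HRegion, Store, StoreScanner and ScanQueryMatcher.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; names are hypothetical, not from the patch.
public class ReadPointSketch {

  // Stands in for HRegion.scannerReadPoints: open scanner -> read point it captured.
  private final Map<Object, Long> scannerReadPoints = new ConcurrentHashMap<Object, Long>();
  // Stands in for rwcc.memstoreReadPoint(), advanced as writes complete.
  private long memstoreReadPoint = 0;

  // Mirrors RegionScannerImpl's constructor: capture a read point atomically with respect
  // to getSmallestReadPoint(), so the minimum can never miss a scanner that just opened.
  public long registerScanner(Object scanner) {
    synchronized (scannerReadPoints) {
      long readPoint = memstoreReadPoint;
      scannerReadPoints.put(scanner, readPoint);
      return readPoint;
    }
  }

  // Mirrors HRegion.getSmallestReadPoint(): writes older than this are visible to every reader.
  public long getSmallestReadPoint() {
    synchronized (scannerReadPoints) {
      long smallest = memstoreReadPoint;
      for (long readPoint : scannerReadPoints.values()) {
        smallest = Math.min(smallest, readPoint);
      }
      return smallest;
    }
  }

  // Mirrors the ScanQueryMatcher check: hide KVs committed after this scanner started.
  public static boolean shouldSkip(long kvMemstoreTS, long scannerReadPoint, boolean enforceRWCC) {
    return enforceRWCC && kvMemstoreTS > scannerReadPoint;
  }

  // Mirrors the flush/compaction optimization in Store: a timestamp no scanner can
  // observe anymore is persisted as 0, shrinking its vint encoding to one byte.
  public static long timestampToPersist(long kvMemstoreTS, long smallestReadPoint) {
    return kvMemstoreTS <= smallestReadPoint ? 0 : kvMemstoreTS;
  }
}

On region open the patch also seeds the consistency control with the largest persisted timestamp (rwcc.initialize(maxMemstoreTS + 1), using the MAX_MEMSTORE_TS_KEY file-info entry surfaced through StoreFile.getMaxMemstoreTSInList), so read points handed out after a restart remain comparable with timestamps already written to store files.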