diff --git a/pom.xml b/pom.xml index bfe8103..fc5e5ca 100644 --- a/pom.xml +++ b/pom.xml @@ -258,7 +258,7 @@ maven-surefire-plugin - 2.9 + 2.8 900 -enableassertions -Xmx1400m @@ -322,7 +322,7 @@ maven-surefire-report-plugin - 2.9 + 2.8 org.apache.avro diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java index 90ef2de..fd109e8 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java @@ -290,7 +290,6 @@ public abstract class AbstractHFileReader implements HFile.Reader { } protected static abstract class Scanner implements HFileScanner { - protected HFile.Reader reader; protected ByteBuffer blockBuffer; protected boolean cacheBlocks; @@ -299,30 +298,26 @@ public abstract class AbstractHFileReader implements HFile.Reader { protected int currKeyLen; protected int currValueLen; + protected int currMemstoreTSLen; + protected long currMemstoreTS; protected int blockFetches; - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, + public Scanner(final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; } @Override - public Reader getReader() { - return reader; - } - - @Override public boolean isSeeked(){ return blockBuffer != null; } @Override public String toString() { - return "HFileScanner for reader " + String.valueOf(reader); + return "HFileScanner for reader " + String.valueOf(getReader()); } protected void assertSeeked() { diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java index 2bb93fa..d9067af 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java @@ -374,13 +374,13 @@ public class HFileReaderV1 extends AbstractHFileReader { * Implementation of {@link HFileScanner} interface. */ protected static class ScannerV1 extends AbstractHFileReader.Scanner { - private final HFileReaderV1 readerV1; + private final HFileReaderV1 reader; private int currBlock; public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - readerV1 = reader; + super(cacheBlocks, pread, isCompaction); + this.reader = reader; } @Override @@ -447,7 +447,7 @@ public class HFileReaderV1 extends AbstractHFileReader { blockBuffer = null; return false; } - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -467,7 +467,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return -1; // falls before the beginning of the file! :-( // Avoid re-reading the same block (that'd be dumb). 
loadBlock(b, true); @@ -493,7 +493,7 @@ public class HFileReaderV1 extends AbstractHFileReader { } } - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) { return -1; } @@ -560,7 +560,7 @@ public class HFileReaderV1 extends AbstractHFileReader { @Override public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - int b = readerV1.blockContainingKey(key, offset, length); + int b = reader.blockContainingKey(key, offset, length); if (b < 0) return false; // key is before the start of the file. @@ -612,7 +612,7 @@ public class HFileReaderV1 extends AbstractHFileReader { return true; } currBlock = 0; - blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, isCompaction); currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); @@ -622,13 +622,13 @@ public class HFileReaderV1 extends AbstractHFileReader { private void loadBlock(int bloc, boolean rewind) throws IOException { if (blockBuffer == null) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; } else { if (bloc != currBlock) { - blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread, + blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, isCompaction); currBlock = bloc; blockFetches++; diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 6db9abc..e39fbee 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -19,7 +19,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.ByteArrayInputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; /** * {@link HFile} reader for version 2. @@ -45,7 +48,13 @@ public class HFileReaderV2 extends AbstractHFileReader { * The size of a (key length, value length) tuple that prefixes each entry in * a data block. */ - private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + private boolean includesMemstoreTS = false; + + private boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } /** * A "sparse lock" implementation allowing to lock on a particular block @@ -114,6 +123,9 @@ public class HFileReaderV2 extends AbstractHFileReader { lastKey = fileInfo.get(FileInfo.LASTKEY); avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + includesMemstoreTS = (keyValueFormatVersion != null && + Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE); // Store all other load-on-open blocks for further consumption. 
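
The reader-side switch added above reduces to one file-info lookup. A condensed restatement (helper name and signature are illustrative, not part of the patch); files written before this change carry no KEY_VALUE_VERSION entry, so they keep decoding exactly as before:

    // True only for files written by the new HFileWriterV2 code path; every
    // older file (HFile v1, or v2 written before this patch) returns false.
    static boolean includesMemstoreTS(HFile.FileInfo fileInfo) {
      byte[] version = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
      return version != null
          && Bytes.toInt(version) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
    }
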
HFileBlock b; @@ -314,10 +326,17 @@ public class HFileReaderV2 extends AbstractHFileReader { */ protected static class ScannerV2 extends AbstractHFileReader.Scanner { private HFileBlock block; + private HFileReaderV2 reader; public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); + super(cacheBlocks, pread, isCompaction); + this.reader = r; + } + + @Override + public HFileReaderV2 getReader() { + return reader; } @Override @@ -325,8 +344,12 @@ public class HFileReaderV2 extends AbstractHFileReader { if (!isSeeked()) return null; - return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setMemstoreTS(currMemstoreTS); + } + return ret; } @Override @@ -352,6 +375,8 @@ public class HFileReaderV2 extends AbstractHFileReader { blockBuffer = null; currKeyLen = 0; currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; } /** @@ -367,7 +392,7 @@ public class HFileReaderV2 extends AbstractHFileReader { try { blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE - + currKeyLen + currValueLen); + + currKeyLen + currValueLen + currMemstoreTSLen); } catch (IllegalArgumentException e) { LOG.error("Current pos = " + blockBuffer.position() + "; currKeyLen = " + currKeyLen + "; currValLen = " @@ -560,6 +585,16 @@ public class HFileReaderV2 extends AbstractHFileReader { currKeyLen = blockBuffer.getInt(); currValueLen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; + currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() @@ -587,12 +622,24 @@ public class HFileReaderV2 extends AbstractHFileReader { private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) { int klen, vlen; + long memstoreTS = 0; + int memstoreTSLen = 0; int lastKeyValueSize = -1; do { blockBuffer.mark(); klen = blockBuffer.getInt(); vlen = blockBuffer.getInt(); blockBuffer.reset(); + if (this.reader.shouldIncludeMemstoreTS()) { + try { + int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + klen + vlen; + memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstoreTS. " + e); + } + } int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; @@ -614,6 +661,10 @@ public class HFileReaderV2 extends AbstractHFileReader { } currKeyLen = klen; currValueLen = vlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } return 0; // indicate exact match } @@ -625,7 +676,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } // The size of this key/value tuple, including key/value length fields. 
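
With that flag set, every cell in a v2 data block carries a trailing vlong, and the hunk that follows folds its size into lastKeyValueSize. A sketch of decoding one cell from a heap-backed block buffer (method name is illustrative; the vlong handling mirrors what TestHFileWriterV2 does later in this patch):

    // Per-cell layout when includesMemstoreTS is true:
    //   int keyLen | int valueLen | key bytes | value bytes | vlong memstoreTS
    static long readOneCell(ByteBuffer buf, boolean includesMemstoreTS)
        throws IOException {
      int klen = buf.getInt();
      int vlen = buf.getInt();
      byte[] key = new byte[klen];
      buf.get(key);
      byte[] value = new byte[vlen];
      buf.get(value);
      long memstoreTS = 0;
      if (includesMemstoreTS) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(
            buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()));
        memstoreTS = WritableUtils.readVLong(in);
        // Advance past however many bytes the vlong actually occupied (1 to 9).
        buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
      }
      return memstoreTS; // a real reader wraps key/value/memstoreTS into a KeyValue
    }
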
- lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; blockBuffer.position(blockBuffer.position() + lastKeyValueSize); } while (blockBuffer.remaining() > 0); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 069eedf..fad2079 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -36,9 +36,11 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -46,6 +48,13 @@ import org.apache.hadoop.io.Writable; public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Max memstore (rwcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + /** Inline block writers for multi-level block index and compound Blooms. */ private List inlineBlockWriters = new ArrayList(); @@ -66,6 +75,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { private List additionalLoadOnOpenData = new ArrayList(); + private final boolean includeMemstoreTS = true; + private long maxMemstoreTS = 0; + static class WriterFactoryV2 extends HFile.WriterFactory { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { @@ -297,8 +309,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final KeyValue kv) throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), + append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS()); } /** @@ -313,7 +326,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { */ @Override public void append(final byte[] key, final byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); + append(0, key, 0, key.length, value, 0, value.length); } /** @@ -328,7 +341,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * @param vlength * @throws IOException */ - private void append(final byte[] key, final int koffset, final int klength, + private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, final byte[] value, final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); @@ -341,6 +354,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); // Write length of key and value and then actual key and value bytes. + // Additionally, we may also write down the memstoreTS. 
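
The writer-side counterpart follows in the next hunk; in isolation it is the mirror image of the decode above, with the file-info entries appended at close time telling readers the format is present. A minimal sketch (helper name illustrative):

    static void writeCell(DataOutputStream out, byte[] key, byte[] value,
        long memstoreTS, boolean includeMemstoreTS) throws IOException {
      out.writeInt(key.length);
      out.writeInt(value.length);
      out.write(key);
      out.write(value);
      if (includeMemstoreTS) {
        // vlong: a memstoreTS of 0 costs one byte, larger values up to 9 bytes
        WritableUtils.writeVLong(out, memstoreTS);
      }
    }
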
{ DataOutputStream out = fsBlockWriter.getUserDataStream(); out.writeInt(klength); @@ -349,6 +363,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { totalValueLength += vlength; out.write(key, koffset, klength); out.write(value, voffset, vlength); + if (this.includeMemstoreTS) { + WritableUtils.writeVLong(out, memstoreTS); + } } // Are we the first key in this block? @@ -412,6 +429,11 @@ public class HFileWriterV2 extends AbstractHFileWriter { BlockType.ROOT_INDEX, false), "meta"); fsBlockWriter.writeHeaderAndData(outputStream); + if (this.includeMemstoreTS) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + // File info writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO, false)); @@ -430,6 +452,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { trailer.setComparatorClass(comparator.getClass()); trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + finishClose(trailer); fsBlockWriter.releaseCompressor(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 8e90952..798b54d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -47,12 +47,13 @@ public interface ColumnTracker { * @param offset * @param length * @param ttl The timeToLive to enforce. + * @param ignoreCount -- should we keep count of this KV * @return The match code instance. * @throws IOException in case there is an internal consistency problem * caused by a data corruption. */ public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long ttl) throws IOException; + int length, long ttl, boolean ignoreCount) throws IOException; /** * Updates internal variables in between files diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index e1cfdb9..cc62a7b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -103,10 +103,13 @@ public class ExplicitColumnTracker implements ColumnTracker { * @param offset offset to the start of the qualifier * @param length length of the qualifier * @param timestamp timestamp of the key being checked + * @param ignoreCount indicates if the KV needs to be excluded while counting + * (used during compactions. We only count KV's that are older than all the + * scanners' read points.) * @return MatchCode telling ScanQueryMatcher what action to take */ public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, - int length, long timestamp) { + int length, long timestamp, boolean ignoreCount) { do { // No more columns left, we are done with this query if(this.columns.size() == 0) { @@ -125,6 +128,8 @@ public class ExplicitColumnTracker implements ColumnTracker { // Column Matches. If it is not a duplicate key, increment the version count // and include. 
if(ret == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTS(timestamp)) { //If duplicate, skip this Key diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fcd071a..280dfdf 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -205,6 +205,29 @@ public class HRegion implements HeapSize { // , Writable{ final Path regiondir; KeyValue.KVComparator comparator; + private ConcurrentHashMap scannerReadPoints; + + /* + * @return The smallest rwcc readPoint across all the scanners in this + * region. Writes older than this readPoint, are included in every + * read operation. + */ + public long getSmallestReadPoint() { + long minimumReadPoint; + // We need to ensure that while we are calculating the smallestReadPoint + // no new RegionScanners can grab a readPoint that we are unaware of. + // We achieve this by synchronizing on the scannerReadPoints object. + synchronized(scannerReadPoints) { + minimumReadPoint = rwcc.memstoreReadPoint(); + + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; + } + } + } + return minimumReadPoint; + } /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -291,6 +314,7 @@ public class HRegion implements HeapSize { // , Writable{ this.htableDescriptor = null; this.threadWakeFrequency = 0L; this.coprocessorHost = null; + this.scannerReadPoints = new ConcurrentHashMap(); } /** @@ -334,6 +358,7 @@ public class HRegion implements HeapSize { // , Writable{ String encodedNameStr = this.regionInfo.getEncodedName(); setHTableSpecificConf(); this.regiondir = getRegionDir(this.tableDir, encodedNameStr); + this.scannerReadPoints = new ConcurrentHashMap(); // don't initialize coprocessors if not running within a regionserver // TODO: revisit if coprocessors should load in other cases @@ -399,6 +424,8 @@ public class HRegion implements HeapSize { // , Writable{ // Load in all the HStores. Get maximum seqid. long maxSeqId = -1; + // initialized to -1 so that we pick up MemstoreTS from column families + long maxMemstoreTS = -1; for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + c); Store store = instantiateHStore(this.tableDir, c); @@ -407,7 +434,12 @@ public class HRegion implements HeapSize { // , Writable{ if (storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } + long maxStoreMemstoreTS = store.getMaxMemstoreTS(); + if (maxStoreMemstoreTS > maxMemstoreTS) { + maxMemstoreTS = maxStoreMemstoreTS; + } } + rwcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. 
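
A worked example of the bookkeeping that getSmallestReadPoint() above introduces (illustrative numbers and standalone snippet, not patch code):

    // Suppose all writes up to rwcc point 12 have committed, and two scanners
    // are open, registered at read points 10 and 12:
    long memstoreReadPoint = 12;
    List<Long> scannerReadPoints = Arrays.asList(10L, 12L);
    long smallest = memstoreReadPoint;
    for (long rp : scannerReadPoints) {
      smallest = Math.min(smallest, rp);
    }
    // smallest == 10: a flush or compaction running now may persist any cell
    // with memstoreTS <= 10 as memstoreTS 0 (it is visible to every reader),
    // but must keep 11 and 12 intact so the scanner still at read point 10
    // continues to skip them.
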
maxSeqId = replayRecoveredEditsIfAny( this.regiondir, maxSeqId, reporter, status); @@ -1549,6 +1581,8 @@ public class HRegion implements HeapSize { // , Writable{ this.put(put, lockid, put.getWriteToWAL()); } + + /** * @param put * @param lockid @@ -2082,7 +2116,7 @@ public class HRegion implements HeapSize { // , Writable{ long size = 0; try { w = rwcc.beginMemstoreInsert(); - + for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List edits = e.getValue(); @@ -2093,11 +2127,12 @@ public class HRegion implements HeapSize { // , Writable{ size += store.add(kv); } } - } finally { - rwcc.completeMemstoreInsert(w); - } - return size; - } + } finally { + rwcc.completeMemstoreInsert(w); + } + + return size; + } /** * Check the collection of families for validity. @@ -2644,6 +2679,7 @@ public class HRegion implements HeapSize { // , Writable{ } RegionScannerImpl(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); + this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -2655,8 +2691,13 @@ public class HRegion implements HeapSize { // , Writable{ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - + // synchronize on scannerReadPoints so that nobody calculates + // getSmallestReadPoint, before scannerReadPoints is updated. + synchronized(scannerReadPoints) { + this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + scannerReadPoints.put(this, this.readPt); + } + List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); @@ -2665,7 +2706,8 @@ public class HRegion implements HeapSize { // , Writable{ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + StoreScanner scanner = store.getScanner(scan, entry.getValue()); + scanners.add(scanner); } this.storeHeap = new KeyValueHeap(scanners, comparator); } @@ -2816,6 +2858,8 @@ public class HRegion implements HeapSize { // , Writable{ storeHeap.close(); storeHeap = null; } + // no need to sychronize here. 
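
The register/unregister protocol those RegionScannerImpl hunks add, condensed (the "no need to synchronize" note above refers to the remove() that follows; the rationale comments below are my reading, not the patch's):

    // On open: take a read point and publish it atomically with respect to
    // getSmallestReadPoint(), which synchronizes on the same map.
    synchronized (scannerReadPoints) {
      this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
      scannerReadPoints.put(this, this.readPt);
    }
    // On close: a plain remove is enough; a stale entry can only make
    // getSmallestReadPoint() more conservative, never incorrect.
    scannerReadPoints.remove(this);
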
+ scannerReadPoints.remove(this); this.filterClosed = true; } @@ -3867,7 +3911,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + + 29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT + (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); @@ -3876,7 +3920,7 @@ public class HRegion implements HeapSize { // , Writable{ (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing ClassSize.ATOMIC_LONG + // memStoreSize ClassSize.ATOMIC_INTEGER + // lockIdGenerator - (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds + (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 34263e4..d751f35 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -641,6 +641,10 @@ public class MemStore implements HeapSize { private Iterator kvsetIt; private Iterator snapshotIt; + // The kvset and snapshot at the time of creating this scanner + volatile KeyValueSkipListSet kvsetAtCreation; + volatile KeyValueSkipListSet snapshotAtCreation; + // Sub lists on which we're iterating private SortedSet kvTail; private SortedSet snapshotTail; @@ -671,6 +675,9 @@ public class MemStore implements HeapSize { MemStoreScanner() { super(); + + kvsetAtCreation = kvset; + snapshotAtCreation = snapshot; } protected KeyValue getNext(Iterator it) { @@ -702,8 +709,8 @@ public class MemStore implements HeapSize { // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - kvTail = kvset.tailSet(key); - snapshotTail = snapshot.tailSet(key); + kvTail = kvsetAtCreation.tailSet(key); + snapshotTail = snapshotAtCreation.tailSet(key); return seekInSubLists(key); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java index 8ec53d3..fbd8cf1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -24,6 +24,9 @@ import java.util.LinkedList; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + /** * Manages the read/write consistency within memstore. This provides * an interface for readers to determine what entries to ignore, and @@ -41,7 +44,34 @@ public class ReadWriteConsistencyControl { new LinkedList(); private static final ThreadLocal perThreadReadPoint = - new ThreadLocal(); + new ThreadLocal() { + @Override + protected + Long initialValue() { + return Long.MAX_VALUE; + } + }; + + /** + * Default constructor. Initializes the memstoreRead/Write points to 0. + */ + public ReadWriteConsistencyControl() { + this.memstoreRead = this.memstoreWrite = 0; + } + + /** + * Initializes the memstoreRead/Write points appropriately. 
+ * @param startPoint + */ + public void initialize(long startPoint) { + synchronized (writeQueue) { + if (this.memstoreWrite != this.memstoreRead) { + throw new RuntimeException("Already used this rwcc. Too late to initialize"); + } + + this.memstoreRead = this.memstoreWrite = startPoint; + } + } /** * Get this thread's read point. Used primarily by the memstore scanner to @@ -49,7 +79,7 @@ public class ReadWriteConsistencyControl { * memstore). */ public static long getThreadReadPoint() { - return perThreadReadPoint.get(); + return perThreadReadPoint.get(); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index f86f1fe..ed02421 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -60,6 +60,9 @@ public class ScanQueryMatcher { /** Row the query is on */ protected byte [] row; + /** readPoint over which the KVs are unconditionally included */ + protected long maxReadPointToTrackVersions; + /** * Constructs a ScanQueryMatcher for a Scan. * @param scan @@ -71,6 +74,7 @@ public class ScanQueryMatcher { public ScanQueryMatcher(Scan scan, byte [] family, NavigableSet columns, long ttl, KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions, + long readPointToUse, boolean retainDeletesInOutput) { this.tr = scan.getTimeRange(); this.rowComparator = rowComparator; @@ -79,6 +83,7 @@ public class ScanQueryMatcher { this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null); this.filter = scan.getFilter(); this.retainDeletesInOutput = retainDeletesInOutput; + this.maxReadPointToTrackVersions = readPointToUse; // Single branch to deal with two types of reads (columns vs all in family) if (columns == null || columns.size() == 0) { @@ -98,6 +103,7 @@ public class ScanQueryMatcher { /* By default we will not include deletes */ /* deletes are included explicitly (for minor compaction) */ this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions, + Long.MAX_VALUE, /* max Readpoint to track versions */ false); } public ScanQueryMatcher(Scan scan, byte [] family, @@ -222,7 +228,8 @@ public class ScanQueryMatcher { } } - MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp); + MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp, + kv.getMemstoreTS() > maxReadPointToTrackVersions); /* * According to current implementation, colChecker can only be * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 0914b04..49b4b27 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -66,19 +66,25 @@ public class ScanWildcardColumnTracker implements ColumnTracker { * @param offset * @param length * @param timestamp + * @param ignoreCount indicates if the KV needs to be excluded while counting + * (used during compactions. We only count KV's that are older than all the + * scanners' read points.) * @return The match code instance. 
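
Before the wildcard tracker's implementation below, a note on where maxReadPointToTrackVersions comes from. Assembled from the StoreScanner hunks later in the patch, the read point handed to the matcher differs by caller, and the per-cell flag given to the trackers is derived from it:

    //   user gets/scans            -> Long.MAX_VALUE  (ignoreCount is never set)
    //   memstore flush             -> region.getSmallestReadPoint()
    //   minor / major compaction   -> region.getSmallestReadPoint()
    //
    // Per cell, the matcher asks the tracker to include-without-counting any
    // cell that some open scanner cannot yet see:
    boolean ignoreCount = kv.getMemstoreTS() > maxReadPointToTrackVersions;
    MatchCode colChecker =
        columns.checkColumn(bytes, offset, qualLength, timestamp, ignoreCount);
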
*/ @Override public MatchCode checkColumn(byte[] bytes, int offset, int length, - long timestamp) throws IOException { + long timestamp, boolean ignoreCount) throws IOException { if (columnBuffer == null) { // first iteration. resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(++currentCount, timestamp); } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + //If column matches, check if it is a duplicate timestamp if (sameAsPreviousTS(timestamp)) { return ScanQueryMatcher.MatchCode.SKIP; @@ -92,6 +98,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { if (cmp > 0) { // switched columns, lets do something.x resetBuffer(bytes, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(++currentCount, timestamp); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 7761c42..fdeb0cb 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -227,6 +227,13 @@ public class Store implements HeapSize { } /** + * @return The maximum memstoreTS in all store files. + */ + public long getMaxMemstoreTS() { + return StoreFile.getMaxMemstoreTSInList(this.getStorefiles()); + } + + /** * @param tabledir * @param encodedName Encoded region name. * @param family @@ -386,10 +393,15 @@ public class Store implements HeapSize { ArrayList newFiles = new ArrayList(storefiles); newFiles.add(sf); this.storefiles = sortAndClone(newFiles); - notifyChangedReadersObservers(); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + notifyChangedReadersObservers(); LOG.info("Successfully loaded store file " + srcPath + " into store " + this + " (new location: " + dstPath + ")"); } @@ -471,6 +483,8 @@ public class Store implements HeapSize { throws IOException { StoreFile.Writer writer; String fileName; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = region.getSmallestReadPoint(); long flushed = 0; // Don't flush if there are no entries. if (set.size() == 0) { @@ -483,7 +497,7 @@ public class Store implements HeapSize { // pass true as the StoreScanner's retainDeletesInOutput argument. InternalScanner scanner = new StoreScanner(this, scan, Collections.singletonList(new CollectionBackedScanner(set, - this.comparator)), true); + this.comparator)), this.region.getSmallestReadPoint(), true); try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem @@ -501,6 +515,14 @@ public class Store implements HeapSize { hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to disk. + if (kv.getMemstoreTS() <= smallestReadPoint) { + // let us not change the original KV. It could be in the memstore + // changing its memstoreTS could affect other threads/scanners. 
+ kv = kv.shallowCopy(); + kv.setMemstoreTS(0); + } writer.append(kv); flushed += this.memstore.heapSizeChange(kv, true); } @@ -582,15 +604,21 @@ public class Store implements HeapSize { ArrayList newList = new ArrayList(storefiles); newList.add(sf); storefiles = sortAndClone(newList); - this.memstore.clearSnapshot(set); - // Tell listeners of the change in readers. - notifyChangedReadersObservers(); - - return needsCompaction(); + this.memstore.clearSnapshot(set); } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. this.lock.writeLock().unlock(); } + + // Tell listeners of the change in readers. + notifyChangedReadersObservers(); + + return needsCompaction(); } /* @@ -603,6 +631,33 @@ public class Store implements HeapSize { } } + protected List getScanners(boolean cacheBlocks, + boolean isGet, boolean isCompaction) throws IOException { + List storeFiles; + List memStoreScanners; + this.lock.readLock().lock(); + try { + storeFiles = this.getStorefiles(); + memStoreScanners = this.memstore.getScanners(); + } finally { + this.lock.readLock().unlock(); + } + + // First the store file scanners + + // TODO this used to get the store files in descending order, + // but now we get them in ascending order, which I think is + // actually more correct, since memstore get put at the end. + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction); + List scanners = + new ArrayList(sfScanners.size()+1); + scanners.addAll(sfScanners); + // Then the memstore scanners + scanners.addAll(memStoreScanners); + return scanners; + } + /* * @param o Observer who wants to know about changes in set of Readers */ @@ -1122,18 +1177,21 @@ public class Store implements HeapSize { // For each file, obtain a scanner: List scanners = StoreFileScanner - .getScannersForStoreFiles(filesToCompact, false, false); + .getScannersForStoreFiles(filesToCompact, false, false, true); // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. 
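
The flush hunk above and the compaction hunk that continues below apply the same rewrite; the only difference is whether the KeyValue may be mutated in place. Condensed, with a hypothetical flushingFromMemstore flag standing in for the two call sites:

    if (kv.getMemstoreTS() <= smallestReadPoint) {
      // Already visible to every scanner, so it can be persisted as "always
      // visible". memstoreTS 0 also vlong-encodes to a single byte, which is
      // why TestStoreFile later counts kv.getLength() + 1 per cell.
      if (flushingFromMemstore) {   // flush path only
        kv = kv.shallowCopy();      // the original is still shared with the memstore
      }
      kv.setMemstoreTS(0);
    }
    writer.append(kv);
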
+ long smallestReadPoint = region.getSmallestReadPoint(); + ReadWriteConsistencyControl.setThreadReadPoint(smallestReadPoint); try { InternalScanner scanner = null; try { Scan scan = new Scan(); scan.setMaxVersions(family.getMaxVersions()); /* include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(this, scan, scanners, !majorCompaction); + scanner = new StoreScanner(this, scan, scanners, smallestReadPoint, !majorCompaction); if (region.getCoprocessorHost() != null) { InternalScanner cpScanner = region.getCoprocessorHost().preCompact( this, scanner); @@ -1160,6 +1218,9 @@ public class Store implements HeapSize { if (writer != null) { // output to writer: for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } writer.append(kv); // update progress per key ++progress.currentCompactedKVs; @@ -1262,8 +1323,8 @@ public class Store implements HeapSize { this.family.getBloomFilterType()); result.createReader(); } - this.lock.writeLock().lock(); try { + this.lock.writeLock().lock(); try { // Change this.storefiles so it reflects new state but do not // delete old store files until we have sent out notification of @@ -1279,34 +1340,40 @@ public class Store implements HeapSize { } this.storefiles = sortAndClone(newStoreFiles); + } finally { + // We need the lock, as long as we are updating the storefiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. + this.lock.writeLock().unlock(); + } - // Tell observers that list of StoreFiles has changed. - notifyChangedReadersObservers(); - // Finally, delete old store files. - for (StoreFile hsf: compactedFiles) { - hsf.deleteReader(); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files in " + this.storeNameStr + - ". Compacted file is " + (result == null? "none": result.toString()) + - ". Files replaced " + compactedFiles.toString() + - " some of which may have been already removed", e); + // Tell observers that list of StoreFiles has changed. + notifyChangedReadersObservers(); + // Finally, delete old store files. + for (StoreFile hsf: compactedFiles) { + hsf.deleteReader(); } - // 4. Compute new store size - this.storeSize = 0L; - this.totalUncompressedBytes = 0L; - for (StoreFile hsf : this.storefiles) { - StoreFile.Reader r = hsf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + hsf + " has a null Reader"); - continue; - } - this.storeSize += r.length(); - this.totalUncompressedBytes += r.getTotalUncompressedBytes(); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files in " + this.storeNameStr + + ". Compacted file is " + (result == null? "none": result.toString()) + + ". Files replaced " + compactedFiles.toString() + + " some of which may have been already removed", e); + } + + // 4. 
Compute new store size + this.storeSize = 0L; + this.totalUncompressedBytes = 0L; + for (StoreFile hsf : this.storefiles) { + StoreFile.Reader r = hsf.getReader(); + if (r == null) { + LOG.warn("StoreFile " + hsf + " has a null Reader"); + continue; } - } finally { - this.lock.writeLock().unlock(); + this.storeSize += r.length(); + this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } return result; } @@ -1608,7 +1675,7 @@ public class Store implements HeapSize { * Return a scanner for both the memstore and the HStore files * @throws IOException */ - public KeyValueScanner getScanner(Scan scan, + public StoreScanner getScanner(Scan scan, final NavigableSet targetCols) throws IOException { lock.readLock().lock(); try { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index b21de77..167daea 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; +import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; @@ -141,6 +142,18 @@ public class StoreFile { // Set when we obtain a Reader. private long sequenceid = -1; + // max of the MemstoreTS in the KV's in this store + // Set when we obtain a Reader. + private long maxMemstoreTS = -1; + + public long getMaxMemstoreTS() { + return maxMemstoreTS; + } + + public void setMaxMemstoreTS(long maxMemstoreTS) { + this.maxMemstoreTS = maxMemstoreTS; + } + // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; @@ -315,6 +328,24 @@ public class StoreFile { } /** + * Return the largest memstoreTS found across all storefiles in + * the given list. Store files that were created by a mapreduce + * bulk load are ignored, as they do not correspond to any specific + * put operation, and thus do not have a memstoreTS associated with them. + * @return 0 if no non-bulk-load files are provided or, this is Store that + * does not yet have any store files. + */ + public static long getMaxMemstoreTSInList(Collection sfs) { + long max = 0; + for (StoreFile sf : sfs) { + if (!sf.isBulkLoadResult()) { + max = Math.max(max, sf.getMaxMemstoreTS()); + } + } + return max; + } + + /** * Return the highest sequence ID found across all storefiles in * the given list. Store files that were created by a mapreduce * bulk load are ignored, as they do not correspond to any edit @@ -463,6 +494,11 @@ public class StoreFile { } this.reader.setSequenceID(this.sequenceid); + b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + if (b != null) { + this.maxMemstoreTS = Bytes.toLong(b); + } + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); @@ -993,13 +1029,27 @@ public class StoreFile { * * @param cacheBlocks should this scanner cache blocks? * @param pread use pread (for highly concurrent small readers) + * @param isCompaction is this a call for compaction? 
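
Stepping back to the StoreFile metadata added above: the new MAX_MEMSTORE_TS_KEY entry travels from file metadata back into the region's RWCC at open time (condensed from the StoreFile, Store and HRegion hunks; bulk-loaded files are skipped because their cells never had a memstoreTS):

    // StoreFile.Reader open:
    byte[] b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
    long maxMemstoreTS = (b != null) ? Bytes.toLong(b) : -1;

    // Store takes the max over its files; HRegion open takes the max over its
    // stores, then calls rwcc.initialize(maxMemstoreTS + 1) so every new write
    // gets a strictly larger memstoreTS than anything already on disk.
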
* @return a scanner */ - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { - return new StoreFileScanner(this, getScanner(cacheBlocks, pread)); + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, + boolean pread, + boolean isCompaction) { + return new StoreFileScanner(this, getScanner(cacheBlocks, pread), !isCompaction); } /** + * Get a scanner to scan over this StoreFile. + * + * @param cacheBlocks should this scanner cache blocks? + * @param pread use pread (for highly concurrent small readers) + * @return a scanner + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, + boolean pread) { + return getStoreFileScanner(cacheBlocks, pread, false); + } + /** * Warning: Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 4c0a536..eb9ca40 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -49,6 +49,8 @@ class StoreFileScanner implements KeyValueScanner { private boolean realSeekDone; private boolean delayedReseek; private KeyValue delayedSeekKV; + + private boolean enforceRWCC = false; //The variable, realSeekDone, may cheat on store file scanner for the // multi-column bloom-filter optimization. @@ -61,9 +63,10 @@ class StoreFileScanner implements KeyValueScanner { * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useRWCC) { this.reader = reader; this.hfs = hfs; + this.enforceRWCC = useRWCC; } /** @@ -73,12 +76,13 @@ class StoreFileScanner implements KeyValueScanner { public static List getScannersForStoreFiles( Collection filesToCompact, boolean cacheBlocks, - boolean usePread) throws IOException { + boolean usePread, + boolean isCompaction) throws IOException { List scanners = new ArrayList(filesToCompact.size()); for (StoreFile file : filesToCompact) { StoreFile.Reader r = file.createReader(); - scanners.add(r.getStoreFileScanner(cacheBlocks, usePread)); + scanners.add(r.getStoreFileScanner(cacheBlocks, usePread, isCompaction)); } return scanners; } @@ -93,11 +97,13 @@ class StoreFileScanner implements KeyValueScanner { public KeyValue next() throws IOException { KeyValue retKey = cur; + try { // only seek if we aren't at the end. cur == null implies 'end'. 
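
Before the rest of next(): the useRWCC/enforceRWCC flag threaded through above boils down to one construction-time decision (condensed; the rationale comment is my reading of the compaction path, which sets its own thread read point and leaves visibility to the matcher):

    // User-facing scanners hide cells newer than the caller's read point;
    // compaction scanners must see every cell so newer versions survive into
    // the compacted file.
    return new StoreFileScanner(this, getScanner(cacheBlocks, pread),
        /* useRWCC = */ !isCompaction);
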
if (cur != null) { hfs.next(); cur = hfs.getKeyValue(); + skipKVsNewerThanReadpoint(); } } catch(IOException e) { throw new IOException("Could not iterate " + this, e); @@ -107,6 +113,7 @@ class StoreFileScanner implements KeyValueScanner { public boolean seek(KeyValue key) throws IOException { seekCount.incrementAndGet(); + try { try { if(!seekAtOrAfter(hfs, key)) { @@ -116,7 +123,8 @@ class StoreFileScanner implements KeyValueScanner { this.isReseekable = true; cur = hfs.getKeyValue(); - return true; + + return skipKVsNewerThanReadpoint(); } finally { realSeekDone = true; } @@ -127,6 +135,7 @@ class StoreFileScanner implements KeyValueScanner { public boolean reseek(KeyValue key) throws IOException { seekCount.incrementAndGet(); + try { try { if (!reseekAtOrAfter(hfs, key)) { @@ -134,7 +143,8 @@ class StoreFileScanner implements KeyValueScanner { return false; } cur = hfs.getKeyValue(); - return true; + + return skipKVsNewerThanReadpoint(); } finally { realSeekDone = true; } @@ -143,6 +153,35 @@ class StoreFileScanner implements KeyValueScanner { } } + protected boolean skipKVsNewerThanReadpoint() throws IOException { + long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + + // We want to ignore all key-values that are newer than our current + // readPoint + while(enforceRWCC + && cur != null + && (cur.getMemstoreTS() > readPoint)) { + hfs.next(); + cur = hfs.getKeyValue(); + } + + if (cur == null) { + close(); + return false; + } + + // For the optimisation in HBASE-4346, we set the KV's memstoreTS to + // 0, if it is older than all the scanners' read points. It is possible + // that a newer KV's memstoreTS was reset to 0. But, there is an + // older KV which was not reset to 0 (because it was + // not old enough during flush). Make sure that we set it correctly now, + // so that the comparision order does not change. + if (cur.getMemstoreTS() <= readPoint) { + cur.setMemstoreTS(0); + } + return true; + } + public void close() { // Nothing to close on HFileScanner? cur = null; @@ -288,5 +327,4 @@ class StoreFileScanner implements KeyValueScanner { static final long getSeekCount() { return seekCount.get(); } - } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index f5b5c4c..ad4e419 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -91,6 +91,7 @@ class StoreScanner extends NonLazyKeyValueScanner matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), columns, store.ttl, store.comparator.getRawComparator(), store.minVersions, store.versionsToReturn(scan.getMaxVersions()), + Long.MAX_VALUE, false); // Pass columns to try to filter out unnecessary StoreFiles. @@ -123,13 +124,16 @@ class StoreScanner extends NonLazyKeyValueScanner * @param store who we scan * @param scan the spec * @param scanners ancilliary scanners + * @param smallestReadPoint the readPoint that we should use for tracking versions + * @param retainDeletesInOutput should we retain deletes after compaction? 
*/ StoreScanner(Store store, Scan scan, List scanners, - boolean retainDeletesInOutput) throws IOException { + long smallestReadPoint, boolean retainDeletesInOutput) throws IOException { this(store, false, scan, null); matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), null, store.ttl, store.comparator.getRawComparator(), store.minVersions, - store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput); + store.versionsToReturn(scan.getMaxVersions()), + smallestReadPoint, retainDeletesInOutput); // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { @@ -148,7 +152,8 @@ class StoreScanner extends NonLazyKeyValueScanner throws IOException { this(null, scan.getCacheBlocks(), scan, columns); this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, - comparator.getRawComparator(), 0, scan.getMaxVersions(), false); + comparator.getRawComparator(), 0, scan.getMaxVersions(), + Long.MAX_VALUE, false); // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { @@ -161,18 +166,7 @@ class StoreScanner extends NonLazyKeyValueScanner * @return List of scanners ordered properly. */ private List getScanners() throws IOException { - // First the store file scanners - - // TODO this used to get the store files in descending order, - // but now we get them in ascending order, which I think is - // actually more correct, since memstore get put at the end. - List sfScanners = StoreFileScanner - .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); - List scanners = - new ArrayList(sfScanners.size()+1); - scanners.addAll(sfScanners); - // Then the memstore scanners - scanners.addAll(this.store.memstore.getScanners()); + Listscanners = this.store.getScanners(cacheBlocks, isGet, false); return scanners; } @@ -191,24 +185,27 @@ class StoreScanner extends NonLazyKeyValueScanner memOnly = false; filesOnly = false; } - List scanners = new LinkedList(); - // First the store file scanners - if (memOnly == false) { - List sfScanners = StoreFileScanner - .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); - - // include only those scan files which pass all filters - for (StoreFileScanner sfs : sfScanners) { - if (sfs.shouldSeek(scan, columns)) { - scanners.add(sfs); + List allStoreScanners = + this.store.getScanners(cacheBlocks, isGet, false); + + List scanners = + new ArrayList(allStoreScanners.size()); + + // include only those scan files which pass all filters + for (KeyValueScanner kvs : allStoreScanners) { + if (kvs instanceof StoreFileScanner) { + if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns)) { + scanners.add(kvs); + } + } + else { + // kvs is a MemStoreScanner + if (filesOnly == false && this.store.memstore.shouldSeek(scan)) { + scanners.add(kvs); } } } - - // Then the memstore scanners - if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) { - scanners.addAll(this.store.memstore.getScanners()); - } + return scanners; } diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index 4ac6e09..bb87e36 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -250,6 +250,12 @@ public class TestAcidGuarantees { writers.add(writer); ctx.addThread(writer); } + // Add a flusher + ctx.addThread(new RepeatingTestThread(ctx) { + public void doAnAction() throws Exception { + util.flush(); + } + }); 
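
Returning to the StoreScanner changes above: both scanner-creation paths now go through the new Store.getScanners(...), whose point (as I read it) is that the store-file list and the memstore scanners are captured under the store's read lock in one step; Bloom-filter/shouldSeek filtering stays in StoreScanner. Sketch of the call shape:

    List<KeyValueScanner> scanners =
        this.store.getScanners(cacheBlocks, isGet, /* isCompaction = */ false);
    // store.getScanners() holds store.lock.readLock() while it snapshots
    // this.storefiles and memstore.getScanners(), so a concurrent flush or
    // compaction cannot swap the file list out from under a scanner being built.
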
List getters = Lists.newArrayList(); for (int i = 0; i < numGetters; i++) { @@ -286,7 +292,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2856") public void testGetAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -297,7 +302,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testScanAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -308,7 +312,6 @@ public class TestAcidGuarantees { } @Test - @Ignore("Currently not passing - see HBASE-2670") public void testMixedAtomicity() throws Exception { util.startMiniCluster(1); try { @@ -322,7 +325,7 @@ public class TestAcidGuarantees { Configuration c = HBaseConfiguration.create(); TestAcidGuarantees test = new TestAcidGuarantees(); test.setConf(c); - test.runTestAtomicity(5*60*1000, 5, 2, 2, 3); + test.runTestAtomicity(5000, 50, 2, 2, 3); } private void setConf(Configuration c) { diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index ef16382..9e3a14a 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -192,9 +192,10 @@ public class TestCacheOnWrite { } LOG.info("Block count by type: " + blockCountByType); + String countByType = blockCountByType.toString(); assertEquals( - "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", - blockCountByType.toString()); + "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", + countByType); reader.close(); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 78a7cd6..ed020a3 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.*; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -36,8 +38,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.junit.Before; import org.junit.Test; @@ -115,10 +120,36 @@ public class TestHFileWriterV2 { HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize); + // Comparator class name is stored in the trailer in version 2. + RawComparator comparator = trailer.createComparator(); + HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, + trailer.getNumDataIndexLevels()); + HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( + Bytes.BYTES_RAWCOMPARATOR, 1); + + HFileBlock.BlockIterator blockIter = blockReader.blockRange( + trailer.getLoadOnOpenDataOffset(), + fileSize - trailer.getTrailerSize()); + // Data index. We also read statistics about the block index written after + // the root level. 
+ dataBlockIndexReader.readMultiLevelIndexRoot( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getDataIndexCount()); + + // Meta index. + metaBlockIndexReader.readRootIndex( + blockIter.nextBlockAsStream(BlockType.ROOT_INDEX), + trailer.getMetaIndexCount()); + // File info + FileInfo fileInfo = new FileInfo(); + fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); + boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0); // Counters for the number of key/value pairs and the number of blocks int entriesRead = 0; int blocksRead = 0; + long memstoreTS = 0; // Scan blocks the way the reader would scan them fsdis.seek(0); @@ -137,6 +168,15 @@ public class TestHFileWriterV2 { byte[] value = new byte[valueLen]; buf.get(value); + if (includeMemstoreTS) { + ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(), + buf.arrayOffset() + buf.position(), buf.remaining()); + DataInputStream data_input = new DataInputStream(byte_input); + + memstoreTS = WritableUtils.readVLong(data_input); + buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS)); + } + // A brute-force check to see that all keys and values are correct. assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0); assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java index 09b66ed..a6bfa94 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java @@ -54,7 +54,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase { long timestamp = 0; //"Match" for(byte [] col : scannerColumns){ - result.add(exp.checkColumn(col, 0, col.length, ++timestamp)); + result.add(exp.checkColumn(col, 0, col.length, ++timestamp, false)); } assertEquals(expected.size(), result.size()); @@ -80,9 +80,9 @@ public class TestExplicitColumnTracker extends HBaseTestCase { List expected = new ArrayList(); expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1 expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2 - expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3 + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3 expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4 - expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5 + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5 int maxVersions = 1; //Create "Scanner" @@ -166,13 +166,13 @@ public class TestExplicitColumnTracker extends HBaseTestCase { Long.MAX_VALUE); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + explicit.checkColumn(col, 0, col.length, 1, false); } explicit.update(); for (int i = 1; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length, 1); + explicit.checkColumn(col, 0, col.length, 1, false); } } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index 696a0f0..1fce238 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ 
b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -123,7 +123,7 @@ public class TestFSErrorsExposed { StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf, BloomType.NONE); List scanners = StoreFileScanner.getScannersForStoreFiles( - Collections.singletonList(sf), false, true); + Collections.singletonList(sf), false, true, false); KeyValueScanner scanner = scanners.get(0); FaultyInputStream inStream = fs.inStreams.get(0).get(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java index 135a136..f3543a7 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java @@ -54,7 +54,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { for(byte [] qualifier : qualifiers) { ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, - qualifier.length, 1); + qualifier.length, 1, false); actual.add(mc); } @@ -87,7 +87,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { long timestamp = 0; for(byte [] qualifier : qualifiers) { MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length, - ++timestamp); + ++timestamp, false); actual.add(mc); } @@ -110,7 +110,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase { try { for(byte [] qualifier : qualifiers) { - tracker.checkColumn(qualifier, 0, qualifier.length, 1); + tracker.checkColumn(qualifier, 0, qualifier.length, 1, false); } } catch (Exception e) { ok = true; diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 44d2c9d..0a68029 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -773,7 +773,8 @@ public class TestStoreFile extends HBaseTestCase { for (int i=numKVs;i>0;i--) { KeyValue kv = new KeyValue(b, b, b, i, b); kvs.add(kv); - totalSize += kv.getLength(); + // kv has memstoreTS 0, which takes 1 byte to store. + totalSize += kv.getLength() + 1; } int blockSize = totalSize / numBlocks; StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,