diff --git a/pom.xml b/pom.xml
index bfe8103..fc5e5ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -258,7 +258,7 @@
maven-surefire-plugin
- 2.9
+ 2.8
900
-enableassertions -Xmx1400m
@@ -322,7 +322,7 @@
maven-surefire-report-plugin
- 2.9
+ 2.8
org.apache.avro
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
index 90ef2de..fd109e8 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -290,7 +290,6 @@ public abstract class AbstractHFileReader implements HFile.Reader {
}
protected static abstract class Scanner implements HFileScanner {
- protected HFile.Reader reader;
protected ByteBuffer blockBuffer;
protected boolean cacheBlocks;
@@ -299,30 +298,26 @@ public abstract class AbstractHFileReader implements HFile.Reader {
protected int currKeyLen;
protected int currValueLen;
+ protected int currMemstoreTSLen;
+ protected long currMemstoreTS;
protected int blockFetches;
- public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+ public Scanner(final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
- this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
}
@Override
- public Reader getReader() {
- return reader;
- }
-
- @Override
public boolean isSeeked(){
return blockBuffer != null;
}
@Override
public String toString() {
- return "HFileScanner for reader " + String.valueOf(reader);
+ return "HFileScanner for reader " + String.valueOf(getReader());
}
protected void assertSeeked() {
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
index 2bb93fa..d9067af 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
@@ -374,13 +374,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
* Implementation of {@link HFileScanner} interface.
*/
protected static class ScannerV1 extends AbstractHFileReader.Scanner {
- private final HFileReaderV1 readerV1;
+ private final HFileReaderV1 reader;
private int currBlock;
public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
- super(reader, cacheBlocks, pread, isCompaction);
- readerV1 = reader;
+ super(cacheBlocks, pread, isCompaction);
+ this.reader = reader;
}
@Override
@@ -447,7 +447,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
blockBuffer = null;
return false;
}
- blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+ blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
@@ -467,7 +467,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
@Override
public int seekTo(byte[] key, int offset, int length) throws IOException {
- int b = readerV1.blockContainingKey(key, offset, length);
+ int b = reader.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b, true);
@@ -493,7 +493,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
}
}
- int b = readerV1.blockContainingKey(key, offset, length);
+ int b = reader.blockContainingKey(key, offset, length);
if (b < 0) {
return -1;
}
@@ -560,7 +560,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
@Override
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
- int b = readerV1.blockContainingKey(key, offset, length);
+ int b = reader.blockContainingKey(key, offset, length);
if (b < 0)
return false; // key is before the start of the file.
@@ -612,7 +612,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
return true;
}
currBlock = 0;
- blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+ blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
@@ -622,13 +622,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
private void loadBlock(int bloc, boolean rewind) throws IOException {
if (blockBuffer == null) {
- blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+ blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;
} else {
if (bloc != currBlock) {
- blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+ blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 6db9abc..e39fbee 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -19,7 +19,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.io.WritableUtils;
/**
* {@link HFile} reader for version 2.
@@ -45,7 +48,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
* The size of a (key length, value length) tuple that prefixes each entry in
* a data block.
*/
- private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+ private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+ private boolean includesMemstoreTS = false;
+
+ private boolean shouldIncludeMemstoreTS() {
+ return includesMemstoreTS;
+ }
/**
* A "sparse lock" implementation allowing to lock on a particular block
@@ -114,6 +123,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
lastKey = fileInfo.get(FileInfo.LASTKEY);
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+ byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+ includesMemstoreTS = (keyValueFormatVersion != null &&
+ Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
// Store all other load-on-open blocks for further consumption.
HFileBlock b;
@@ -314,10 +326,17 @@ public class HFileReaderV2 extends AbstractHFileReader {
*/
protected static class ScannerV2 extends AbstractHFileReader.Scanner {
private HFileBlock block;
+ private HFileReaderV2 reader;
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
- super(r, cacheBlocks, pread, isCompaction);
+ super(cacheBlocks, pread, isCompaction);
+ this.reader = r;
+ }
+
+ @Override
+ public HFileReaderV2 getReader() {
+ return reader;
}
@Override
@@ -325,8 +344,12 @@ public class HFileReaderV2 extends AbstractHFileReader {
if (!isSeeked())
return null;
- return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position());
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ ret.setMemstoreTS(currMemstoreTS);
+ }
+ return ret;
}
@Override
@@ -352,6 +375,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
blockBuffer = null;
currKeyLen = 0;
currValueLen = 0;
+ currMemstoreTS = 0;
+ currMemstoreTSLen = 0;
}
/**
@@ -367,7 +392,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
try {
blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
- + currKeyLen + currValueLen);
+ + currKeyLen + currValueLen + currMemstoreTSLen);
} catch (IllegalArgumentException e) {
LOG.error("Current pos = " + blockBuffer.position()
+ "; currKeyLen = " + currKeyLen + "; currValLen = "
@@ -560,6 +585,16 @@ public class HFileReaderV2 extends AbstractHFileReader {
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
blockBuffer.reset();
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ try {
+ int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
+ currMemstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+ currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading memstoreTS. " + e);
+ }
+ }
if (currKeyLen < 0 || currValueLen < 0
|| currKeyLen > blockBuffer.limit()
@@ -587,12 +622,24 @@ public class HFileReaderV2 extends AbstractHFileReader {
private int blockSeek(byte[] key, int offset, int length,
boolean seekBefore) {
int klen, vlen;
+ long memstoreTS = 0;
+ int memstoreTSLen = 0;
int lastKeyValueSize = -1;
do {
blockBuffer.mark();
klen = blockBuffer.getInt();
vlen = blockBuffer.getInt();
blockBuffer.reset();
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ try {
+ int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE + klen + vlen;
+ memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+ memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading memstoreTS. " + e);
+ }
+ }
int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE;
@@ -614,6 +661,10 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
currKeyLen = klen;
currValueLen = vlen;
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ currMemstoreTS = memstoreTS;
+ currMemstoreTSLen = memstoreTSLen;
+ }
return 0; // indicate exact match
}
@@ -625,7 +676,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
// The size of this key/value tuple, including key/value length fields.
- lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
+ lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
} while (blockBuffer.remaining() > 0);
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index 069eedf..fad2079 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -36,9 +36,11 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
/**
* Writes HFile format version 2.
@@ -46,6 +48,13 @@ import org.apache.hadoop.io.Writable;
public class HFileWriterV2 extends AbstractHFileWriter {
static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
+ /** Max memstore (rwcc) timestamp in FileInfo */
+ public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+ /** KeyValue version in FileInfo */
+ public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+ /** Version for KeyValue which includes memstore timestamp */
+ public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
+
/** Inline block writers for multi-level block index and compound Blooms. */
private List inlineBlockWriters =
new ArrayList();
@@ -66,6 +75,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
private List additionalLoadOnOpenData =
new ArrayList();
+ private final boolean includeMemstoreTS = true;
+ private long maxMemstoreTS = 0;
+
static class WriterFactoryV2 extends HFile.WriterFactory {
WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
@@ -297,8 +309,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final KeyValue kv) throws IOException {
- append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+ append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+ this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS());
}
/**
@@ -313,7 +326,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final byte[] key, final byte[] value) throws IOException {
- append(key, 0, key.length, value, 0, value.length);
+ append(0, key, 0, key.length, value, 0, value.length);
}
/**
@@ -328,7 +341,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
* @param vlength
* @throws IOException
*/
- private void append(final byte[] key, final int koffset, final int klength,
+ private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
@@ -341,6 +354,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
newBlock();
// Write length of key and value and then actual key and value bytes.
+ // Additionally, we may also write the memstoreTS (encoded as a vlong).
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
@@ -349,6 +363,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
+ if (this.includeMemstoreTS) {
+ WritableUtils.writeVLong(out, memstoreTS);
+ }
}
// Are we the first key in this block?
@@ -412,6 +429,11 @@ public class HFileWriterV2 extends AbstractHFileWriter {
BlockType.ROOT_INDEX, false), "meta");
fsBlockWriter.writeHeaderAndData(outputStream);
+ if (this.includeMemstoreTS) {
+ appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
+ appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
+ }
+
// File info
writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
false));
@@ -430,6 +452,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
trailer.setComparatorClass(comparator.getClass());
trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
+
finishClose(trailer);
fsBlockWriter.releaseCompressor();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
index 8e90952..798b54d 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
@@ -47,12 +47,13 @@ public interface ColumnTracker {
* @param offset
* @param length
* @param ttl The timeToLive to enforce.
+ * @param ignoreCount indicates if the KV should be excluded from version counting
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem
* caused by a data corruption.
*/
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
- int length, long ttl) throws IOException;
+ int length, long ttl, boolean ignoreCount) throws IOException;
/**
* Updates internal variables in between files
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index e1cfdb9..cc62a7b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -103,10 +103,13 @@ public class ExplicitColumnTracker implements ColumnTracker {
* @param offset offset to the start of the qualifier
* @param length length of the qualifier
* @param timestamp timestamp of the key being checked
+ * @param ignoreCount indicates if the KV needs to be excluded while counting
+ * versions (used during compactions; we only count KVs that are older than
+ * all the scanners' read points)
* @return MatchCode telling ScanQueryMatcher what action to take
*/
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
- int length, long timestamp) {
+ int length, long timestamp, boolean ignoreCount) {
do {
// No more columns left, we are done with this query
if(this.columns.size() == 0) {
@@ -125,6 +128,8 @@ public class ExplicitColumnTracker implements ColumnTracker {
// Column Matches. If it is not a duplicate key, increment the version count
// and include.
if(ret == 0) {
+ if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
//If duplicate, skip this Key
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index fcd071a..280dfdf 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -205,6 +205,29 @@ public class HRegion implements HeapSize { // , Writable{
final Path regiondir;
KeyValue.KVComparator comparator;
+ private ConcurrentHashMap scannerReadPoints;
+
+ /*
+ * @return The smallest rwcc readPoint across all the scanners in this
+ * region. Writes older than this readPoint are included in every
+ * read operation.
+ */
+ public long getSmallestReadPoint() {
+ long minimumReadPoint;
+ // We need to ensure that while we are calculating the smallestReadPoint,
+ // no new RegionScanners can grab a readPoint that we are unaware of.
+ // We achieve this by synchronizing on the scannerReadPoints object.
+ synchronized(scannerReadPoints) {
+ minimumReadPoint = rwcc.memstoreReadPoint();
+
+ for (Long readPoint: this.scannerReadPoints.values()) {
+ if (readPoint < minimumReadPoint) {
+ minimumReadPoint = readPoint;
+ }
+ }
+ }
+ return minimumReadPoint;
+ }
/*
* Data structure of write state flags used coordinating flushes,
* compactions and closes.
@@ -291,6 +314,7 @@ public class HRegion implements HeapSize { // , Writable{
this.htableDescriptor = null;
this.threadWakeFrequency = 0L;
this.coprocessorHost = null;
+ this.scannerReadPoints = new ConcurrentHashMap();
}
/**
@@ -334,6 +358,7 @@ public class HRegion implements HeapSize { // , Writable{
String encodedNameStr = this.regionInfo.getEncodedName();
setHTableSpecificConf();
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
+ this.scannerReadPoints = new ConcurrentHashMap();
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
@@ -399,6 +424,8 @@ public class HRegion implements HeapSize { // , Writable{
// Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
+ // initialized to -1 so that we pick up MemstoreTS from column families
+ long maxMemstoreTS = -1;
for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
status.setStatus("Instantiating store for column family " + c);
Store store = instantiateHStore(this.tableDir, c);
@@ -407,7 +434,12 @@ public class HRegion implements HeapSize { // , Writable{
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
+ long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+ if (maxStoreMemstoreTS > maxMemstoreTS) {
+ maxMemstoreTS = maxStoreMemstoreTS;
+ }
}
+ rwcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = replayRecoveredEditsIfAny(
this.regiondir, maxSeqId, reporter, status);
@@ -1549,6 +1581,8 @@ public class HRegion implements HeapSize { // , Writable{
this.put(put, lockid, put.getWriteToWAL());
}
+
+
/**
* @param put
* @param lockid
@@ -2082,7 +2116,7 @@ public class HRegion implements HeapSize { // , Writable{
long size = 0;
try {
w = rwcc.beginMemstoreInsert();
-
+
for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List edits = e.getValue();
@@ -2093,11 +2127,12 @@ public class HRegion implements HeapSize { // , Writable{
size += store.add(kv);
}
}
- } finally {
- rwcc.completeMemstoreInsert(w);
- }
- return size;
- }
+ } finally {
+ rwcc.completeMemstoreInsert(w);
+ }
+
+ return size;
+ }
/**
* Check the collection of families for validity.
@@ -2644,6 +2679,7 @@ public class HRegion implements HeapSize { // , Writable{
}
RegionScannerImpl(Scan scan, List additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.");
+
this.filter = scan.getFilter();
this.batch = scan.getBatch();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@@ -2655,8 +2691,13 @@ public class HRegion implements HeapSize { // , Writable{
// it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0;
- this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-
+ // Synchronize on scannerReadPoints so that nobody calculates
+ // getSmallestReadPoint before scannerReadPoints is updated.
+ synchronized(scannerReadPoints) {
+ this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ scannerReadPoints.put(this, this.readPt);
+ }
+
List scanners = new ArrayList();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
@@ -2665,7 +2706,8 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- scanners.add(store.getScanner(scan, entry.getValue()));
+ StoreScanner scanner = store.getScanner(scan, entry.getValue());
+ scanners.add(scanner);
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
}
@@ -2816,6 +2858,8 @@ public class HRegion implements HeapSize { // , Writable{
storeHeap.close();
storeHeap = null;
}
+ // No need to synchronize here; the ConcurrentHashMap handles concurrent removal.
+ scannerReadPoints.remove(this);
this.filterClosed = true;
}
@@ -3867,7 +3911,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+ 29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
(4 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
@@ -3876,7 +3920,7 @@ public class HRegion implements HeapSize { // , Writable{
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
ClassSize.ATOMIC_LONG + // memStoreSize
ClassSize.ATOMIC_INTEGER + // lockIdGenerator
- (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds
+ (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 34263e4..d751f35 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -641,6 +641,10 @@ public class MemStore implements HeapSize {
private Iterator kvsetIt;
private Iterator snapshotIt;
+ // The kvset and snapshot at the time of creating this scanner
+ volatile KeyValueSkipListSet kvsetAtCreation;
+ volatile KeyValueSkipListSet snapshotAtCreation;
+
// Sub lists on which we're iterating
private SortedSet kvTail;
private SortedSet snapshotTail;
@@ -671,6 +675,9 @@ public class MemStore implements HeapSize {
MemStoreScanner() {
super();
+
+ kvsetAtCreation = kvset;
+ snapshotAtCreation = snapshot;
}
protected KeyValue getNext(Iterator it) {
@@ -702,8 +709,8 @@ public class MemStore implements HeapSize {
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
- kvTail = kvset.tailSet(key);
- snapshotTail = snapshot.tailSet(key);
+ kvTail = kvsetAtCreation.tailSet(key);
+ snapshotTail = snapshotAtCreation.tailSet(key);
return seekInSubLists(key);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
index 8ec53d3..fbd8cf1 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
@@ -24,6 +24,9 @@ import java.util.LinkedList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
/**
* Manages the read/write consistency within memstore. This provides
* an interface for readers to determine what entries to ignore, and
@@ -41,7 +44,34 @@ public class ReadWriteConsistencyControl {
new LinkedList();
private static final ThreadLocal perThreadReadPoint =
- new ThreadLocal();
+ new ThreadLocal() {
+ @Override
+ protected Long initialValue() {
+ return Long.MAX_VALUE;
+ }
+ };
+
+ /**
+ * Default constructor. Initializes the memstoreRead/Write points to 0.
+ */
+ public ReadWriteConsistencyControl() {
+ this.memstoreRead = this.memstoreWrite = 0;
+ }
+
+ /**
+ * Initializes the memstoreRead/Write points appropriately.
+ * @param startPoint
+ */
+ public void initialize(long startPoint) {
+ synchronized (writeQueue) {
+ if (this.memstoreWrite != this.memstoreRead) {
+ throw new RuntimeException("Already used this rwcc. Too late to initialize");
+ }
+
+ this.memstoreRead = this.memstoreWrite = startPoint;
+ }
+ }
/**
* Get this thread's read point. Used primarily by the memstore scanner to
@@ -49,7 +79,7 @@ public class ReadWriteConsistencyControl {
* memstore).
*/
public static long getThreadReadPoint() {
- return perThreadReadPoint.get();
+ return perThreadReadPoint.get();
}
/**
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index f86f1fe..ed02421 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -60,6 +60,9 @@ public class ScanQueryMatcher {
/** Row the query is on */
protected byte [] row;
+ /** readPoint above which KVs are included unconditionally (not counted toward versions) */
+ protected long maxReadPointToTrackVersions;
+
/**
* Constructs a ScanQueryMatcher for a Scan.
* @param scan
@@ -71,6 +74,7 @@ public class ScanQueryMatcher {
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet columns, long ttl,
KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
+ long readPointToUse,
boolean retainDeletesInOutput) {
this.tr = scan.getTimeRange();
this.rowComparator = rowComparator;
@@ -79,6 +83,7 @@ public class ScanQueryMatcher {
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null);
this.filter = scan.getFilter();
this.retainDeletesInOutput = retainDeletesInOutput;
+ this.maxReadPointToTrackVersions = readPointToUse;
// Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) {
@@ -98,6 +103,7 @@ public class ScanQueryMatcher {
/* By default we will not include deletes */
/* deletes are included explicitly (for minor compaction) */
this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
+ Long.MAX_VALUE, /* max Readpoint to track versions */
false);
}
public ScanQueryMatcher(Scan scan, byte [] family,
@@ -222,7 +228,8 @@ public class ScanQueryMatcher {
}
}
- MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp);
+ MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp,
+ kv.getMemstoreTS() > maxReadPointToTrackVersions);
/*
* According to current implementation, colChecker can only be
* SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
index 0914b04..49b4b27 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
@@ -66,19 +66,25 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
* @param offset
* @param length
* @param timestamp
+ * @param ignoreCount indicates if the KV needs to be excluded while counting
+ * versions (used during compactions; we only count KVs that are older than
+ * all the scanners' read points)
* @return The match code instance.
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length,
- long timestamp) throws IOException {
+ long timestamp, boolean ignoreCount) throws IOException {
if (columnBuffer == null) {
// first iteration.
resetBuffer(bytes, offset, length);
+ if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
return checkVersion(++currentCount, timestamp);
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
+ if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+
//If column matches, check if it is a duplicate timestamp
if (sameAsPreviousTS(timestamp)) {
return ScanQueryMatcher.MatchCode.SKIP;
@@ -92,6 +98,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
if (cmp > 0) {
// switched columns, let's do something.
resetBuffer(bytes, offset, length);
+ if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
return checkVersion(++currentCount, timestamp);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7761c42..fdeb0cb 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -227,6 +227,13 @@ public class Store implements HeapSize {
}
/**
+ * @return The maximum memstoreTS in all store files.
+ */
+ public long getMaxMemstoreTS() {
+ return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
+ }
+
+ /**
* @param tabledir
* @param encodedName Encoded region name.
* @param family
@@ -386,10 +393,15 @@ public class Store implements HeapSize {
ArrayList newFiles = new ArrayList(storefiles);
newFiles.add(sf);
this.storefiles = sortAndClone(newFiles);
- notifyChangedReadersObservers();
} finally {
+ // We need the lock as long as we are updating the storefiles
+ // or changing the memstore, but we must release it before calling
+ // notifyChangedReadersObservers. See HBASE-4485 for a possible
+ // deadlock scenario that could have happened had we continued to
+ // hold the lock.
this.lock.writeLock().unlock();
}
+ notifyChangedReadersObservers();
LOG.info("Successfully loaded store file " + srcPath
+ " into store " + this + " (new location: " + dstPath + ")");
}
@@ -471,6 +483,8 @@ public class Store implements HeapSize {
throws IOException {
StoreFile.Writer writer;
String fileName;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = region.getSmallestReadPoint();
long flushed = 0;
// Don't flush if there are no entries.
if (set.size() == 0) {
@@ -483,7 +497,7 @@ public class Store implements HeapSize {
// pass true as the StoreScanner's retainDeletesInOutput argument.
InternalScanner scanner = new StoreScanner(this, scan,
Collections.singletonList(new CollectionBackedScanner(set,
- this.comparator)), true);
+ this.comparator)), this.region.getSmallestReadPoint(), true);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@@ -501,6 +515,14 @@ public class Store implements HeapSize {
hasMore = scanner.next(kvs);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
+ // If we know that this KV is always going to be included, set its
+ // memstoreTS to 0. This saves space when writing to disk.
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ // Do not change the original KV: it could still be in the memstore, and
+ // changing its memstoreTS could affect other threads/scanners.
+ kv = kv.shallowCopy();
+ kv.setMemstoreTS(0);
+ }
writer.append(kv);
flushed += this.memstore.heapSizeChange(kv, true);
}
@@ -582,15 +604,21 @@ public class Store implements HeapSize {
ArrayList newList = new ArrayList(storefiles);
newList.add(sf);
storefiles = sortAndClone(newList);
- this.memstore.clearSnapshot(set);
- // Tell listeners of the change in readers.
- notifyChangedReadersObservers();
-
- return needsCompaction();
+ this.memstore.clearSnapshot(set);
} finally {
+ // We need the lock as long as we are updating the storefiles
+ // or changing the memstore, but we must release it before calling
+ // notifyChangedReadersObservers. See HBASE-4485 for a possible
+ // deadlock scenario that could have happened had we continued to
+ // hold the lock.
this.lock.writeLock().unlock();
}
+
+ // Tell listeners of the change in readers.
+ notifyChangedReadersObservers();
+
+ return needsCompaction();
}
/*
@@ -603,6 +631,33 @@ public class Store implements HeapSize {
}
}
+ protected List getScanners(boolean cacheBlocks,
+ boolean isGet, boolean isCompaction) throws IOException {
+ List storeFiles;
+ List memStoreScanners;
+ this.lock.readLock().lock();
+ try {
+ storeFiles = this.getStorefiles();
+ memStoreScanners = this.memstore.getScanners();
+ } finally {
+ this.lock.readLock().unlock();
+ }
+
+ // First the store file scanners
+
+ // TODO this used to get the store files in descending order,
+ // but now we get them in ascending order, which I think is
+ // actually more correct, since the memstore gets put at the end.
+ List sfScanners = StoreFileScanner
+ .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction);
+ List scanners =
+ new ArrayList(sfScanners.size()+1);
+ scanners.addAll(sfScanners);
+ // Then the memstore scanners
+ scanners.addAll(memStoreScanners);
+ return scanners;
+ }
+
/*
* @param o Observer who wants to know about changes in set of Readers
*/
@@ -1122,18 +1177,21 @@ public class Store implements HeapSize {
// For each file, obtain a scanner:
List scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false);
+ .getScannersForStoreFiles(filesToCompact, false, false, true);
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = region.getSmallestReadPoint();
+ ReadWriteConsistencyControl.setThreadReadPoint(smallestReadPoint);
try {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
+ scanner = new StoreScanner(this, scan, scanners, smallestReadPoint, !majorCompaction);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
@@ -1160,6 +1218,9 @@ public class Store implements HeapSize {
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
@@ -1262,8 +1323,8 @@ public class Store implements HeapSize {
this.family.getBloomFilterType());
result.createReader();
}
- this.lock.writeLock().lock();
try {
+ this.lock.writeLock().lock();
try {
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
@@ -1279,34 +1340,40 @@ public class Store implements HeapSize {
}
this.storefiles = sortAndClone(newStoreFiles);
+ } finally {
+ // We need the lock as long as we are updating the storefiles
+ // or changing the memstore, but we must release it before calling
+ // notifyChangedReadersObservers. See HBASE-4485 for a possible
+ // deadlock scenario that could have happened had we continued to
+ // hold the lock.
+ this.lock.writeLock().unlock();
+ }
- // Tell observers that list of StoreFiles has changed.
- notifyChangedReadersObservers();
- // Finally, delete old store files.
- for (StoreFile hsf: compactedFiles) {
- hsf.deleteReader();
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this.storeNameStr +
- ". Compacted file is " + (result == null? "none": result.toString()) +
- ". Files replaced " + compactedFiles.toString() +
- " some of which may have been already removed", e);
+ // Tell observers that list of StoreFiles has changed.
+ notifyChangedReadersObservers();
+ // Finally, delete old store files.
+ for (StoreFile hsf: compactedFiles) {
+ hsf.deleteReader();
}
- // 4. Compute new store size
- this.storeSize = 0L;
- this.totalUncompressedBytes = 0L;
- for (StoreFile hsf : this.storefiles) {
- StoreFile.Reader r = hsf.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + hsf + " has a null Reader");
- continue;
- }
- this.storeSize += r.length();
- this.totalUncompressedBytes += r.getTotalUncompressedBytes();
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ ". Compacted file is " + (result == null? "none": result.toString()) +
+ ". Files replaced " + compactedFiles.toString() +
+ " some of which may have been already removed", e);
+ }
+
+ // 4. Compute new store size
+ this.storeSize = 0L;
+ this.totalUncompressedBytes = 0L;
+ for (StoreFile hsf : this.storefiles) {
+ StoreFile.Reader r = hsf.getReader();
+ if (r == null) {
+ LOG.warn("StoreFile " + hsf + " has a null Reader");
+ continue;
}
- } finally {
- this.lock.writeLock().unlock();
+ this.storeSize += r.length();
+ this.totalUncompressedBytes += r.getTotalUncompressedBytes();
}
return result;
}
@@ -1608,7 +1675,7 @@ public class Store implements HeapSize {
* Return a scanner for both the memstore and the HStore files
* @throws IOException
*/
- public KeyValueScanner getScanner(Scan scan,
+ public StoreScanner getScanner(Scan scan,
final NavigableSet targetCols) throws IOException {
lock.readLock().lock();
try {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index b21de77..167daea 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -141,6 +142,18 @@ public class StoreFile {
// Set when we obtain a Reader.
private long sequenceid = -1;
+ // max of the MemstoreTS in the KV's in this store
+ // Set when we obtain a Reader.
+ private long maxMemstoreTS = -1;
+
+ public long getMaxMemstoreTS() {
+ return maxMemstoreTS;
+ }
+
+ public void setMaxMemstoreTS(long maxMemstoreTS) {
+ this.maxMemstoreTS = maxMemstoreTS;
+ }
+
// If true, this file was product of a major compaction. Its then set
// whenever you get a Reader.
private AtomicBoolean majorCompaction = null;
@@ -315,6 +328,24 @@ public class StoreFile {
}
/**
+ * Return the largest memstoreTS found across all storefiles in
+ * the given list. Store files that were created by a mapreduce
+ * bulk load are ignored, as they do not correspond to any specific
+ * put operation, and thus do not have a memstoreTS associated with them.
+ * @return 0 if no non-bulk-load files are provided, or this is a Store that
+ * does not yet have any store files.
+ */
+ public static long getMaxMemstoreTSInList(Collection sfs) {
+ long max = 0;
+ for (StoreFile sf : sfs) {
+ if (!sf.isBulkLoadResult()) {
+ max = Math.max(max, sf.getMaxMemstoreTS());
+ }
+ }
+ return max;
+ }
+
+ /**
* Return the highest sequence ID found across all storefiles in
* the given list. Store files that were created by a mapreduce
* bulk load are ignored, as they do not correspond to any edit
@@ -463,6 +494,11 @@ public class StoreFile {
}
this.reader.setSequenceID(this.sequenceid);
+ b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+ if (b != null) {
+ this.maxMemstoreTS = Bytes.toLong(b);
+ }
+
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
@@ -993,13 +1029,27 @@ public class StoreFile {
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
+ * @param isCompaction is this a call for compaction?
* @return a scanner
*/
- public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
- return new StoreFileScanner(this, getScanner(cacheBlocks, pread));
+ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
+ boolean pread,
+ boolean isCompaction) {
+ return new StoreFileScanner(this, getScanner(cacheBlocks, pread), !isCompaction);
}
/**
+ * Get a scanner to scan over this StoreFile.
+ *
+ * @param cacheBlocks should this scanner cache blocks?
+ * @param pread use pread (for highly concurrent small readers)
+ * @return a scanner
+ */
+ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
+ boolean pread) {
+ return getStoreFileScanner(cacheBlocks, pread, false);
+ }
+ /**
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 4c0a536..eb9ca40 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -49,6 +49,8 @@ class StoreFileScanner implements KeyValueScanner {
private boolean realSeekDone;
private boolean delayedReseek;
private KeyValue delayedSeekKV;
+
+ private boolean enforceRWCC = false;
//The variable, realSeekDone, may cheat on store file scanner for the
// multi-column bloom-filter optimization.
@@ -61,9 +63,10 @@ class StoreFileScanner implements KeyValueScanner {
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner
*/
- public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) {
+ public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useRWCC) {
this.reader = reader;
this.hfs = hfs;
+ this.enforceRWCC = useRWCC;
}
/**
@@ -73,12 +76,13 @@ class StoreFileScanner implements KeyValueScanner {
public static List getScannersForStoreFiles(
Collection filesToCompact,
boolean cacheBlocks,
- boolean usePread) throws IOException {
+ boolean usePread,
+ boolean isCompaction) throws IOException {
List scanners =
new ArrayList(filesToCompact.size());
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.createReader();
- scanners.add(r.getStoreFileScanner(cacheBlocks, usePread));
+ scanners.add(r.getStoreFileScanner(cacheBlocks, usePread, isCompaction));
}
return scanners;
}
@@ -93,11 +97,13 @@ class StoreFileScanner implements KeyValueScanner {
public KeyValue next() throws IOException {
KeyValue retKey = cur;
+
try {
// only seek if we aren't at the end. cur == null implies 'end'.
if (cur != null) {
hfs.next();
cur = hfs.getKeyValue();
+ skipKVsNewerThanReadpoint();
}
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
@@ -107,6 +113,7 @@ class StoreFileScanner implements KeyValueScanner {
public boolean seek(KeyValue key) throws IOException {
seekCount.incrementAndGet();
+
try {
try {
if(!seekAtOrAfter(hfs, key)) {
@@ -116,7 +123,8 @@ class StoreFileScanner implements KeyValueScanner {
this.isReseekable = true;
cur = hfs.getKeyValue();
- return true;
+
+ return skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
@@ -127,6 +135,7 @@ class StoreFileScanner implements KeyValueScanner {
public boolean reseek(KeyValue key) throws IOException {
seekCount.incrementAndGet();
+
try {
try {
if (!reseekAtOrAfter(hfs, key)) {
@@ -134,7 +143,8 @@ class StoreFileScanner implements KeyValueScanner {
return false;
}
cur = hfs.getKeyValue();
- return true;
+
+ return skipKVsNewerThanReadpoint();
} finally {
realSeekDone = true;
}
@@ -143,6 +153,35 @@ class StoreFileScanner implements KeyValueScanner {
}
}
+ protected boolean skipKVsNewerThanReadpoint() throws IOException {
+ long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+
+ // We want to ignore all key-values that are newer than our current
+ // readPoint
+ while(enforceRWCC
+ && cur != null
+ && (cur.getMemstoreTS() > readPoint)) {
+ hfs.next();
+ cur = hfs.getKeyValue();
+ }
+
+ if (cur == null) {
+ close();
+ return false;
+ }
+
+ // For the optimisation in HBASE-4346, we set a KV's memstoreTS to 0
+ // once it is older than all the scanners' read points. It is possible
+ // that a newer KV had its memstoreTS reset to 0 while an older KV was
+ // not (because it was not old enough at flush time). Make sure we set
+ // it correctly now, so that the comparison order does not change.
+ if (cur.getMemstoreTS() <= readPoint) {
+ cur.setMemstoreTS(0);
+ }
+ return true;
+ }
+
public void close() {
// Nothing to close on HFileScanner?
cur = null;
@@ -288,5 +327,4 @@ class StoreFileScanner implements KeyValueScanner {
static final long getSeekCount() {
return seekCount.get();
}
-
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index f5b5c4c..ad4e419 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -91,6 +91,7 @@ class StoreScanner extends NonLazyKeyValueScanner
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(),
store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
+ Long.MAX_VALUE,
false);
// Pass columns to try to filter out unnecessary StoreFiles.
@@ -123,13 +124,16 @@ class StoreScanner extends NonLazyKeyValueScanner
* @param store who we scan
* @param scan the spec
* @param scanners ancilliary scanners
+ * @param smallestReadPoint the readPoint that we should use for tracking versions
+ * @param retainDeletesInOutput should we retain deletes after compaction?
*/
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
- boolean retainDeletesInOutput) throws IOException {
+ long smallestReadPoint, boolean retainDeletesInOutput) throws IOException {
this(store, false, scan, null);
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
- store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
+ store.versionsToReturn(scan.getMaxVersions()),
+ smallestReadPoint, retainDeletesInOutput);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -148,7 +152,8 @@ class StoreScanner extends NonLazyKeyValueScanner
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns);
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
- comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
+ comparator.getRawComparator(), 0, scan.getMaxVersions(),
+ Long.MAX_VALUE, false);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -161,18 +166,7 @@ class StoreScanner extends NonLazyKeyValueScanner
* @return List of scanners ordered properly.
*/
private List getScanners() throws IOException {
- // First the store file scanners
-
- // TODO this used to get the store files in descending order,
- // but now we get them in ascending order, which I think is
- // actually more correct, since memstore get put at the end.
- List sfScanners = StoreFileScanner
- .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
- List scanners =
- new ArrayList(sfScanners.size()+1);
- scanners.addAll(sfScanners);
- // Then the memstore scanners
- scanners.addAll(this.store.memstore.getScanners());
+ List<KeyValueScanner> scanners = this.store.getScanners(cacheBlocks, isGet, false);
return scanners;
}
@@ -191,24 +185,27 @@ class StoreScanner extends NonLazyKeyValueScanner
memOnly = false;
filesOnly = false;
}
- List scanners = new LinkedList();
- // First the store file scanners
- if (memOnly == false) {
- List sfScanners = StoreFileScanner
- .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
-
- // include only those scan files which pass all filters
- for (StoreFileScanner sfs : sfScanners) {
- if (sfs.shouldSeek(scan, columns)) {
- scanners.add(sfs);
+ List allStoreScanners =
+ this.store.getScanners(cacheBlocks, isGet, false);
+
+ List scanners =
+ new ArrayList(allStoreScanners.size());
+
+ // include only those scan files which pass all filters
+ for (KeyValueScanner kvs : allStoreScanners) {
+ if (kvs instanceof StoreFileScanner) {
+ if (memOnly == false && ((StoreFileScanner)kvs).shouldSeek(scan, columns)) {
+ scanners.add(kvs);
+ }
+ }
+ else {
+ // kvs is a MemStoreScanner
+ if (filesOnly == false && this.store.memstore.shouldSeek(scan)) {
+ scanners.add(kvs);
}
}
}
-
- // Then the memstore scanners
- if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
- scanners.addAll(this.store.memstore.getScanners());
- }
+
return scanners;
}
diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 4ac6e09..bb87e36 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -250,6 +250,12 @@ public class TestAcidGuarantees {
writers.add(writer);
ctx.addThread(writer);
}
+ // Add a flusher
+ ctx.addThread(new RepeatingTestThread(ctx) {
+ public void doAnAction() throws Exception {
+ util.flush();
+ }
+ });
List getters = Lists.newArrayList();
for (int i = 0; i < numGetters; i++) {
@@ -286,7 +292,6 @@ public class TestAcidGuarantees {
}
@Test
- @Ignore("Currently not passing - see HBASE-2856")
public void testGetAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@@ -297,7 +302,6 @@ public class TestAcidGuarantees {
}
@Test
- @Ignore("Currently not passing - see HBASE-2670")
public void testScanAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@@ -308,7 +312,6 @@ public class TestAcidGuarantees {
}
@Test
- @Ignore("Currently not passing - see HBASE-2670")
public void testMixedAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@@ -322,7 +325,7 @@ public class TestAcidGuarantees {
Configuration c = HBaseConfiguration.create();
TestAcidGuarantees test = new TestAcidGuarantees();
test.setConf(c);
- test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+ test.runTestAtomicity(5000, 50, 2, 2, 3);
}
private void setConf(Configuration c) {
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index ef16382..9e3a14a 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -192,9 +192,10 @@ public class TestCacheOnWrite {
}
LOG.info("Block count by type: " + blockCountByType);
+ String countByType = blockCountByType.toString();
assertEquals(
- "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
- blockCountByType.toString());
+ "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+ countByType);
reader.close();
}
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index 78a7cd6..ed020a3 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -36,8 +38,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.Test;
@@ -115,10 +120,36 @@ public class TestHFileWriterV2 {
HFileBlock.FSReader blockReader =
new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
+ // Comparator class name is stored in the trailer in version 2.
+ RawComparator comparator = trailer.createComparator();
+ HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+ trailer.getNumDataIndexLevels());
+ HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+ Bytes.BYTES_RAWCOMPARATOR, 1);
+
+ HFileBlock.BlockIterator blockIter = blockReader.blockRange(
+ trailer.getLoadOnOpenDataOffset(),
+ fileSize - trailer.getTrailerSize());
+ // Data index. We also read statistics about the block index written after
+ // the root level.
+ dataBlockIndexReader.readMultiLevelIndexRoot(
+ blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+ trailer.getDataIndexCount());
+
+ // Meta index.
+ metaBlockIndexReader.readRootIndex(
+ blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+ trailer.getMetaIndexCount());
+ // File info
+ FileInfo fileInfo = new FileInfo();
+ fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+ byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+ boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);
// Counters for the number of key/value pairs and the number of blocks
int entriesRead = 0;
int blocksRead = 0;
+ long memstoreTS = 0;
// Scan blocks the way the reader would scan them
fsdis.seek(0);
@@ -137,6 +168,15 @@ public class TestHFileWriterV2 {
byte[] value = new byte[valueLen];
buf.get(value);
+ if (includeMemstoreTS) {
+ ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
+ buf.arrayOffset() + buf.position(), buf.remaining());
+ DataInputStream data_input = new DataInputStream(byte_input);
+
+ memstoreTS = WritableUtils.readVLong(data_input);
+ buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
+ }
+
// A brute-force check to see that all keys and values are correct.
assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
index 09b66ed..a6bfa94 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
@@ -54,7 +54,7 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
long timestamp = 0;
//"Match"
for(byte [] col : scannerColumns){
- result.add(exp.checkColumn(col, 0, col.length, ++timestamp));
+ result.add(exp.checkColumn(col, 0, col.length, ++timestamp, false));
}
assertEquals(expected.size(), result.size());
@@ -80,9 +80,9 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
List expected = new ArrayList();
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2
- expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4
- expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
+ expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
int maxVersions = 1;
//Create "Scanner"
@@ -166,13 +166,13 @@ public class TestExplicitColumnTracker extends HBaseTestCase {
Long.MAX_VALUE);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
- explicit.checkColumn(col, 0, col.length, 1);
+ explicit.checkColumn(col, 0, col.length, 1, false);
}
explicit.update();
for (int i = 1; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
- explicit.checkColumn(col, 0, col.length, 1);
+ explicit.checkColumn(col, 0, col.length, 1, false);
}
}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 696a0f0..1fce238 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -123,7 +123,7 @@ public class TestFSErrorsExposed {
StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
cacheConf, BloomType.NONE);
List scanners = StoreFileScanner.getScannersForStoreFiles(
- Collections.singletonList(sf), false, true);
+ Collections.singletonList(sf), false, true, false);
KeyValueScanner scanner = scanners.get(0);
FaultyInputStream inStream = fs.inStreams.get(0).get();
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index 135a136..f3543a7 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -54,7 +54,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
for(byte [] qualifier : qualifiers) {
ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0,
- qualifier.length, 1);
+ qualifier.length, 1, false);
actual.add(mc);
}
@@ -87,7 +87,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
long timestamp = 0;
for(byte [] qualifier : qualifiers) {
MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length,
- ++timestamp);
+ ++timestamp, false);
actual.add(mc);
}
@@ -110,7 +110,7 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
try {
for(byte [] qualifier : qualifiers) {
- tracker.checkColumn(qualifier, 0, qualifier.length, 1);
+ tracker.checkColumn(qualifier, 0, qualifier.length, 1, false);
}
} catch (Exception e) {
ok = true;
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index 44d2c9d..0a68029 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -773,7 +773,8 @@ public class TestStoreFile extends HBaseTestCase {
for (int i=numKVs;i>0;i--) {
KeyValue kv = new KeyValue(b, b, b, i, b);
kvs.add(kv);
- totalSize += kv.getLength();
+ // kv has memstoreTS 0, which takes 1 byte to store.
+ totalSize += kv.getLength() + 1;
}
int blockSize = totalSize / numBlocks;
StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,