diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index c71c03a7f771293467eba62889b1e46fecdf81dc..9990bb3de7d3aac2eb5e53b559c5115126176160 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -127,12 +127,7 @@ public class Scan extends Query {
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
- /**
- * @deprecated without replacement
- * This is now a no-op, SEEKs and SKIPs are optimizated automatically.
- * Will be removed in 2.0+
- */
- @Deprecated
+
public static final String HINT_LOOKAHEAD = "_look_ahead_";
/*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
index f23a9a87aa99c6bb1aaeef685eb6dbb1114d3883..1320f2d6136e2271ace176b917ff19521d75fd7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
@@ -127,4 +127,10 @@ public interface ColumnTracker {
* @return true to early out based on timestamp.
*/
boolean isDone(long timestamp);
+
+ /**
+ * Used by the matcher to estimate the number of cells to skip when doing a reseek in scanners.
+ * @return the average number of columns per row
+ */
+ long getAvgNumOfColumns();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 3d095cb030467ba2915b79a65e015873f7bf42f7..d2e74d07ffaf578e492bbbb68b0a8d9996e8ecac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -680,7 +680,15 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) {
- MemStoreScanner scanner = new MemStoreScanner(readPt);
+ return getScanners(readPt, null);
+ }
+
+ /**
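+ * @param matcher used by the memstore scanner to obtain a look-ahead hint during reseek; may be null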
+ * @return scanner on memstore and snapshot in this order.
+ */
+ @Override
+ public List<KeyValueScanner> getScanners(long readPt, ScanQueryMatcher matcher) {
+ MemStoreScanner scanner = new MemStoreScanner(readPt, matcher);
scanner.seek(CellUtil.createCell(HConstants.EMPTY_START_ROW));
if (scanner.peek() == null) {
scanner.close();
@@ -716,6 +724,7 @@ public class DefaultMemStore implements MemStore {
* This behaves as if it were a real scanner but does not maintain position.
*/
protected class MemStoreScanner extends NonLazyKeyValueScanner {
+ private ScanQueryMatcher scanQueryMatcher;
// Next row information for either cellSet or snapshot
private Cell cellSetNextRow = null;
private Cell snapshotNextRow = null;
@@ -723,7 +732,7 @@ public class DefaultMemStore implements MemStore {
// last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
private Cell cellSetItRow = null;
private Cell snapshotItRow = null;
-
+
// iterator based scanning.
private Iterator<Cell> cellSetIt;
private Iterator<Cell> snapshotIt;
@@ -738,7 +747,7 @@ public class DefaultMemStore implements MemStore {
// The allocator and snapshot allocator at the time of creating this scanner
volatile MemStoreLAB allocatorAtCreation;
volatile MemStoreLAB snapshotAllocatorAtCreation;
-
+
// A flag represents whether could stop skipping Cells for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingCellsIfNextRow = false;
@@ -785,6 +794,11 @@ public class DefaultMemStore implements MemStore {
}
}
+ MemStoreScanner(long readPoint, ScanQueryMatcher scanQueryMatcher) {
+ this(readPoint);
+ this.scanQueryMatcher = scanQueryMatcher;
+ }
+
/**
* Lock on 'this' must be held by caller.
* @param it
@@ -856,7 +870,6 @@ public class DefaultMemStore implements MemStore {
return (theNext != null);
}
-
/**
* Move forward on the sub-lists set previously by seek.
* @param key seek value (should be non-null)
@@ -865,7 +878,7 @@ public class DefaultMemStore implements MemStore {
@Override
public synchronized boolean reseek(Cell key) {
/*
- See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+ See HBASE-4195 & HBASE-3855 & HBASE-6561 for the background on this implementation.
This code is executed concurrently with flush and puts, without locks.
Two points must be known when working on this code:
1) It's not possible to use the 'kvTail' and 'snapshot'
@@ -876,13 +889,26 @@ public class DefaultMemStore implements MemStore {
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
+
+ // Try to skip cells first, if possible, because skipping is a better choice than a reseek when the reseek point is close to the current position.
+ if (scanQueryMatcher != null) {
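+ // Advance at most getLookAheadHint() cells; stop early if the scanner is exhausted or the reseek key has been reached or passed.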
+ for (long i = 0; i < scanQueryMatcher.getLookAheadHint(); i++) {
+ next();
+ if (peek() == null) {
+ return false;
+ }
+ if (comparator.compare(peek(), key) >= 0) {
+ return true;
+ }
+ }
+ }
+
cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
return seekInSubLists(key);
}
-
@Override
public synchronized Cell peek() {
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index 1877d16a200c02f44afa9c49765e46881225f638..15397d2a224cb192d3665b295edf2b284d171ce2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -252,4 +252,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
public boolean isDone(long timestamp) {
return minVersions <= 0 && isExpired(timestamp);
}
+
+ @Override
+ public long getAvgNumOfColumns() {
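+ // No per-row column statistics are collected for explicit column queries; returning 0 keeps the next-row look-ahead hint at zero.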
+ return 0;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5de73b36a53ef669db962dba4afdba9c5699fea7..cbd0ce65435ef813f0c9f8fb349504d02560192c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1110,7 +1110,7 @@ public class HStore implements Store {
try {
storeFilesToScan =
this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
- memStoreScanners = this.memstore.getScanners(readPt);
+ memStoreScanners = this.memstore.getScanners(readPt, matcher);
} finally {
this.lock.readLock().unlock();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 658ba488eacd321a3bffd1376aa83a6e7ace01f4..43d425a857d3d748804a6f7b85f3c9ee05eebcb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -137,6 +137,12 @@ public interface MemStore extends HeapSize {
*/
List<KeyValueScanner> getScanners(long readPt);
+ /**
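+ * @param matcher the ScanQueryMatcher that supplies the look-ahead hint used during reseek; may be null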
+ * @return scanner over the memstore. This might include scanner over the snapshot when one is
+ * present.
+ */
+ List<KeyValueScanner> getScanners(long readPt, ScanQueryMatcher matcher);
+
/**
* @return Total memory occupied by this MemStore.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index d00b08784f6574824049d4c6e9948b0199aeff95..de122b3773125264cbba0a937582ad95780fd3e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -48,6 +48,8 @@ public class ScanInfo {
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
private final Configuration conf;
+ private final long lookAheadCellsBeforeReseek;
+ private final ColumnStats columnStats;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
@@ -98,13 +100,20 @@ public class ScanInfo {
perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
this.parallelSeekEnabled =
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
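+ // Maximum number of cells a memstore scanner may skip forward instead of reseeking; 0 (the default) disables look-ahead.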
+ this.lookAheadCellsBeforeReseek =
+ conf.getLong("hbase.hregion.memstore.scanner.lookahead.cells", 0);
this.conf = conf;
+ this.columnStats = new ColumnStats();
}
public Configuration getConfiguration() {
return this.conf;
}
+ long getLookAheadCellsBeforeReseek() {
+ return lookAheadCellsBeforeReseek;
+ }
+
long getTableMaxRowSize() {
return this.tableMaxRowSize;
}
@@ -148,4 +157,44 @@ public class ScanInfo {
public KVComparator getComparator() {
return comparator;
}
+
+ public ColumnStats getColumnStats() {
+ return columnStats;
+ }
+
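+ /**
+ * Running statistics (count, min, max, mean) of the number of column qualifiers per row,
+ * collected by the wildcard column tracker during flushes and compactions.
+ */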
+ public static class ColumnStats {
+ private long count;
+ private long min;
+ private long max;
+ private double mean;
+ private double sum;
+
+ public synchronized void update(long numOfColumns) {
+ if (count == 0) {
+ count = 1;
+ min = numOfColumns;
+ max = numOfColumns;
+ mean = numOfColumns;
+ sum = (double) numOfColumns;
+ } else {
+ count++;
+ sum += (double) numOfColumns;
+ mean = sum / count;
+ min = Math.min(min, numOfColumns);
+ max = Math.max(max, numOfColumns);
+ }
+ }
+
+ public long avg() {
+ return (long) mean;
+ }
+
+ public long max() {
+ return max;
+ }
+
+ public long min() {
+ return min;
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index a8e38271d408b2ec0702e981ed7fc50de980f8f0..5d64f904b0314af7dc6e69672a23bff1ede77b05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.regionserver.ScanInfo.ColumnStats;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -45,6 +46,7 @@ import com.google.common.base.Preconditions;
*/
@InterfaceAudience.Private
public class ScanQueryMatcher {
+ private final int maxVersions;
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
@@ -145,6 +147,10 @@ public class ScanQueryMatcher {
private final boolean isReversed;
+ private final long lookAheadBeforeSeek;
+
+ private long lookAheadHint;
+
/**
* Construct a QueryMatcher for a scan
* @param scan
@@ -192,7 +198,7 @@ public class ScanQueryMatcher {
this.seePastDeleteMarkers =
scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE && isUserScan;
- int maxVersions =
+ this.maxVersions =
scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
scanInfo.getMaxVersions());
@@ -203,7 +209,9 @@ public class ScanQueryMatcher {
// use a specialized scan for wildcard column tracker.
this.columns = new ScanWildcardColumnTracker(
- scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
+ scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS,
+ // collect column statistics only when flushing or compacting
+ scanType != ScanType.USER_SCAN ? scanInfo.getColumnStats() : null);
} else {
// whether there is null column in the explicit column query
hasNullColumn = (columns.first().length == 0);
@@ -213,6 +221,9 @@ public class ScanQueryMatcher {
this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
oldestUnexpiredTS);
}
+
+ this.lookAheadBeforeSeek = scanInfo.getLookAheadCellsBeforeReseek();
+
this.isReversed = scan.isReversed();
}
@@ -280,7 +291,7 @@ public class ScanQueryMatcher {
* caused by a data corruption.
*/
public MatchCode match(Cell cell) throws IOException {
- if (filter != null && filter.filterAllRemaining()) {
+ if (filter != null && filter.filterAllRemaining()) {
return MatchCode.DONE_SCAN;
}
if (row != null) {
@@ -643,6 +654,15 @@ public class ScanQueryMatcher {
return matchCode;
}
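+ /**
+ * @return the number of cells a scanner may skip forward instead of reseeking; 0 disables look-ahead.
+ */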
+ public long getLookAheadHint() {
+ return lookAheadHint;
+ }
+
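+ /**
+ * Estimate the cost of the upcoming seek: the average number of columns per row times maxVersions
+ * for a row seek, or maxVersions for a column seek. The estimate becomes the look-ahead hint only
+ * when it does not exceed the configured threshold; otherwise the hint is set to 0.
+ */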
+ public void setLookAheadHint(boolean seekToNextRow) {
+ long hint = seekToNextRow ? columns.getAvgNumOfColumns() * maxVersions : maxVersions;
+ this.lookAheadHint = hint <= lookAheadBeforeSeek ? hint : 0;
+ }
+
/**
* {@link #match} return codes. These instruct the scanner moving through
* memstores and StoreFiles what to do with the current KeyValue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
index 85b36fb5d06df134dcf865f00cd1d62023b87118..d94f9b69fb20e9dcdcaffd75697ba0181f8f212c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScanInfo.ColumnStats;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -32,10 +33,12 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public class ScanWildcardColumnTracker implements ColumnTracker {
+ private final ColumnStats columnStats;
private byte [] columnBuffer = null;
private int columnOffset = 0;
private int columnLength = 0;
private int currentCount = 0;
+ private int numberOfColumns = 0;
private int maxVersions;
private int minVersions;
/* Keeps track of the latest timestamp and type included for current column.
@@ -45,18 +48,32 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
private long oldestStamp;
+ /**
+ * Return maxVersions of every row.
+ * @param minVersion Minimum number of versions to keep
+ * @param maxVersion Maximum number of versions to return
+ * @param oldestUnexpiredTS oldest timestamp that has not expired according
+ * to the TTL.
+ */
+ public ScanWildcardColumnTracker(int minVersion, int maxVersion,
+ long oldestUnexpiredTS) {
+ this(minVersion, maxVersion, oldestUnexpiredTS, null);
+ }
+
/**
* Return maxVersions of every row.
* @param minVersion Minimum number of versions to keep
* @param maxVersion Maximum number of versions to return
* @param oldestUnexpiredTS oldest timestamp that has not expired according
* to the TTL.
+ * @param columnStats Statistics of column qualifiers
*/
public ScanWildcardColumnTracker(int minVersion, int maxVersion,
- long oldestUnexpiredTS) {
+ long oldestUnexpiredTS, ColumnStats columnStats) {
this.maxVersions = maxVersion;
this.minVersions = minVersion;
this.oldestStamp = oldestUnexpiredTS;
+ this.columnStats = columnStats;
}
/**
@@ -122,6 +139,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
columnOffset = offset;
columnLength = length;
currentCount = 0;
+ numberOfColumns++;
}
/**
@@ -153,6 +171,10 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public void reset() {
columnBuffer = null;
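+ // Record how many distinct columns were seen since the last reset, i.e. in the row just finished.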
+ if (columnStats != null && numberOfColumns > 0) {
+ columnStats.update(numberOfColumns);
+ }
+ numberOfColumns = 0;
resetTSAndType();
}
@@ -202,4 +224,9 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
public boolean isDone(long timestamp) {
return minVersions <= 0 && isExpired(timestamp);
}
+
+ @Override
+ public long getAvgNumOfColumns() {
+ return columnStats != null ? columnStats.avg() : 0;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 30d634ea29484c6261144fc161df2082dd4db792..74ce3213f863ffa49f970c7e8340652a10c71090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -434,6 +434,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.store.deleteChangedReaderObserver(this);
if (this.heap != null)
this.heap.close();
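+ // Reset the matcher so that column statistics for the final row are recorded (via ColumnTracker.reset()).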
+ if (this.matcher != null)
+ this.matcher.reset();
this.heap = null; // CLOSED!
this.lastTop = null; // If both are null, we are closed.
} finally {
@@ -665,7 +667,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case SEEK_NEXT_COL:
{
Cell nextIndexedKey = getNextIndexedKey();
- if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
+ if (nextIndexedKey == null) {
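+ // No next indexed key is available (for example, from a memstore scanner); let the matcher compute a look-ahead hint so the memstore scanner can skip cells instead of reseeking.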
+ matcher.setLookAheadHint(false);
+ } else if (nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
&& matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
}
@@ -675,7 +679,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case SEEK_NEXT_ROW:
{
Cell nextIndexedKey = getNextIndexedKey();
- if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
+ if (nextIndexedKey == null) {
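+ // As above, but for a row seek: the hint is based on the average number of columns per row.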
+ matcher.setLookAheadHint(true);
+ } else if (nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
&& matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 0fa746e330f96360d28e3c0efba885c446debcc8..79ef23f5871b4e08878d4664d0095e27b740fa6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -58,8 +58,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
/** memstore test case */
@Category(MediumTests.class)
@@ -792,6 +791,39 @@ public class TestDefaultMemStore extends TestCase {
assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE));
}
+ public void testLookAheadBeforeSeek() throws IOException {
+ final long LOOK_AHEAD_CELLS = 4;
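+ // Scenario 1: a reseek to a nearby cell in the same row is served by skipping (a single next()).
+ // Scenario 2: a reseek to a distant row exhausts the hint (LOOK_AHEAD_CELLS next() calls) and then falls back to a real reseek.
+ // Scenario 3: a hint of 0 disables look-ahead entirely, so next() is never called.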
+
+ Configuration conf = HBaseConfiguration.create();
+ MemStore memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
+
+ addRows(100, memstore);
+
+ ScanQueryMatcher matcher = mock(ScanQueryMatcher.class);
+ when(matcher.getLookAheadHint()).thenReturn(LOOK_AHEAD_CELLS);
+
+ KeyValueScanner scanner = spy(memstore.getScanners(0, matcher).get(0));
+ scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+ scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1),
+ FAMILY, makeQualifier(1, 1)));
+ verify(scanner, times(1)).next();
+
+ scanner = spy(memstore.getScanners(0, matcher).get(0));
+ scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+ Cell key = KeyValueUtil.createFirstOnRow(Bytes.toBytes(50));
+ scanner.reseek(key);
+ verify(scanner, times((int)LOOK_AHEAD_CELLS)).next();
+ assertTrue(CellUtil.matchingRow(scanner.next(), key));
+
+ when(matcher.getLookAheadHint()).thenReturn(0L);
+ scanner = spy(memstore.getScanners(0, matcher).get(0));
+
+ scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+ scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1),
+ FAMILY, makeQualifier(1, 1)));
+ verify(scanner, never()).next();
+ }
+
////////////////////////////////////
//Test for upsert with MSLAB
////////////////////////////////////