diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index c71c03a7f771293467eba62889b1e46fecdf81dc..9990bb3de7d3aac2eb5e53b559c5115126176160 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -127,12 +127,7 @@ public class Scan extends Query {
   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
   static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
 
-  /**
-   * @deprecated without replacement
-   * This is now a no-op, SEEKs and SKIPs are optimizated automatically.
-   * Will be removed in 2.0+
-   */
-  @Deprecated
+
   public static final String HINT_LOOKAHEAD = "_look_ahead_";
 
   /*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
index f23a9a87aa99c6bb1aaeef685eb6dbb1114d3883..1320f2d6136e2271ace176b917ff19521d75fd7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
@@ -127,4 +127,10 @@ public interface ColumnTracker {
    * @return true to early out based on timestamp.
    */
   boolean isDone(long timestamp);
+
+  /**
+   * Used by the matcher to estimate how many cells to skip when re-seeking in scanners.
+   * @return the average number of columns per row
+   */
+  long getAvgNumOfColumns();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 3d095cb030467ba2915b79a65e015873f7bf42f7..d2e74d07ffaf578e492bbbb68b0a8d9996e8ecac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -680,7 +680,15 @@ public class DefaultMemStore implements MemStore {
    */
   @Override
   public List<KeyValueScanner> getScanners(long readPt) {
-    MemStoreScanner scanner = new MemStoreScanner(readPt);
+    return getScanners(readPt, null);
+  }
+
+  /**
+   * @return scanner on memstore and snapshot in this order.
+   */
+  @Override
+  public List<KeyValueScanner> getScanners(long readPt, ScanQueryMatcher matcher) {
+    MemStoreScanner scanner = new MemStoreScanner(readPt, matcher);
     scanner.seek(CellUtil.createCell(HConstants.EMPTY_START_ROW));
     if (scanner.peek() == null) {
       scanner.close();
@@ -716,6 +724,7 @@
    * This behaves as if it were a real scanner but does not maintain position.
    */
   protected class MemStoreScanner extends NonLazyKeyValueScanner {
+    private ScanQueryMatcher scanQueryMatcher;
     // Next row information for either cellSet or snapshot
     private Cell cellSetNextRow = null;
     private Cell snapshotNextRow = null;
@@ -723,7 +732,7 @@
     // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
     private Cell cellSetItRow = null;
     private Cell snapshotItRow = null;
-    
+
     // iterator based scanning.
     private Iterator<Cell> cellSetIt;
     private Iterator<Cell> snapshotIt;
 
@@ -738,7 +747,7 @@
     // The allocator and snapshot allocator at the time of creating this scanner
     volatile MemStoreLAB allocatorAtCreation;
     volatile MemStoreLAB snapshotAllocatorAtCreation;
-    
+
     // A flag represents whether could stop skipping Cells for MVCC
     // if have encountered the next row. Only used for reversed scan
     private boolean stopSkippingCellsIfNextRow = false;
@@ -785,6 +794,11 @@
       }
     }
 
+    MemStoreScanner(long readPoint, ScanQueryMatcher scanQueryMatcher) {
+      this(readPoint);
+      this.scanQueryMatcher = scanQueryMatcher;
+    }
+
     /**
      * Lock on 'this' must be held by caller.
      * @param it
@@ -856,7 +870,6 @@
       return (theNext != null);
     }
 
-
     /**
     * Move forward on the sub-lists set previously by seek.
     * @param key seek value (should be non-null)
@@ -865,7 +878,7 @@
     @Override
     public synchronized boolean reseek(Cell key) {
      /*
-      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
+      See HBASE-4195 & HBASE-3855 & HBASE-6561 for the background on this implementation.
      This code is executed concurrently with flush and puts, without locks.
      Two points must be known when working on this code:
      1) It's not possible to use the 'kvTail' and 'snapshot'
@@ -876,13 +889,26 @@
      get it. So we remember the last keys we iterated to and restore
      the reseeked set to at least that point.
      */
+
+      // try to skip cells first, if possible, because skipping is a better choice when the
+      // reseek point is close to the current position.
+      if (scanQueryMatcher != null) {
+        for (long i = 0; i < scanQueryMatcher.getLookAheadHint(); i++) {
+          next();
+          if (peek() == null) {
+            return false;
+          }
+          if (comparator.compare(peek(), key) >= 0) {
+            return true;
+          }
+        }
+      }
+
       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
 
       return seekInSubLists(key);
     }
 
-
     @Override
     public synchronized Cell peek() {
       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
index 1877d16a200c02f44afa9c49765e46881225f638..15397d2a224cb192d3665b295edf2b284d171ce2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
@@ -252,4 +252,9 @@
   public boolean isDone(long timestamp) {
     return minVersions <= 0 && isExpired(timestamp);
   }
+
+  @Override
+  public long getAvgNumOfColumns() {
+    return 0;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5de73b36a53ef669db962dba4afdba9c5699fea7..cbd0ce65435ef813f0c9f8fb349504d02560192c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1110,7 +1110,7 @@
     try {
       storeFilesToScan =
          this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
-      memStoreScanners = this.memstore.getScanners(readPt);
+      memStoreScanners = this.memstore.getScanners(readPt, matcher);
     } finally {
       this.lock.readLock().unlock();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 658ba488eacd321a3bffd1376aa83a6e7ace01f4..43d425a857d3d748804a6f7b85f3c9ee05eebcb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -137,6 +137,12 @@
    */
   List<KeyValueScanner> getScanners(long readPt);
 
+  /**
+   * @return scanner over the memstore. This might include a scanner over the snapshot when one is
+   *         present.
+   */
+  List<KeyValueScanner> getScanners(long readPt, ScanQueryMatcher matcher);
+
   /**
    * @return Total memory occupied by this MemStore.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index d00b08784f6574824049d4c6e9948b0199aeff95..de122b3773125264cbba0a937582ad95780fd3e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -48,6 +48,8 @@
   private long cellsPerTimeoutCheck;
   private boolean parallelSeekEnabled;
   private final Configuration conf;
+  private final long lookAheadCellsBeforeReseek;
+  private final ColumnStats columnStats;
 
   public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
       + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
@@ -98,13 +100,20 @@
         perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
     this.parallelSeekEnabled =
         conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
+    this.lookAheadCellsBeforeReseek =
+        conf.getLong("hbase.hregion.memstore.scanner.lookahead.cells", 0);
     this.conf = conf;
+    this.columnStats = new ColumnStats();
   }
 
   public Configuration getConfiguration() {
     return this.conf;
   }
 
+  long getLookAheadCellsBeforeReseek() {
+    return lookAheadCellsBeforeReseek;
+  }
+
   long getTableMaxRowSize() {
     return this.tableMaxRowSize;
   }
@@ -148,4 +157,44 @@
   public KVComparator getComparator() {
     return comparator;
   }
+
+  public ColumnStats getColumnStats() {
+    return columnStats;
+  }
+
+  public static class ColumnStats {
+    private long count;
+    private long min;
+    private long max;
+    private double mean;
+    private double sum;
+
+    public synchronized void update(long numOfColumns) {
+      if (count == 0) {
+        count = 1;
+        min = numOfColumns;
+        max = numOfColumns;
+        mean = numOfColumns;
+        sum = (double) numOfColumns;
+      } else {
+        count++;
+        sum += (double) numOfColumns;
+        mean = sum / count;
+        min = Math.min(min, numOfColumns);
+        max = Math.max(max, numOfColumns);
+      }
+    }
+
+    public long avg() {
+      return (long) mean;
+    }
+
+    public long max() {
+      return max;
+    }
+
+    public long min() {
+      return min;
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index a8e38271d408b2ec0702e981ed7fc50de980f8f0..5d64f904b0314af7dc6e69672a23bff1ede77b05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.regionserver.ScanInfo.ColumnStats;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -45,6 +46,7 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public class ScanQueryMatcher {
+  private final int maxVersions;
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
@@ -145,6 +147,10 @@
 
   private final boolean isReversed;
 
+  private final long lookAheadBeforeSeek;
+
+  private long lookAheadHint;
+
   /**
    * Construct a QueryMatcher for a scan
    * @param scan
@@ -192,7 +198,7 @@
     this.seePastDeleteMarkers =
         scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE && isUserScan;
 
-    int maxVersions =
+    this.maxVersions =
         scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
           scanInfo.getMaxVersions());
@@ -203,7 +209,9 @@
       // use a specialized scan for wildcard column tracker.
       this.columns = new ScanWildcardColumnTracker(
-          scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
+          scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS,
+          // collect column stats only when flushing or compacting
+          scanType != ScanType.USER_SCAN ? scanInfo.getColumnStats() : null);
     } else {
       // whether there is null column in the explicit column query
       hasNullColumn = (columns.first().length == 0);
@@ -213,6 +221,9 @@
       this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
           oldestUnexpiredTS);
     }
+
+    this.lookAheadBeforeSeek = scanInfo.getLookAheadCellsBeforeReseek();
+
     this.isReversed = scan.isReversed();
   }
@@ -280,7 +291,7 @@
    * caused by a data corruption.
    */
   public MatchCode match(Cell cell) throws IOException {
-    if (filter != null && filter.filterAllRemaining()) {
+    if (filter != null && filter.filterAllRemaining()) {
       return MatchCode.DONE_SCAN;
     }
     if (row != null) {
@@ -643,6 +654,15 @@
     return matchCode;
   }
+
+  public long getLookAheadHint() {
+    return lookAheadHint;
+  }
+
+  public void setLookAheadHint(boolean seekToNextRow) {
+    long hint = seekToNextRow ? columns.getAvgNumOfColumns() * maxVersions : maxVersions;
+    this.lookAheadHint = hint <= lookAheadBeforeSeek ? hint : 0;
+  }
 
   /**
    * {@link #match} return codes. These instruct the scanner moving through
   * memstores and StoreFiles what to do with the current KeyValue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
index 85b36fb5d06df134dcf865f00cd1d62023b87118..d94f9b69fb20e9dcdcaffd75697ba0181f8f212c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScanInfo.ColumnStats;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -32,10 +33,12 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 public class ScanWildcardColumnTracker implements ColumnTracker {
+  private final ColumnStats columnStats;
   private byte [] columnBuffer = null;
   private int columnOffset = 0;
   private int columnLength = 0;
   private int currentCount = 0;
+  private int numberOfColumns = 0;
   private int maxVersions;
   private int minVersions;
  /* Keeps track of the latest timestamp and type included for current column.
@@ -45,18 +48,32 @@
 
   private long oldestStamp;
 
+  /**
+   * Return maxVersions of every row.
+   * @param minVersion Minimum number of versions to keep
+   * @param maxVersion Maximum number of versions to return
+   * @param oldestUnexpiredTS oldest timestamp that has not expired according
+   *          to the TTL.
+   */
+  public ScanWildcardColumnTracker(int minVersion, int maxVersion,
+      long oldestUnexpiredTS) {
+    this(minVersion, maxVersion, oldestUnexpiredTS, null);
+  }
+
   /**
    * Return maxVersions of every row.
    * @param minVersion Minimum number of versions to keep
    * @param maxVersion Maximum number of versions to return
    * @param oldestUnexpiredTS oldest timestamp that has not expired according
    *          to the TTL.
+   * @param columnStats Statistics of column qualifiers
    */
   public ScanWildcardColumnTracker(int minVersion, int maxVersion,
-      long oldestUnexpiredTS) {
+      long oldestUnexpiredTS, ColumnStats columnStats) {
     this.maxVersions = maxVersion;
     this.minVersions = minVersion;
     this.oldestStamp = oldestUnexpiredTS;
+    this.columnStats = columnStats;
   }
 
   /**
@@ -122,6 +139,7 @@
     columnOffset = offset;
     columnLength = length;
     currentCount = 0;
+    numberOfColumns++;
   }
 
   /**
@@ -153,6 +171,10 @@
   @Override
   public void reset() {
     columnBuffer = null;
+    if (columnStats != null && numberOfColumns > 0) {
+      columnStats.update(numberOfColumns);
+    }
+    numberOfColumns = 0;
     resetTSAndType();
   }
 
@@ -202,4 +224,9 @@
   public boolean isDone(long timestamp) {
     return minVersions <= 0 && isExpired(timestamp);
   }
+
+  @Override
+  public long getAvgNumOfColumns() {
+    return columnStats != null ? columnStats.avg() : 0;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 30d634ea29484c6261144fc161df2082dd4db792..74ce3213f863ffa49f970c7e8340652a10c71090 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -434,6 +434,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.store.deleteChangedReaderObserver(this);
     if (this.heap != null)
       this.heap.close();
+    if (this.matcher != null)
+      this.matcher.reset();
     this.heap = null; // CLOSED!
     this.lastTop = null; // If both are null, we are closed.
   } finally {
@@ -665,7 +667,9 @@
       case SEEK_NEXT_COL:
       {
         Cell nextIndexedKey = getNextIndexedKey();
-        if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
+        if (nextIndexedKey == null) {
+          matcher.setLookAheadHint(false);
+        } else if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
             && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
           return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
         }
@@ -675,7 +679,9 @@
       case SEEK_NEXT_ROW:
       {
         Cell nextIndexedKey = getNextIndexedKey();
-        if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
+        if (nextIndexedKey == null) {
+          matcher.setLookAheadHint(true);
+        } else if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
             && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
           return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 0fa746e330f96360d28e3c0efba885c446debcc8..79ef23f5871b4e08878d4664d0095e27b740fa6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -58,8 +58,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 /** memstore test case */
 @Category(MediumTests.class)
@@ -792,6 +791,39 @@ public class TestDefaultMemStore extends TestCase {
     assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE));
   }
 
+  public void testLookAheadBeforeSeek() throws IOException {
+    final long LOOK_AHEAD_CELLS = 4;
+
+    Configuration conf = HBaseConfiguration.create();
+    MemStore memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
+
+    addRows(100, memstore);
+
+    ScanQueryMatcher matcher = mock(ScanQueryMatcher.class);
+    when(matcher.getLookAheadHint()).thenReturn(LOOK_AHEAD_CELLS);
+
+    KeyValueScanner scanner = spy(memstore.getScanners(0, matcher).get(0));
+    scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+    scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1),
+        FAMILY, makeQualifier(1, 1)));
+    verify(scanner, times(1)).next();
+
+    scanner = spy(memstore.getScanners(0, matcher).get(0));
+    scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+    Cell key = KeyValueUtil.createFirstOnRow(Bytes.toBytes(50));
+    scanner.reseek(key);
+    verify(scanner, times((int) LOOK_AHEAD_CELLS)).next();
+    assertTrue(CellUtil.matchingRow(scanner.next(), key));
+
+    when(matcher.getLookAheadHint()).thenReturn(0l);
+    scanner = spy(memstore.getScanners(0, matcher).get(0));
+
+    scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1)));
+    scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1),
+        FAMILY, makeQualifier(1, 1)));
+    verify(scanner, never()).next();
+  }
+
   ////////////////////////////////////
   //Test for upsert with MSLAB
   ////////////////////////////////////
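
Note for reviewers (illustration only, not part of the patch): the lookahead is driven by the new property "hbase.hregion.memstore.scanner.lookahead.cells" read in ScanInfo; the default of 0 disables it, so existing reseek behaviour is unchanged unless an operator opts in. A minimal sketch of how the threshold might be set programmatically, assuming the patch is applied; the class name and the value 16 are made up for the example:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class LookAheadConfigExample {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Let the memstore scanner try up to 16 next() calls (the lookahead hint
      // computed by ScanQueryMatcher.setLookAheadHint) before falling back to a
      // tail-set based reseek.
      conf.setLong("hbase.hregion.memstore.scanner.lookahead.cells", 16);
    }
  }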