diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index c71c03a7f771293467eba62889b1e46fecdf81dc..9990bb3de7d3aac2eb5e53b559c5115126176160 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -127,12 +127,7 @@ public class Scan extends Query { // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)) static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name"; - /** - * @deprecated without replacement - * This is now a no-op, SEEKs and SKIPs are optimizated automatically. - * Will be removed in 2.0+ - */ - @Deprecated + public static final String HINT_LOOKAHEAD = "_look_ahead_"; /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 3d095cb030467ba2915b79a65e015873f7bf42f7..95386cd4b08409f9d0167612c45be5f099036c19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -680,7 +680,15 @@ public class DefaultMemStore implements MemStore { */ @Override public List getScanners(long readPt) { - MemStoreScanner scanner = new MemStoreScanner(readPt); + return getScanners(readPt, null); + } + + /** + * @return scanner on memstore and snapshot in this order. + */ + @Override + public List getScanners(long readPt, ScanQueryMatcher matcher) { + MemStoreScanner scanner = new MemStoreScanner(readPt, matcher); scanner.seek(CellUtil.createCell(HConstants.EMPTY_START_ROW)); if (scanner.peek() == null) { scanner.close(); @@ -745,6 +753,9 @@ public class DefaultMemStore implements MemStore { private long readPoint; + // The maximum number of rows to skip instead of seek + private int lookAheadRows; + /* Some notes... @@ -785,6 +796,17 @@ public class DefaultMemStore implements MemStore { } } + MemStoreScanner(long readPoint, ScanQueryMatcher scanQueryMatcher) { + this(readPoint); + + if (scanQueryMatcher != null) { + int lookAheadRows = conf.getInt("hbase.hregion.memstore.scanner.lookahead.rows", 0); + if (scanQueryMatcher.getLookAheadBeforeReseek() <= lookAheadRows) { + this.lookAheadRows = scanQueryMatcher.getLookAheadBeforeReseek(); + } + } + } + /** * Lock on 'this' must be held by caller. * @param it @@ -856,7 +878,6 @@ public class DefaultMemStore implements MemStore { return (theNext != null); } - /** * Move forward on the sub-lists set previously by seek. * @param key seek value (should be non-null) @@ -865,7 +886,7 @@ public class DefaultMemStore implements MemStore { @Override public synchronized boolean reseek(Cell key) { /* - See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. + See HBASE-4195 & HBASE-3855 & HBASE-6561 for the background on this implementation. This code is executed concurrently with flush and puts, without locks. Two points must be known when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot' @@ -876,6 +897,19 @@ public class DefaultMemStore implements MemStore { get it. So we remember the last keys we iterated to and restore the reseeked set to at least that point. */ + + if (lookAheadRows > 0 && peek() != null) { + for (int i = lookAheadRows; i > 0; --i) { + next(); + if (peek() == null) { + return false; + } + if (comparator.compare(peek(), key) >= 0) { + return true; + } + } + } + cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 5de73b36a53ef669db962dba4afdba9c5699fea7..cbd0ce65435ef813f0c9f8fb349504d02560192c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1110,7 +1110,7 @@ public class HStore implements Store { try { storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); - memStoreScanners = this.memstore.getScanners(readPt); + memStoreScanners = this.memstore.getScanners(readPt, matcher); } finally { this.lock.readLock().unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 658ba488eacd321a3bffd1376aa83a6e7ace01f4..43d425a857d3d748804a6f7b85f3c9ee05eebcb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -137,6 +137,12 @@ public interface MemStore extends HeapSize { */ List getScanners(long readPt); + /** + * @return scanner over the memstore. This might include scanner over the snapshot when one is + * present. + */ + List getScanners(long readPt, ScanQueryMatcher matcher); + /** * @return Total memory occupied by this MemStore. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index a8e38271d408b2ec0702e981ed7fc50de980f8f0..730f9eb5a6b3fad3466561826088ec993f6899b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -145,6 +145,8 @@ public class ScanQueryMatcher { private final boolean isReversed; + private final int lookAheadBeforeSeek; + /** * Construct a QueryMatcher for a scan * @param scan @@ -213,6 +215,10 @@ public class ScanQueryMatcher { this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); } + + byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD); + this.lookAheadBeforeSeek = Math.min(attr == null ? 0 : Bytes.toInt(attr), scanInfo.getMaxVersions()); + this.isReversed = scan.isReversed(); } @@ -643,6 +649,10 @@ public class ScanQueryMatcher { return matchCode; } + public int getLookAheadBeforeReseek() { + return lookAheadBeforeSeek; + } + /** * {@link #match} return codes. These instruct the scanner moving through * memstores and StoreFiles what to do with the current KeyValue. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLookAheadBeforeReseek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLookAheadBeforeReseek.java new file mode 100644 index 0000000000000000000000000000000000000000..423cf48f68471368fa06f55c4ef592958b322440 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLookAheadBeforeReseek.java @@ -0,0 +1,110 @@ + +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.*; +import org.junit.experimental.categories.Category; + + +@Category(MediumTests.class) +public class TestLookAheadBeforeReseek { + private static final Log LOG = LogFactory.getLog(TestLookAheadBeforeReseek.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.scanner.lookahead.rows", 5); + TEST_UTIL.startMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testLookAheadBeforeSeek() throws Exception { + byte [] TABLE = Bytes.toBytes("testLookAheadBeforeSeek"); + byte [] FAMILY = Bytes.toBytes("f"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + + Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE); + String row = "test_row"; + for (int i = 0; i < 500; i++) { + Put p = new Put(Bytes.toBytes(row)); + p.setDurability(Durability.SKIP_WAL); + + for (int j = 0; j < 1000; j++) { + long id = i * 1000 + j; + p.addColumn(FAMILY, Bytes.toBytes(id), id, Bytes.toBytes(id)); + } + + ht.put(p); + } + + long start = System.currentTimeMillis(); + ht.get(new Get(Bytes.toBytes(row)).setTimeRange(499_999, 500_000)); + long finish = System.currentTimeMillis(); + + LOG.info("elapsed time with timeRange: " + (finish - start)); + + start = System.currentTimeMillis(); + Result result = ht.get(new Get(Bytes.toBytes(row)) + .setTimeRange(499_999, 500_000) + .setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(4))); + finish = System.currentTimeMillis(); + + result.advance(); + Assert.assertEquals(CellUtil.createCell(Bytes.toBytes(row), FAMILY, + Bytes.toBytes(499_999l), 499_999l, KeyValue.Type.Put.getCode(), + Bytes.toBytes(499_999l)), result.current()); + + LOG.info("elapsed time with timeRange and hint to look ahead: " + (finish - start)); + + ht.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 0fa746e330f96360d28e3c0efba885c446debcc8..5a702de6dc60df18e1542b957017d89e9ee6d90f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -58,8 +58,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** memstore test case */ @Category(MediumTests.class) @@ -792,6 +791,40 @@ public class TestDefaultMemStore extends TestCase { assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE)); } + public void testLookAheadBeforeSeek() throws IOException { + final int LOOK_AHEAD_CELLS = 4; + + Configuration conf = HBaseConfiguration.create(); + conf.setInt("hbase.hregion.memstore.scanner.lookahead.rows", 5); + MemStore memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR); + + addRows(100, memstore); + + ScanQueryMatcher matcher = mock(ScanQueryMatcher.class); + when(matcher.getLookAheadBeforeReseek()).thenReturn(LOOK_AHEAD_CELLS); + + KeyValueScanner scanner = spy(memstore.getScanners(0, matcher).get(0)); + scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1))); + scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1), + FAMILY, makeQualifier(1, 1))); + verify(scanner, times(1)).next(); + + scanner = spy(memstore.getScanners(0, matcher).get(0)); + scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1))); + Cell key = KeyValueUtil.createFirstOnRow(Bytes.toBytes(50)); + scanner.reseek(key); + verify(scanner, times(LOOK_AHEAD_CELLS)).next(); + assertTrue(CellUtil.matchingRow(scanner.next(), key)); + + conf.setInt("hbase.hregion.memstore.scanner.lookahead.rows", 0); + scanner = spy(memstore.getScanners(0, matcher).get(0)); + + scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1))); + scanner.reseek(KeyValueUtil.createFirstOnRow(Bytes.toBytes(1), + FAMILY, makeQualifier(1, 1))); + verify(scanner, never()).next(); + } + //////////////////////////////////// //Test for upsert with MSLAB ////////////////////////////////////