.../apache/hadoop/hbase/regionserver/HStore.java | 54 +++++++++++++------- .../apache/hadoop/hbase/regionserver/Store.java | 2 +- .../hadoop/hbase/regionserver/StoreScanner.java | 25 +++++++--- .../hadoop/hbase/regionserver/TestHRegion.java | 58 ++++++++++++++++++++++ .../hbase/regionserver/TestScanWithBloomError.java | 2 +- 5 files changed, 113 insertions(+), 28 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c4bd849..5235f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -18,23 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.Set; @@ -62,9 +53,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagType; -import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -99,6 +87,13 @@ import 
org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -1086,12 +1081,14 @@ public class HStore implements Store { boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) throws IOException { Collection storeFilesToScan; - List memStoreScanners; + List memStoreScanners = null; this.lock.readLock().lock(); try { storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); - memStoreScanners = this.memstore.getScanners(readPt); + if (readPt >= this.getMaxSequenceId()) { + memStoreScanners = this.memstore.getScanners(readPt); + } } finally { this.lock.readLock().unlock(); } @@ -1107,22 +1104,41 @@ public class HStore implements Store { new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); // Then the memstore scanners - scanners.addAll(memStoreScanners); + if (memStoreScanners != null) { + scanners.addAll(memStoreScanners); + } return scanners; } @Override - public List getScanners(List files, boolean cacheBlocks, + public List getScanners(Collection files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException { + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) + throws IOException { List memStoreScanners = null; - if (includeMemstoreScanner) { + if (files == 
null) { + // no files were passed in, so look up the store files to scan this.lock.readLock().lock(); try { - memStoreScanners = this.memstore.getScanners(readPt); + files = + this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); + if (readPt >= this.getMaxSequenceId()) { + memStoreScanners = this.memstore.getScanners(readPt); + } } finally { this.lock.readLock().unlock(); } + } else { + if (includeMemstoreScanner) { + if (readPt >= this.getMaxSequenceId()) { + this.lock.readLock().lock(); + try { + memStoreScanners = this.memstore.getScanners(readPt); + } finally { + this.lock.readLock().unlock(); + } + } + } } List sfScanners = StoreFileScanner.getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 853a4cf..f207186 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -122,7 +122,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @param includeMemstoreScanner true if memstore has to be included * @return scanners on the given files and on the memstore if specified */ - List getScanners(List files, boolean cacheBlocks, boolean isGet, + List getScanners(Collection files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e008a40..5a6c06c 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.NavigableSet; @@ -132,7 +133,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan protected volatile boolean flushed = false; // generally we get one file from a flush - protected List flushedStoreFiles = new ArrayList(1); + protected Collection flushedStoreFiles = new ArrayList(1); // The current list of scanners protected List currentScanners = new ArrayList(); // flush update lock @@ -351,8 +352,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected List getScannersNoCompaction() throws IOException { final boolean isCompaction = false; boolean usePread = get || scanUsePread; - return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); + return selectScannersFrom(store.getScanners(null, cacheBlocks, get, usePread, + isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); } /** @@ -963,13 +964,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * Used in testing. 
* @return all scanners in no particular order */ + @VisibleForTesting List getAllScannersForTesting() { List allScanners = new ArrayList(); - KeyValueScanner current = heap.getCurrentForTesting(); - if (current != null) - allScanners.add(current); - for (KeyValueScanner scanner : heap.getHeap()) + for (KeyValueScanner scanner : currentScanners) { allScanners.add(scanner); + } + return allScanners; + } + + @VisibleForTesting + List getAllStoreFileScannersForTesting() { + List allScanners = new ArrayList(); + for (KeyValueScanner scanner : currentScanners) { + if (scanner.isFileScanner()) { + allScanners.add(scanner); + } + } return allScanners; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2042f52..1e5041b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5894,6 +5894,64 @@ public class TestHRegion { } @Test + public void testReverseScanShouldNotScanMemstoreIfReadPtlesser() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + byte[] col = Bytes.toBytes("C"); + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + this.region = initHRegion(tableName, method, conf, families); + try { + // setup with one storefile and one memstore, to create scanner and get an earlier readPt + Put put = new Put(Bytes.toBytes("19996")); + put.addColumn(cf1, col, Bytes.toBytes("val")); + region.put(put); + Put put2 = new Put(Bytes.toBytes("19995")); + put2.addColumn(cf1, col, Bytes.toBytes("val")); + region.put(put2); + // create a reverse scan + Scan scan = new Scan(Bytes.toBytes("19996")); + scan.setReversed(true); + RegionScanner scanner = region.getScanner(scan); + + // flush the cache. 
This will reset the store scanner + region.flushcache(true, true); + + // create a memstore containing many rows that will be skipped + // to check MemStoreScanner.seekToPreviousRow + for (int i = 10000; i < 20000; i++) { + Put p = new Put(Bytes.toBytes("" + i)); + p.addColumn(cf1, col, Bytes.toBytes("" + i)); + region.put(p); + } + List currRow = new ArrayList<>(); + boolean hasNext; + boolean assertDone = false; + do { + hasNext = scanner.next(currRow); + // With HBASE-15871, after the scanner is reset the memstore scanner should not be + // added here + if (!assertDone) { + StoreScanner current = + (StoreScanner) (((RegionScannerImpl) scanner).storeHeap).getCurrentForTesting(); + List scanners = current.getAllScannersForTesting(); + assertEquals("There should be only one scanner the store file scanner", 1, + scanners.size()); + assertDone = true; + } + } while (hasNext); + assertEquals(2, currRow.size()); + assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(), + currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); + assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(), + currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + + @Test public void testSplitRegionWithReverseScan() throws IOException { TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan"); byte [] qualifier = Bytes.toBytes("qualifier"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java index 027193f..0e5cbcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java @@ -128,7 +128,7 @@ public class TestScanWithBloomError { (StoreScanner) 
storeHeap.getCurrentForTesting(); @SuppressWarnings({ "unchecked", "rawtypes" }) List scanners = (List) - (List) storeScanner.getAllScannersForTesting(); + (List) storeScanner.getAllStoreFileScannersForTesting(); // Sort scanners by their HFile's modification time. Collections.sort(scanners, new Comparator() {