.../hbase/regionserver/CompactingMemStore.java | 27 ++++++++-- .../hadoop/hbase/regionserver/DefaultMemStore.java | 21 ++++++-- .../apache/hadoop/hbase/regionserver/HStore.java | 4 +- .../hbase/regionserver/TestDefaultMemStore.java | 5 +- .../hadoop/hbase/regionserver/TestHRegion.java | 58 ++++++++++++++++++++++ 5 files changed, 103 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 177f222..75aeaaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -239,13 +239,32 @@ public class CompactingMemStore extends AbstractMemStore { // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element List list = new ArrayList(pipelineList.size() + 2); - list.add(this.active.getScanner(readPt, order + 1)); + SegmentScanner activeScanner = this.active.getScanner(readPt, order + 1); + if (activeScanner.peek() == null) { + activeScanner.close(); + } else { + list.add(activeScanner); + } for (Segment item : pipelineList) { - list.add(item.getScanner(readPt, order)); + SegmentScanner itemScanner = item.getScanner(readPt, order); + if (itemScanner.peek() == null) { + itemScanner.close(); + } else { + list.add(itemScanner); + } order--; } - list.add(this.snapshot.getScanner(readPt, order)); - return Collections. singletonList(new MemStoreScanner(getComparator(), list)); + SegmentScanner snapshotScanner = this.snapshot.getScanner(readPt, order); + if (snapshotScanner.peek() == null) { + snapshotScanner.close(); + } else { + list.add(snapshotScanner); + } + if (!list.isEmpty()) { + return Collections + . singletonList(new MemStoreScanner(getComparator(), list)); + } + return null; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index b448b04..fccac85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -115,10 +115,23 @@ public class DefaultMemStore extends AbstractMemStore { */ public List getScanners(long readPt) throws IOException { List list = new ArrayList(2); - list.add(this.active.getScanner(readPt, 1)); - list.add(this.snapshot.getScanner(readPt, 0)); - return Collections. singletonList( - new MemStoreScanner(getComparator(), list)); + SegmentScanner activeScanner = this.active.getScanner(readPt, 1); + if (activeScanner.peek() == null) { + activeScanner.close(); + } else { + list.add(activeScanner); + } + SegmentScanner segmentScanner = this.snapshot.getScanner(readPt, 0); + if (segmentScanner.peek() == null) { + segmentScanner.close(); + } else { + list.add(segmentScanner); + } + if (!list.isEmpty()) { + return Collections + . singletonList(new MemStoreScanner(getComparator(), list)); + } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c4bd849..8da9d1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1107,7 +1107,9 @@ public class HStore implements Store { new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); // Then the memstore scanners - scanners.addAll(memStoreScanners); + if (memStoreScanners != null) { + scanners.addAll(memStoreScanners); + } return scanners; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index fdc6c92..10c8886 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -303,12 +303,11 @@ public class TestDefaultMemStore { kv1.setSequenceId(w.getWriteNumber()); memstore.add(kv1); - KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); - assertScannerResults(s, new KeyValue[]{}); + assertTrue(this.memstore.getScanners(mvcc.getReadPoint()) == null); mvcc.completeAndWait(w); - s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); + KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); w = mvcc.begin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2042f52..1e5041b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5894,6 +5894,64 @@ public class TestHRegion { } @Test + public void testReverseScanShouldNotScanMemstoreIfReadPtlesser() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + byte[] col = Bytes.toBytes("C"); + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + this.region = initHRegion(tableName, method, conf, families); + try { + // setup with one storefile and one memstore, to create scanner and get an earlier readPt + Put put = new Put(Bytes.toBytes("19996")); + put.addColumn(cf1, col, Bytes.toBytes("val")); + region.put(put); + Put put2 = new Put(Bytes.toBytes("19995")); + put2.addColumn(cf1, col, Bytes.toBytes("val")); + region.put(put2); + // create a reverse scan + Scan scan = new Scan(Bytes.toBytes("19996")); + scan.setReversed(true); + RegionScanner scanner = region.getScanner(scan); + + // flush the cache. This will reset the store scanner + region.flushcache(true, true); + + // create one memstore contains many rows will be skipped + // to check MemStoreScanner.seekToPreviousRow + for (int i = 10000; i < 20000; i++) { + Put p = new Put(Bytes.toBytes("" + i)); + p.addColumn(cf1, col, Bytes.toBytes("" + i)); + region.put(p); + } + List currRow = new ArrayList<>(); + boolean hasNext; + boolean assertDone = false; + do { + hasNext = scanner.next(currRow); + // With HBASE-15871, after the scanner is reset the memstore scanner should not be + // added here + if (!assertDone) { + StoreScanner current = + (StoreScanner) (((RegionScannerImpl) scanner).storeHeap).getCurrentForTesting(); + List scanners = current.getAllScannersForTesting(); + assertEquals("There should be only one scanner the store file scanner", 1, + scanners.size()); + assertDone = true; + } + } while (hasNext); + assertEquals(2, currRow.size()); + assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(), + currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); + assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(), + currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + + @Test public void testSplitRegionWithReverseScan() throws IOException { TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan"); byte [] qualifier = Bytes.toBytes("qualifier");