diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 953e9111de..c7bf78d486 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -146,10 +146,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private volatile boolean flushed = false; // generally we get one file from a flush private final List flushedStoreFiles = new ArrayList<>(1); - // generally we get one memstore scanner from a flush - private final List memStoreScannersAfterFlush = new ArrayList<>(1); + // Since CompactingMemstore is now default, we get three memstore scanners from a flush + private final List memStoreScannersAfterFlush = new ArrayList<>(3); // The current list of scanners - private final List currentScanners = new ArrayList<>(); + @VisibleForTesting + final List currentScanners = new ArrayList<>(); // flush update lock private final ReentrantLock flushLock = new ReentrantLock(); @@ -876,9 +877,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Seek the new scanners to the last key seekScanners(scanners, lastTop, false, parallelSeekEnabled); // remove the older memstore scanner - for (int i = 0; i < currentScanners.size(); i++) { + for (int i = currentScanners.size() - 1; i >=0; i--) { if (!currentScanners.get(i).isFileScanner()) { currentScanners.remove(i).close(); + } else { + // we add the memstore scanner to the end of currentScanners break; } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 6ea8eaad8e..ff213b8fe9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -44,9 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.stream.IntStream; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -958,6 +957,76 @@ public class TestStore { verify(spiedStore, times(0)).replaceStoreFiles(null, null); } + private long countMemStoreScanner(StoreScanner scanner) { + if (scanner.currentScanners == null) { + return 0; + } + return scanner.currentScanners.stream() + .filter(s -> !s.isFileScanner()) + .count(); + } + + @Test + public void testNumberOfMemStoreScannersAfterFlush() throws IOException { + long seqId = 100; + long timestamp = System.currentTimeMillis(); + Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp, + KeyValue.Type.Put.getCode(), qf1); + CellUtil.setSequenceId(cell0, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST); + + Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp, + KeyValue.Type.Put.getCode(), qf1); + CellUtil.setSequenceId(cell1, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); + + seqId = 101; + timestamp = System.currentTimeMillis(); + Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp, + KeyValue.Type.Put.getCode(), qf1); + CellUtil.setSequenceId(cell2, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); + } + + private void testNumberOfMemStoreScannersAfterFlush(List inputCellsBeforeSnapshot, + List inputCellsAfterSnapshot) throws IOException { + init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); + TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + long seqId = Long.MIN_VALUE; + for (Cell c : inputCellsBeforeSnapshot) { + quals.add(CellUtil.cloneQualifier(c)); + seqId = Math.max(seqId, c.getSequenceId()); + } + for (Cell c : inputCellsAfterSnapshot) { + quals.add(CellUtil.cloneQualifier(c)); + seqId = Math.max(seqId, c.getSequenceId()); + } + inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + storeFlushCtx.prepare(); + inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); + int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; + try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { + // snaptshot + active (if it isn't empty) + assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s)); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + boolean more; + int cellCount = 0; + do { + List cells = new ArrayList<>(); + more = s.next(cells); + cellCount += cells.size(); + assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s)); + } while (more); + assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() + + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), + inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); + // the current scanners is cleared + assertEquals(0, countMemStoreScanner(s)); + } + } + private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException { Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); CellUtil.setSequenceId(c, sequenceId);