diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java index 0bc75e7d07..6c5c323ad7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket; /** @@ -30,9 +30,17 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface ChangedReadersObserver { + + /** + * @return the read point of the current scan + */ + long getReadPoint(); + /** * Notify observers. + * + * @param ticket The information of scanner * @throws IOException e */ - void updateReaders(List sfs) throws IOException; + void updateReaders(ScannerTicket ticket) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index b244997cd7..725434050a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -317,13 +317,14 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List getScanners(long readPt) throws IOException { + MutableSegment activeTmp = active; List pipelineList = pipeline.getSegments(); List snapshotList = snapshot.getAllSegments(); long order = 1 + pipelineList.size() + snapshotList.size(); // The list of elements in pipeline + the active element + the snapshot segment // The order is the Segment ordinal List list = new ArrayList((int) order); - order = addToScanners(active, readPt, order, list); + order = addToScanners(activeTmp, readPt, order, list); order = addToScanners(pipelineList, readPt, order, list); addToScanners(snapshotList, readPt, order, list); return list; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cbdaa1b822..4d002044fe 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1149,7 +1149,14 @@ public class HStore implements Store { */ private void notifyChangedReadersObservers(List sfs) throws IOException { for (ChangedReadersObserver o : this.changedReaderObservers) { - o.updateReaders(sfs); + List memStoreScanners; + this.lock.readLock().lock(); + try { + memStoreScanners = this.memstore.getScanners(o.getReadPoint()); + } finally { + this.lock.readLock().unlock(); + } + o.updateReaders(new ScannerTicketImpl(sfs, memStoreScanners)); } } @@ -1212,6 +1219,22 @@ public class HStore implements Store { } @Override + public List getScanners(ScannerTicket ticket, boolean cacheBlocks, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException { + if (!(ticket instanceof ScannerTicketImpl)) { + throw new IOException("Unsupported ticket:" + ticket.getClass().getName()); + } + ScannerTicketImpl ticketImpl = (ScannerTicketImpl) ticket; + List memStoreScanners = ticketImpl.getMemStoreScanners(); + List sfScanners = StoreFileScanner.getScannersForStoreFiles(ticketImpl.getStoreFiles(), + cacheBlocks, usePread, isCompaction, false, matcher, readPt); + List scanners = new ArrayList<>(sfScanners.size() + memStoreScanners.size()); + scanners.addAll(sfScanners); + scanners.addAll(memStoreScanners); + return scanners; + } + + @Override public void addChangedReaderObserver(ChangedReadersObserver o) { this.changedReaderObservers.add(o); } @@ -2527,4 +2550,46 @@ public class HStore implements Store { lock.writeLock().unlock(); } } + + private static class ScannerTicketImpl implements ScannerTicket { + + private final List files; + private final List memStoreScanners; + + ScannerTicketImpl(final List files, final List memStoreScanners) { + this.files = new ArrayList<>(files); + this.memStoreScanners = new ArrayList<>(memStoreScanners); + } + + @Override + public void merge(ScannerTicket newOne) throws IOException { + if (newOne instanceof ScannerTicketImpl) { + this.files.addAll(((ScannerTicketImpl) newOne).getStoreFiles()); + closeMemStoreScanners(); + this.memStoreScanners.addAll(((ScannerTicketImpl) newOne).getMemStoreScanners()); + return; + } + throw new IOException("Unsupported ticket:" + newOne.getClass().getName()); + } + + private List getStoreFiles() { + return files; + } + + private List getMemStoreScanners() { + return memStoreScanners; + } + + private void closeMemStoreScanners() { + for (KeyValueScanner s : memStoreScanners) { + s.close(); + } + memStoreScanners.clear(); + } + + @Override + public void close() { + closeMemStoreScanners(); + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 76595f3622..9543229199 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -158,6 +159,21 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) throws IOException; + /** + * Create scanners on the given files and if needed on the memstore with no + * filtering based on TTL (that happens further down the line). + * + * @param ticket the information of files and memstore on which the scanners has to be created + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param readPt the read point of the current scan + * @return scanners on the given files and on the memstore if specified + */ + List getScanners(ScannerTicket ticket, boolean cacheBlocks, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException; + ScanInfo getScanInfo(); /** @@ -544,4 +560,20 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @return true if the memstore may need some extra memory space */ boolean isSloppyMemstore(); + + /** + * Record the information of the new scanners. + */ + interface ScannerTicket extends Closeable { + + /** + * Merge two ticket. + * + * @param newOne the another ticket + */ + void merge(ScannerTicket newOne) throws IOException; + + @Override + void close(); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e42979efe3..1bff8fcaaf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -45,12 +45,14 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; +import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -130,8 +132,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final boolean scanUsePread; // Indicates whether there was flush during the course of the scan protected volatile boolean flushed = false; - // generally we get one file from a flush - protected List flushedStoreFiles = new ArrayList<>(1); + // generally we get the information of scanners from a flush + protected ScannerTicket ticket = null; // The current list of scanners protected List currentScanners = new ArrayList<>(); // flush update lock @@ -477,6 +479,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); if (withHeapClose) { + if (ticket != null) { + ticket.close(); + ticket = null; + } for (KeyValueHeap h : this.heapsForDelayedClose) { h.close(); } @@ -779,13 +785,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return qcode; } + @Override + public long getReadPoint() { + return this.readPt; + } + // Implementation of ChangedReadersObserver @Override - public void updateReaders(List sfs) throws IOException { + public void updateReaders(ScannerTicket ticket) throws IOException { + if (ticket == null) { + return; + } flushed = true; flushLock.lock(); try { - flushedStoreFiles.addAll(sfs); + if (this.ticket == null) { + this.ticket = ticket; + } else { + this.ticket.merge(ticket); + } } finally { flushLock.unlock(); } @@ -821,10 +839,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners = null; try { flushLock.lock(); - scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, - scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); - // Clear the current set of flushed store files so that they don't get added again - flushedStoreFiles.clear(); + scanners = selectScannersFrom(store.getScanners(ticket, cacheBlocks, + scanUsePread, false, matcher, this.readPt)); + ticket = null; } finally { flushLock.unlock(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index ccbf06786e..5a9eddeb12 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -720,9 +720,9 @@ public class TestStoreScanner { // normally cause an NPE because scan.store is null. So as long as we get through these // two calls we are good and the bug was quashed. - scan.updateReaders(new ArrayList<>()); + scan.updateReaders(null); - scan.updateReaders(new ArrayList<>()); + scan.updateReaders(null); scan.peek(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index cdf84d2990..70b36fcf48 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -130,7 +130,7 @@ public class TestWideScanner extends HBaseTestCase { ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator(); while (scanners.hasNext()) { StoreScanner ss = (StoreScanner)scanners.next(); - ss.updateReaders(new ArrayList<>()); + ss.updateReaders(null); } } while (more);