diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java index 0bc75e7..eb4196e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket; /** @@ -30,9 +30,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface ChangedReadersObserver { + + /** + * @return the read point of the current scan + */ + long getReadPoint(); /** * Notify observers. + * @param ticket The information of scanner * @throws IOException e */ - void updateReaders(List sfs) throws IOException; + void updateReaders(ScannerTicket ticket) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4032a19..969cc84 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1152,7 +1152,14 @@ public class HStore implements Store { */ private void notifyChangedReadersObservers(List sfs) throws IOException { for (ChangedReadersObserver o : this.changedReaderObservers) { - o.updateReaders(sfs); + List memStoreScanners; + this.lock.readLock().lock(); + try { + memStoreScanners = this.memstore.getScanners(o.getReadPoint()); + } finally { + this.lock.readLock().unlock(); + } + o.updateReaders(new ScannerTicketImpl(sfs, memStoreScanners)); } } @@ -1221,6 +1228,22 @@ public class HStore implements Store { } @Override + public List getScanners(ScannerTicket ticket, boolean cacheBlocks, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException { + if (!(ticket instanceof ScannerTicketImpl)) { + throw new IOException("Unsupported ticket:" + ticket.getClass().getName()); + } + ScannerTicketImpl ticketImpl = (ScannerTicketImpl) ticket; + List memStoreScanners = ticketImpl.getMemStoreScanners(); + List sfScanners = StoreFileScanner.getScannersForStoreFiles(ticketImpl.getStoreFiles(), + cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore()); + List scanners = new ArrayList<>(sfScanners.size() + memStoreScanners.size()); + scanners.addAll(sfScanners); + scanners.addAll(memStoreScanners); + return scanners; + } + + @Override public void addChangedReaderObserver(ChangedReadersObserver o) { this.changedReaderObservers.add(o); } @@ -2785,4 +2808,44 @@ public class HStore implements Store { lock.writeLock().unlock(); } } + + private static class ScannerTicketImpl implements ScannerTicket { + private final List files; + private final List memStoreScanners; + ScannerTicketImpl(final List files, final List memStoreScanners) { + this.files = new ArrayList<>(files); + this.memStoreScanners = new ArrayList<>(memStoreScanners); + } + + @Override + public void merge(ScannerTicket newOne) throws IOException { + if (newOne instanceof ScannerTicketImpl) { + this.files.addAll(((ScannerTicketImpl) newOne).getStoreFiles()); + closeMemStoreScanners(); + this.memStoreScanners.addAll(((ScannerTicketImpl) newOne).getMemStoreScanners()); + return; + } + throw new IOException("Unsupported ticket:" + newOne.getClass().getName()); + } + + private List getStoreFiles() { + return files; + } + + private List getMemStoreScanners() { + return memStoreScanners; + } + + private void closeMemStoreScanners() { + for (KeyValueScanner s : memStoreScanners) { + s.close(); + } + memStoreScanners.clear(); + } + + @Override + public void close() { + closeMemStoreScanners(); + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 9d5d3b6..5a56ada 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -122,9 +123,23 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @param includeMemstoreScanner true if memstore has to be included * @return scanners on the given files and on the memstore if specified */ - List getScanners(List files, boolean cacheBlocks, boolean isGet, - boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, - byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException; + List getScanners(List files, boolean cacheBlocks, boolean isGet, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, + byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException; + + /** + * Create scanners on the given files and if needed on the memstore with no filtering based on TTL + * (that happens further down the line). + * @param ticket the information of files and memstore on which the scanners has to be created + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param readPt the read point of the current scan + * @return scanners on the given files and on the memstore if specified + */ + List getScanners(ScannerTicket ticket, boolean cacheBlocks, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException; ScanInfo getScanInfo(); @@ -560,4 +575,17 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * Closes and archives the compacted files under this store */ void closeAndArchiveCompactedFiles() throws IOException; + + /** + * Record the information of the new scanners. + */ + interface ScannerTicket extends Closeable { + /** + * Merge two ticket. + * @param newOne the another ticket + */ + void merge(ScannerTicket newOne) throws IOException; + @Override + void close(); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 28d9ef2..9024318 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; +import org.apache.hadoop.hbase.regionserver.Store.ScannerTicket; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; @@ -128,7 +129,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan private volatile boolean flushed = false; // generally we get one file from a flush - private List flushedStoreFiles = new ArrayList(1); + private ScannerTicket ticket = null; // The current list of scanners private List currentScanners = new ArrayList(); // flush update lock @@ -453,6 +454,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner public void close() { if (this.closing) return; this.closing = true; + if (ticket != null) { + ticket.close(); + ticket = null; + } // Under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -755,13 +760,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return qcode; } + @Override + public long getReadPoint() { + return readPt; + } + // Implementation of ChangedReadersObserver @Override - public void updateReaders(List sfs) throws IOException { + public void updateReaders(ScannerTicket ticket) throws IOException { + if (ticket == null) { + return; + } flushed = true; flushLock.lock(); try { - flushedStoreFiles.addAll(sfs); + if (this.ticket == null) { + this.ticket = ticket; + } else { + this.ticket.merge(ticket); + } } finally { flushLock.unlock(); } @@ -823,10 +840,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners = null; try { flushLock.lock(); - scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); - // Clear the current set of flushed store files so that they don't get added again - flushedStoreFiles.clear(); + scanners = selectScannersFrom(store.getScanners(ticket, cacheBlocks, usePread, + isCompaction, matcher, this.readPt)); + ticket = null; } finally { flushLock.unlock(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java index c3e0ec4..5b32c30 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java @@ -334,7 +334,7 @@ public class TestAcidGuarantees implements Tool { public void testGetAtomicity() throws Exception { util.startMiniCluster(1); try { - runTestAtomicity(20000, 5, 5, 0, 3); + runTestAtomicity(100000, 5, 5, 0, 3); } finally { util.shutdownMiniCluster(); } @@ -344,7 +344,7 @@ public class TestAcidGuarantees implements Tool { public void testScanAtomicity() throws Exception { util.startMiniCluster(1); try { - runTestAtomicity(20000, 5, 0, 5, 3); + runTestAtomicity(100000, 5, 0, 5, 3); } finally { util.shutdownMiniCluster(); } @@ -354,7 +354,7 @@ public class TestAcidGuarantees implements Tool { public void testMixedAtomicity() throws Exception { util.startMiniCluster(1); try { - runTestAtomicity(20000, 5, 2, 2, 3); + runTestAtomicity(100000, 5, 2, 2, 3); } finally { util.shutdownMiniCluster(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 30ffe0b..161ede7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -452,9 +452,9 @@ public class TestStoreScanner extends TestCase { // normally cause an NPE because scan.store is null. So as long as we get through these // two calls we are good and the bug was quashed. - scan.updateReaders(new ArrayList()); + scan.updateReaders(null); - scan.updateReaders(new ArrayList()); + scan.updateReaders(null); scan.peek(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index 7e86632..331869d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -125,7 +125,7 @@ public class TestWideScanner extends HBaseTestCase { ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator(); while (scanners.hasNext()) { StoreScanner ss = (StoreScanner)scanners.next(); - ss.updateReaders(new ArrayList()); + ss.updateReaders(null); } } while (more);