Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java (working copy) @@ -287,7 +287,7 @@ newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(store, scanInfo, scan, targetCols); + return new StoreScanner(store, scanInfo, scan, targetCols, store.getSmallestReadPoint()); } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -2772,10 +2772,9 @@ */ public static List getFromStoreFile(HStore store, Get get) throws IOException { - MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, - scan.getFamilyMap().get(store.getFamily().getName())); + scan.getFamilyMap().get(store.getFamily().getName()), 0); List result = new ArrayList(); scanner.next(result); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (working copy) @@ -74,6 +74,7 @@ public KeyValueScanner preStoreScannerOpen(final ObserverContext c, 
Store store, final Scan scan, final NavigableSet targetCols, KeyValueScanner s) throws IOException { - return new StoreScanner(store, store.getScanInfo(), scan, targetCols); + return new StoreScanner(store, store.getScanInfo(), scan, targetCols, + store.getSmallestReadPoint()); } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (working copy) @@ -337,7 +337,7 @@ List scanners = StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false, - false); + false, 0); HColumnDescriptor columnDescriptor = new HColumnDescriptor( HFileReadWriteTest.class.getSimpleName()); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -90,10 +90,9 @@ */ public void testScanAcrossSnapshot() throws IOException { int rowCount = addRows(this.memstore); - List memstorescanners = this.memstore.getScanners(); + List memstorescanners = this.memstore.getScanners(0); Scan scan = new Scan(); List result = new ArrayList(); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, 0, this.memstore.comparator); ScanType scanType = ScanType.USER_SCAN; @@ -115,8 +114,7 @@ scanner.close(); } - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - memstorescanners = this.memstore.getScanners(); + memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint()); // Now assert can count 
same number even if a snapshot mid-scan. s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; @@ -141,7 +139,7 @@ for (KeyValueScanner scanner : memstorescanners) { scanner.close(); } - memstorescanners = this.memstore.getScanners(); + memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint()); // Assert that new values are seen in kvset as we scan. long ts = System.currentTimeMillis(); s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); @@ -206,8 +204,7 @@ private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - List memstorescanners = this.memstore.getScanners(); + List memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint()); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW)); @@ -248,14 +245,12 @@ kv1.setMvccVersion(w.getWriteNumber()); memstore.add(kv1); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - KeyValueScanner s = this.memstore.getScanners().get(0); + KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{}); mvcc.completeMemstoreInsert(w); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); w = mvcc.beginMemstoreInsert(); @@ -263,14 +258,12 @@ kv2.setMvccVersion(w.getWriteNumber()); memstore.add(kv2); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); mvcc.completeMemstoreInsert(w); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = 
this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -302,8 +295,7 @@ mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - KeyValueScanner s = this.memstore.getScanners().get(0); + KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 @@ -317,8 +309,7 @@ memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 @@ -327,8 +318,7 @@ // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. // See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -357,8 +347,7 @@ mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - KeyValueScanner s = this.memstore.getScanners().get(0); + KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns @@ -369,16 +358,14 @@ memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE 
DELETE mvcc.completeMemstoreInsert(w); // NOW WE SHOULD SEE DELETE - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - s = this.memstore.getScanners().get(0); + s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } @@ -430,9 +417,7 @@ mvcc.completeMemstoreInsert(w); // Assert that we can read back - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - - KeyValueScanner s = this.memstore.getScanners().get(0); + KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); s.seek(kv); KeyValue ret = s.next(); @@ -507,7 +492,6 @@ * @throws InterruptedException */ public void testGetNextRow() throws Exception { - MultiVersionConsistencyControl.resetThreadReadPoint(); addRows(this.memstore); // Add more versions to make it a little more interesting. Thread.sleep(1); @@ -532,7 +516,7 @@ ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners()); + memstore.getScanners(0)); List results = new ArrayList(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; @@ -1036,7 +1020,7 @@ static void doScan(MemStore ms, int iteration) throws IOException { long nanos = System.nanoTime(); - KeyValueScanner s = ms.getScanners().get(0); + KeyValueScanner s = ms.getScanners(0).get(0); s.seek(KeyValue.createFirstOnRow(new byte[]{})); System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000); @@ -1055,17 +1039,11 @@ addRows(25000, ms); System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000); - System.out.println("foo"); - MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); - for (int i = 0 ; i < 50 ; i++) doScan(ms, i); } - - - } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java =================================================================== --- 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (working copy) @@ -164,7 +164,7 @@ assertEquals(2, memstore.kvset.size()); // opening scanner before clear the snapshot - List scanners = memstore.getScanners(); + List scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data memstore.clearSnapshot(snapshot); @@ -187,7 +187,7 @@ memstore.add(new KeyValue(row, fam, qf6, val)); memstore.add(new KeyValue(row, fam, qf7, val)); // opening scanners - scanners = memstore.getScanners(); + scanners = memstore.getScanners(0); // close scanners before clear the snapshot for (KeyValueScanner scanner : scanners) { scanner.close(); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java (working copy) @@ -144,7 +144,7 @@ cacheConf, BloomType.NONE, NoOpDataBlockEncoder.INSTANCE); List scanners = StoreFileScanner.getScannersForStoreFiles( - Collections.singletonList(sf), false, true, false); + Collections.singletonList(sf), false, true, false, 0); KeyValueScanner scanner = scanners.get(0); FaultyInputStream inStream = faultyfs.inStreams.get(0).get(); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -1861,12 +1861,10 @@ 
scan.addFamily(fam2); scan.addFamily(fam4); is = (RegionScannerImpl) region.getScanner(scan); - MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size()); scan = new Scan(); is = (RegionScannerImpl) region.getScanner(scan); - MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size()); } finally { HRegion.closeHRegion(this.region); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java (revision 1532155) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java (working copy) @@ -99,7 +99,8 @@ Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s) throws IOException { scan.setFilter(new NoDataFilter()); - return new StoreScanner(store, store.getScanInfo(), scan, targetCols); + return new StoreScanner(store, store.getScanInfo(), scan, targetCols, + store.getSmallestReadPoint()); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -880,14 +880,14 @@ @Override public List getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, - byte[] stopRow) throws IOException { + byte[] stopRow, long readPt) throws IOException { Collection storeFilesToScan; List memStoreScanners; this.lock.readLock().lock(); try { storeFilesToScan = 
this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow); - memStoreScanners = this.memstore.getScanners(); + memStoreScanners = this.memstore.getScanners(readPt); } finally { this.lock.readLock().unlock(); } @@ -898,7 +898,8 @@ // but now we get them in ascending order, which I think is // actually more correct, since memstore get put at the end. List sfScanners = StoreFileScanner - .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher); + .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher, + readPt); List scanners = new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); @@ -1648,7 +1649,7 @@ @Override public KeyValueScanner getScanner(Scan scan, - final NavigableSet targetCols) throws IOException { + final NavigableSet targetCols, long readPt) throws IOException { lock.readLock().lock(); try { KeyValueScanner scanner = null; @@ -1656,7 +1657,7 @@ scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); } if (scanner == null) { - scanner = new StoreScanner(this, getScanInfo(), scan, targetCols); + scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt); } return scanner; } finally { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -59,7 +59,7 @@ private KeyValueScanner current = null; private KVScannerComparator comparator; - + /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java (working copy) @@ -54,7 +54,6 @@ @Override public void process() { try { - MultiVersionConsistencyControl.setThreadReadPoint(readPoint); scanner.seek(keyValue); } catch (IOException e) { LOG.error("", e); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -95,10 +95,13 @@ // A flag whether use pread for scan private boolean scanUsePread = false; + + private long readPt = 0; /** An internal constructor. 
*/ protected StoreScanner(Store store, boolean cacheBlocks, Scan scan, - final NavigableSet columns, long ttl, int minVersions) { + final NavigableSet columns, long ttl, int minVersions, long readPt) { + this.readPt = readPt; this.store = store; this.cacheBlocks = cacheBlocks; isGet = scan.isGetScan(); @@ -137,10 +140,11 @@ * @param columns which columns we are scanning * @throws IOException */ - public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet columns) + public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet columns, + long readPt) throws IOException { this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), - scanInfo.getMinVersions()); + scanInfo.getMinVersions(), readPt); if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException( "Cannot specify any column for a raw scan"); @@ -220,7 +224,7 @@ List scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(store, false, scan, null, scanInfo.getTtl(), - scanInfo.getMinVersions()); + scanInfo.getMinVersions(), smallestReadPoint); if (dropDeletesFromRow == null) { matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, earliestPutTs, oldestUnexpiredTS); @@ -250,16 +254,23 @@ ScanType scanType, final NavigableSet columns, final List scanners) throws IOException { this(scan, scanInfo, scanType, columns, scanners, - HConstants.LATEST_TIMESTAMP); + HConstants.LATEST_TIMESTAMP, 0); } // Constructor for testing. 
StoreScanner(final Scan scan, ScanInfo scanInfo, + ScanType scanType, final NavigableSet columns, + final List scanners, long earliestPutTs) + throws IOException { + this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, 0); + } + + private StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, final NavigableSet columns, - final List scanners, long earliestPutTs) + final List scanners, long earliestPutTs, long readPt) throws IOException { this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), - scanInfo.getMinVersions()); + scanInfo.getMinVersions(), readPt); this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); @@ -282,7 +293,7 @@ final boolean isCompaction = false; boolean usePread = isGet || scanUsePread; return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow())); + isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); } /** @@ -626,7 +637,7 @@ for (KeyValueScanner scanner : scanners) { if (scanner instanceof StoreFileScanner) { ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, - MultiVersionConsistencyControl.getThreadReadPoint(), latch); + this.readPt, latch); executor.submit(seekHandler); handlers.add(seekHandler); } else { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -58,12 +58,16 @@ private static AtomicLong seekCount; private ScanQueryMatcher matcher; + + private long readPt; /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - public 
StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, boolean hasMVCC) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, + boolean hasMVCC, long readPt) { + this.readPt = readPt; this.reader = reader; this.hfs = hfs; this.enforceMVCC = useMVCC; @@ -77,9 +81,9 @@ public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, - boolean usePread) throws IOException { + boolean usePread, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, - usePread, false); + usePread, false, readPt); } /** @@ -87,9 +91,9 @@ */ public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction) throws IOException { + boolean isCompaction, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, - null); + null, readPt); } /** @@ -99,13 +103,13 @@ */ public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, ScanQueryMatcher matcher) throws IOException { + boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException { List scanners = new ArrayList( files.size()); for (StoreFile file : files) { StoreFile.Reader r = file.createReader(); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, - isCompaction); + isCompaction, readPt); scanner.setScanQueryMatcher(matcher); scanners.add(scanner); } @@ -180,13 +184,11 @@ } protected boolean skipKVsNewerThanReadpoint() throws IOException { - long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); - // We want to ignore all key-values that are newer than our current // readPoint while(enforceMVCC && cur != null - && (cur.getMvccVersion() > readPoint)) { + && (cur.getMvccVersion() > readPt)) { hfs.next(); cur = hfs.getKeyValue(); } @@ -202,7 +204,7 @@ // older KV which was not reset to 0 (because it was // 
not old enough during flush). Make sure that we set it correctly now, // so that the comparision order does not change. - if (cur.getMvccVersion() <= readPoint) { + if (cur.getMvccVersion() <= readPt) { cur.setMvccVersion(0); } return true; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3395,9 +3395,8 @@ if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions this.readPt = Long.MAX_VALUE; - MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); } else { - this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); + this.readPt = mvcc.memstoreReadPoint(); } scannerReadPoints.put(this, this.readPt); } @@ -3413,7 +3412,7 @@ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner = store.getScanner(scan, entry.getValue()); + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); @@ -3471,7 +3470,7 @@ try { // This could be a new thread from the last time we called next(). - MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); + // No per-thread read point to restore: this.readPt is carried by the scanners now. return nextRaw(outResults, limit); } finally { @@ -3741,7 +3740,7 @@ startRegionOperation(); try { // This could be a new thread from the last time we called next(). 
- MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); + // No per-thread read point to restore: this.readPt is carried by the scanners now. KeyValue kv = KeyValue.createFirstOnRow(row); // use request seek to make use of the lazy seek option. See HBASE-5520 result = this.storeHeap.requestSeek(kv, true, true); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -75,7 +75,7 @@ * @return a scanner over the current key values * @throws IOException on failure */ - KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols) + KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt) throws IOException; /** @@ -88,6 +88,7 @@ * @param matcher * @param startRow * @param stopRow + * @param readPt * @return all scanners for this store */ List getScanners( @@ -97,7 +98,8 @@ boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow + byte[] stopRow, + long readPt ) throws IOException; ScanInfo getScanInfo(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -1054,7 +1054,7 @@ */ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { - return getStoreFileScanner(cacheBlocks, pread, false); + return getStoreFileScanner(cacheBlocks, pread, false, 0); } /** @@ -1067,10 +1067,10 @@ */ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, - boolean isCompaction) { + boolean isCompaction, long 
readPt) { return new StoreFileScanner(this, - getScanner(cacheBlocks, pread, - isCompaction), !isCompaction, reader.hasMVCCInfo()); + getScanner(cacheBlocks, pread, isCompaction), + !isCompaction, reader.hasMVCCInfo(), readPt); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (working copy) @@ -179,12 +179,12 @@ protected List createFileScanners( final Collection filesToCompact) throws IOException { - return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true); + return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true, + store.getSmallestReadPoint()); } protected long setSmallestReadPoint() { long smallestReadPoint = store.getSmallestReadPoint(); - MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); return smallestReadPoint; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -3066,7 +3066,6 @@ maxResultSize = maxScannerResultSize; } List values = new ArrayList(); - MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); region.startRegionOperation(Operation.SCAN); try { int i = 0; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java =================================================================== --- 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (revision 1532155) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (working copy) @@ -24,9 +24,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.Log; - /** * Manages the read/write consistency within memstore. This provides * an interface for readers to determine what entries to ignore, and @@ -44,15 +41,6 @@ private final LinkedList writeQueue = new LinkedList(); - private static final ThreadLocal perThreadReadPoint = - new ThreadLocal() { - @Override - protected - Long initialValue() { - return Long.MAX_VALUE; - } - }; - /** * Default constructor. Initializes the memstoreRead/Write points to 0. */ @@ -75,40 +63,6 @@ } /** - * Get this thread's read point. Used primarily by the memstore scanner to - * know which values to skip (ie: have not been completed/committed to - * memstore). - */ - public static long getThreadReadPoint() { - return perThreadReadPoint.get(); - } - - /** - * Set the thread read point to the given value. The thread MVCC - * is used by the Memstore scanner so it knows which values to skip. - * Give it a value of 0 if you want everything. - */ - public static void setThreadReadPoint(long readPoint) { - perThreadReadPoint.set(readPoint); - } - - /** - * Set the thread MVCC read point to whatever the current read point is in - * this particular instance of MVCC. Returns the new thread read point value. - */ - public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) { - perThreadReadPoint.set(mvcc.memstoreReadPoint()); - return getThreadReadPoint(); - } - - /** - * Set the thread MVCC read point to 0 (include everything). 
- */ - public static void resetThreadReadPoint() { - perThreadReadPoint.set(0L); - } - - /** * Generate and return a {@link WriteEntry} with a new write number. * To complete the WriteEntry and wait for it to be visible, * call {@link #completeMemstoreInsert(WriteEntry)}. Index: hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java =================================================================== --- hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (revision 1532155) +++ hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java (working copy) @@ -229,6 +229,6 @@ // take default action return null; } - return new StoreScanner(store, scanInfo, scan, targetCols); + return new StoreScanner(store, scanInfo, scan, targetCols, store.getSmallestReadPoint()); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1532584) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -674,11 +674,11 @@ /** * @return scanner on memstore and snapshot in this order. */ - List getScanners() { + List getScanners(long readPt) { this.lock.readLock().lock(); try { return Collections.singletonList( - new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint())); + new MemStoreScanner(readPt)); } finally { this.lock.readLock().unlock(); }