Index: src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java	(working copy)
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -254,7 +255,8 @@
           newVersions == null ? family.getMaxVersions() : newVersions,
           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
           oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
-      return new StoreScanner(store, scanInfo, scan, targetCols);
+      return new StoreScanner(store, scanInfo, scan, targetCols,
+          store.getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     }
   }
 
Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(working copy)
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1931,10 +1932,12 @@
    */
  public static List<KeyValue> getFromStoreFile(Store store,
      Get get) throws IOException {
-    MultiVersionConsistencyControl.resetThreadReadPoint();
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
-        scan.getFamilyMap().get(store.getFamily().getName()));
+        scan.getFamilyMap().get(store.getFamily().getName()),
+        // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
+        // readpoint 0.
+        0);
 
    List<KeyValue> result = new ArrayList<KeyValue>();
    scanner.next(result);
Index: src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java	(working copy)
@@ -74,6 +74,8 @@
   public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
       KeyValueScanner s) throws IOException {
-    return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
+    HRegion r = c.getEnvironment().getRegion();
+    return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
+        r.getReadpoint(scan.getIsolationLevel()));
   }
 }
Index: src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java	(working copy)
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -332,10 +333,6 @@
   public void runMergeWorkload() throws IOException {
     long maxKeyCount = prepareForMerge();
 
-    List<StoreFileScanner> scanners =
-        StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
-            false);
-
     HColumnDescriptor columnDescriptor = new HColumnDescriptor(
         HFileReadWriteTest.class.getSimpleName());
     columnDescriptor.setBlocksize(blockSize);
@@ -347,7 +344,9 @@
     HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd, null);
     Store store = new Store(outputDir, region, columnDescriptor, fs, conf);
-
+    List<StoreFileScanner> scanners =
+        StoreFileScanner.getScannersForStoreFiles(inputStoreFiles, false,
+            false, region.getReadpoint(IsolationLevel.READ_COMMITTED));
 
     StoreFile.Writer writer = new StoreFile.WriterBuilder(conf,
         new CacheConfig(conf), fs, blockSize)
         .withOutputDir(outputDir)
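
Note on the hunks above: every test-side caller that used to lean on the removed thread-local now computes a read point explicitly and hands it down the scanner stack. For a coprocessor hook, the post-patch shape is roughly the sketch below (illustrative class, not part of the patch; the hook signature follows NoOpScanPolicyObserver above, and HRegion.getReadpoint() is the method this patch adds further down):

import java.io.IOException;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

// Illustrative scan-policy observer showing the explicit-readpoint pattern.
public class ExplicitReadpointObserver extends BaseRegionObserver {
  @Override
  public KeyValueScanner preStoreScannerOpen(
      final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
      throws IOException {
    // Derive the read point from the region and the scan's isolation level
    // rather than from a thread-local, so any handler thread may run this.
    HRegion region = c.getEnvironment().getRegion();
    return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
        region.getReadpoint(scan.getIsolationLevel()));
  }
}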
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(working copy)
@@ -88,10 +88,9 @@
    */
   public void testScanAcrossSnapshot() throws IOException {
     int rowCount = addRows(this.memstore);
-    List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
+    List<KeyValueScanner> memstorescanners = this.memstore.getScanners(0);
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
     ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP,
         false, 0, this.memstore.comparator);
     ScanType scanType = ScanType.USER_SCAN;
@@ -113,8 +112,7 @@
       scanner.close();
     }
 
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    memstorescanners = this.memstore.getScanners();
+    memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
@@ -139,7 +137,7 @@
     for (KeyValueScanner scanner : memstorescanners) {
       scanner.close();
     }
-    memstorescanners = this.memstore.getScanners();
+    memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@@ -203,8 +201,7 @@
 
   private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
       throws IOException {
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
+    List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
     assertEquals(1, memstorescanners.size());
     final KeyValueScanner scanner = memstorescanners.get(0);
     scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
@@ -245,14 +242,12 @@
     kv1.setMemstoreTS(w.getWriteNumber());
     memstore.add(kv1);
 
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    KeyValueScanner s = this.memstore.getScanners().get(0);
+    KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{});
 
     mvcc.completeMemstoreInsert(w);
 
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv1});
 
     w = mvcc.beginMemstoreInsert();
@@ -260,14 +255,12 @@
     kv2.setMemstoreTS(w.getWriteNumber());
     memstore.add(kv2);
 
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv1});
 
     mvcc.completeMemstoreInsert(w);
 
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv1, kv2});
   }
 
@@ -299,8 +292,7 @@
     mvcc.completeMemstoreInsert(w);
 
     // BEFORE STARTING INSERT 2, SEE FIRST KVS
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    KeyValueScanner s = this.memstore.getScanners().get(0);
+    KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // START INSERT 2: Write both columns val2
@@ -314,8 +306,7 @@
     memstore.add(kv22);
 
     // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // COMPLETE INSERT 2
@@ -324,8 +315,7 @@
 
     // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
     // See HBASE-1485 for discussion about what we should do with
     // the duplicate-TS inserts
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
   }
 
@@ -354,8 +344,7 @@
     mvcc.completeMemstoreInsert(w);
 
     // BEFORE STARTING INSERT 2, SEE FIRST KVS
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    KeyValueScanner s = this.memstore.getScanners().get(0);
+    KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // START DELETE: Insert delete for one of the columns
@@ -366,16 +355,14 @@
     memstore.add(kvDel);
 
     // BEFORE COMPLETING DELETE, SEE FIRST KVS
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // COMPLETE DELETE
     mvcc.completeMemstoreInsert(w);
 
     // NOW WE SHOULD SEE DELETE
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-    s = this.memstore.getScanners().get(0);
+    s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
   }
 
@@ -427,9 +414,7 @@
     mvcc.completeMemstoreInsert(w);
 
     // Assert that we can read back
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-
-    KeyValueScanner s = this.memstore.getScanners().get(0);
+    KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     s.seek(kv);
 
     KeyValue ret = s.next();
@@ -536,7 +521,6 @@
    * @throws InterruptedException
    */
   public void testGetNextRow() throws Exception {
-    MultiVersionConsistencyControl.resetThreadReadPoint();
     addRows(this.memstore);
     // Add more versions to make it a little more interesting.
     Thread.sleep(1);
@@ -561,7 +545,7 @@
       ScanType scanType = ScanType.USER_SCAN;
       InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,
-          memstore.getScanners());
+          memstore.getScanners(0));
       List<KeyValue> results = new ArrayList<KeyValue>();
       for (int i = 0; scanner.next(results); i++) {
         int rowId = startRowId + i;
@@ -1055,7 +1039,7 @@
   static void doScan(MemStore ms, int iteration) throws IOException {
     long nanos = System.nanoTime();
-    KeyValueScanner s = ms.getScanners().get(0);
+    KeyValueScanner s = ms.getScanners(0).get(0);
     s.seek(KeyValue.createFirstOnRow(new byte[]{}));
 
     System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
@@ -1074,11 +1058,6 @@
     addRows(25000, ms);
     System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
-
-    System.out.println("foo");
-
-    MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-
     for (int i = 0 ; i < 50 ; i++) doScan(ms, i);
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java	(working copy)
@@ -134,7 +134,9 @@
         cacheConf, BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
 
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true, false);
+        Collections.singletonList(sf), false, true, false,
+        // 0 is passed as readpoint because this test operates on StoreFile directly
+        0);
     KeyValueScanner scanner = scanners.get(0);
 
     FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java	(working copy)
@@ -559,7 +559,7 @@
     StoreScanner scanner =
         new StoreScanner(scan, scanInfo, ScanType.MAJOR_COMPACT, null, scanners,
-            HConstants.OLDEST_TIMESTAMP);
+            HConstants.OLDEST_TIMESTAMP, 0);
     List<KeyValue> results = new ArrayList<KeyValue>();
     results = new ArrayList<KeyValue>();
     assertEquals(true, scanner.next(results));
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(working copy)
@@ -1923,12 +1923,10 @@
       scan.addFamily(fam2);
       scan.addFamily(fam4);
       is = (RegionScannerImpl) region.getScanner(scan);
-      MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
       assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size());
 
       scan = new Scan();
       is = (RegionScannerImpl) region.getScanner(scan);
-      MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
       assertEquals(families.length -1,
           ((RegionScannerImpl)is).storeHeap.getHeap().size());
     } finally {
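
The TestMemStore changes above all exercise the same visibility protocol: a memstore scanner pins its read point at creation time, so key-values tagged with a write number above that point stay invisible until the writer commits and a new scanner is opened. A condensed sketch of that protocol, using only calls that appear in the diffs above (assumes TestMemStore's no-argument MemStore constructor; a sketch, not a test from the patch):

MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
MemStore memstore = new MemStore();

MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
    Bytes.toBytes("q"), Bytes.toBytes("v"));
kv.setMemstoreTS(w.getWriteNumber());
memstore.add(kv);

// Pins the current read point, which is still below w's write number,
// so this scanner must not see the uncommitted kv.
KeyValueScanner before = memstore.getScanners(mvcc.memstoreReadPoint()).get(0);

mvcc.completeMemstoreInsert(w);

// The read point has advanced past w; a scanner opened now sees kv.
KeyValueScanner after = memstore.getScanners(mvcc.memstoreReadPoint()).get(0);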
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java	(revision 1533311)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -111,7 +112,8 @@
         Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
         throws IOException {
       scan.setFilter(new NoDataFilter());
-      return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
+      return new StoreScanner(store, store.getScanInfo(), scan, targetCols,
+          store.getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java	(working copy)
@@ -57,7 +57,7 @@
   private KeyValueScanner current = null;
 
   private KVScannerComparator comparator;
-  
+
   /**
    * Constructor. This KeyValueHeap will handle closing of passed in
   * KeyValueScanners.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java	(working copy)
@@ -145,16 +145,15 @@
       (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
         store.getFamily().getCompactionCompression(): compression;
 
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
     // For each file, obtain a scanner:
     List<StoreFileScanner> scanners = StoreFileScanner
-      .getScannersForStoreFiles(filesToCompact, false, false, true);
+      .getScannersForStoreFiles(filesToCompact, false, false, true, smallestReadPoint);
 
     // Make the instantiation lazy in case compaction produces no product; i.e.
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
-    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
     try {
       InternalScanner scanner = null;
       try {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java	(working copy)
@@ -72,10 +72,12 @@
   // if heap == null and lastTop != null, you need to reseek given the key below
   private KeyValue lastTop = null;
 
+  private final long readPt;
 
   /** An internal constructor.
    */
   private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
-      final NavigableSet<byte[]> columns, long ttl, int minVersions) {
+      final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
+    this.readPt = readPt;
     this.store = store;
     this.cacheBlocks = cacheBlocks;
     isGet = scan.isGetScan();
@@ -102,10 +104,10 @@
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
-      throws IOException {
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
+      long readPt) throws IOException {
     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
-        scanInfo.getMinVersions());
+        scanInfo.getMinVersions(), readPt);
     initializeMetricNames();
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException(
@@ -152,7 +154,7 @@
       List<? extends KeyValueScanner> scanners, ScanType scanType,
       long smallestReadPoint, long earliestPutTs) throws IOException {
     this(store, false, scan, null, scanInfo.getTtl(),
-        scanInfo.getMinVersions());
+        scanInfo.getMinVersions(), smallestReadPoint);
     initializeMetricNames();
     matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
         smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
@@ -174,16 +176,18 @@
       ScanType scanType, final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners) throws IOException {
     this(scan, scanInfo, scanType, columns, scanners,
-        HConstants.LATEST_TIMESTAMP);
+        HConstants.LATEST_TIMESTAMP,
+        // 0 is passed as readpoint because the test bypasses Store
+        0);
   }
 
   // Constructor for testing.
   StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
       ScanType scanType, final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners, long earliestPutTs)
+      final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
       throws IOException {
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
-        scanInfo.getMinVersions());
+        scanInfo.getMinVersions(), readPt);
     this.initializeMetricNames();
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
@@ -219,7 +223,7 @@
   private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
     return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
-        isCompaction, matcher));
+        isCompaction, matcher, this.readPt));
   }
 
   /**
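
With the readPt field in place, StoreScanner receives its read point from three distinct sources, all visible in the hunks above and below. Summarized as a sketch (variable names illustrative, calls taken from this patch):

// 1. User scans: the region-scoped read point chosen in HRegion
//    (see the HRegion hunks later in this patch).
new StoreScanner(store, scanInfo, scan, targetCols,
    region.getReadpoint(scan.getIsolationLevel()));

// 2. Compactions: the smallest read point still held by any open scanner,
//    computed in Compactor above, so MVCC data a concurrent reader may
//    still need is not discarded.
new StoreScanner(store, scanInfo, scan, scanners, scanType,
    store.getHRegion().getSmallestReadPoint(), earliestPutTs);

// 3. Tests that bypass Store entirely: an explicit 0, matching what the
//    removed resetThreadReadPoint() used to install.
new StoreScanner(scan, scanInfo, scanType, columns, scanners,
    HConstants.OLDEST_TIMESTAMP, 0);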
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java	(working copy)
@@ -56,12 +56,16 @@
   private static AtomicLong seekCount;
 
   private ScanQueryMatcher matcher;
+
+  private long readPt;
 
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
    * @param hfs HFile scanner
    */
-  public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, boolean hasMVCC) {
+  public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
+      boolean hasMVCC, long readPt) {
+    this.readPt = readPt;
     this.reader = reader;
     this.hfs = hfs;
     this.enforceMVCC = useMVCC;
@@ -75,9 +79,9 @@
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks,
-      boolean usePread) throws IOException {
+      boolean usePread, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks,
-        usePread, false);
+        usePread, false, readPt);
   }
 
   /**
@@ -85,9 +89,9 @@
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction) throws IOException {
+      boolean isCompaction, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        null);
+        null, readPt);
   }
 
   /**
@@ -97,13 +101,13 @@
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, ScanQueryMatcher matcher) throws IOException {
+      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
     for (StoreFile file : files) {
       StoreFile.Reader r = file.createReader();
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
-          isCompaction);
+          isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);
       scanners.add(scanner);
     }
@@ -178,13 +182,11 @@
   }
 
   protected boolean skipKVsNewerThanReadpoint() throws IOException {
-    long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
-
     // We want to ignore all key-values that are newer than our current
     // readPoint
     while(enforceMVCC
         && cur != null
-        && (cur.getMemstoreTS() > readPoint)) {
+        && (cur.getMemstoreTS() > readPt)) {
       hfs.next();
       cur = hfs.getKeyValue();
     }
@@ -200,7 +202,7 @@
     // older KV which was not reset to 0 (because it was
     // not old enough during flush). Make sure that we set it correctly now,
     // so that the comparison order does not change.
-    if (cur.getMemstoreTS() <= readPoint) {
+    if (cur.getMemstoreTS() <= readPt) {
       cur.setMemstoreTS(0);
     }
     return true;
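
skipKVsNewerThanReadpoint() is the main consumer of the new field. The standalone model below (plain Java, deliberately not HBase code) demonstrates the two rules it applies: skip cells whose memstoreTS is above the read point, and zero out the memstoreTS of cells at or below it so later comparisons are unaffected by MVCC bookkeeping:

import java.util.ArrayList;
import java.util.List;

public class ReadpointSkipModel {
  static final class Cell {
    final String value;
    long memstoreTS;
    Cell(String value, long memstoreTS) {
      this.value = value;
      this.memstoreTS = memstoreTS;
    }
  }

  // Models the visibility rules of skipKVsNewerThanReadpoint().
  static List<Cell> visible(List<Cell> cells, long readPt) {
    List<Cell> out = new ArrayList<Cell>();
    for (Cell c : cells) {
      if (c.memstoreTS > readPt) {
        continue; // newer than our read point: invisible, skip it
      }
      c.memstoreTS = 0; // old enough: reset to 0, as the real scanner does
      out.add(c);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Cell> cells = new ArrayList<Cell>();
    cells.add(new Cell("committed", 3));
    cells.add(new Cell("in-flight", 7));
    // With readPt = 5, only the cell written at MVCC number 3 is visible.
    for (Cell c : visible(cells, 5)) {
      System.out.println(c.value + " ts=" + c.memstoreTS);
    }
  }
}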
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -925,6 +925,17 @@
   public MultiVersionConsistencyControl getMVCC() {
     return mvcc;
   }
+
+  /**
+   * Returns the readpoint to use for the given IsolationLevel.
+   */
+  public long getReadpoint(IsolationLevel isolationLevel) {
+    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
+      // This scan can read even uncommitted transactions
+      return Long.MAX_VALUE;
+    }
+    return mvcc.memstoreReadPoint();
+  }
 
   public boolean isLoadingCfsOnDemandDefault() {
     return this.isLoadingCfsOnDemandDefault;
@@ -3823,13 +3834,7 @@
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
       synchronized(scannerReadPoints) {
-        if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
-          // This scan can read even uncommitted transactions
-          this.readPt = Long.MAX_VALUE;
-          MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
-        } else {
-          this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-        }
+        this.readPt = getReadpoint(isolationLevel);
         scannerReadPoints.put(this, this.readPt);
       }
 
@@ -3844,7 +3849,7 @@
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
+        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || FilterBase.isFamilyEssential(this.filter, entry.getKey())) {
           scanners.add(scanner);
@@ -3893,10 +3898,6 @@
       readRequestsCount.increment();
       opMetrics.setReadRequestCountMetrics(readRequestsCount.get());
       try {
-
-        // This could be a new thread from the last time we called next().
-        MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
-
         return nextRaw(outResults, limit, metric);
       } finally {
         closeRegionOperation();
@@ -4175,8 +4176,6 @@
       boolean result = false;
       startRegionOperation();
       try {
-        // This could be a new thread from the last time we called next().
-        MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
         KeyValue kv = KeyValue.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
         result = this.storeHeap.requestSeek(kv, true, true);
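
getReadpoint() centralizes the branch that RegionScannerImpl used to inline. From the client side the choice is driven by the scan's isolation level; a brief sketch using only client API calls:

Scan scan = new Scan();
// Default is READ_COMMITTED, so the server pins
// readPt = mvcc.memstoreReadPoint() when the scanner opens.
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
// Now the server uses readPt = Long.MAX_VALUE, and the scan can observe
// writes whose MVCC transactions have not yet completed.

Because the read point is still recorded in scannerReadPoints inside the same synchronized block, getSmallestReadPoint() continues to see a consistent view of all open scanners.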
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -1062,13 +1062,14 @@
   protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
       boolean isGet,
       boolean isCompaction,
-      ScanQueryMatcher matcher) throws IOException {
+      ScanQueryMatcher matcher,
+      long readPt) throws IOException {
     List<StoreFile> storeFiles;
     List<KeyValueScanner> memStoreScanners;
     this.lock.readLock().lock();
     try {
       storeFiles = this.getStorefiles();
-      memStoreScanners = this.memstore.getScanners();
+      memStoreScanners = this.memstore.getScanners(readPt);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1079,7 +1080,7 @@
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
     List<StoreFileScanner> sfScanners = StoreFileScanner
-        .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
+        .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher, readPt);
 
     List<KeyValueScanner> scanners =
         new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -2200,7 +2201,7 @@
    * @throws IOException
    */
   public KeyValueScanner getScanner(Scan scan,
-      final NavigableSet<byte[]> targetCols) throws IOException {
+      final NavigableSet<byte[]> targetCols, long readPt) throws IOException {
     lock.readLock().lock();
     try {
       KeyValueScanner scanner = null;
@@ -2208,7 +2209,7 @@
         scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
       if (scanner == null) {
-        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
+        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
       }
       return scanner;
     } finally {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -1408,7 +1408,10 @@
      */
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                                 boolean pread) {
-      return getStoreFileScanner(cacheBlocks, pread, false);
+      return getStoreFileScanner(cacheBlocks, pread, false,
+          // 0 is passed as readpoint because this method is only used by tests
+          // that operate on the StoreFile directly
+          0);
     }
 
     /**
@@ -1421,10 +1424,10 @@
      */
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                                 boolean pread,
-                                                boolean isCompaction) {
+                                                boolean isCompaction, long readPt) {
       return new StoreFileScanner(this,
-          getScanner(cacheBlocks, pread,
-              isCompaction), !isCompaction, reader.hasMVCCInfo());
+          getScanner(cacheBlocks, pread, isCompaction),
+          !isCompaction, reader.hasMVCCInfo(), readPt);
     }
 
     /**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -2635,7 +2635,6 @@
         }
       }
 
-      MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
       region.startRegionOperation();
       try {
         int i = 0;
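
StoreFile keeps a two-argument getStoreFileScanner() convenience overload for tests, while production paths must now state their read point explicitly. Both entry points, sketched with illustrative variables (sf and store are assumed to exist; calls match the hunks above):

StoreFile.Reader reader = sf.createReader();

// Test convenience: read point 0, as the comment in the overload above notes.
StoreFileScanner a = reader.getStoreFileScanner(true, false);

// Explicit form, as reached via getScannersForStoreFiles() from Store and
// Compactor; here with a compaction-style read point.
StoreFileScanner b = reader.getStoreFileScanner(true, false, false,
    store.getHRegion().getSmallestReadPoint());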
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java	(working copy)
@@ -24,9 +24,6 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-
 /**
  * Manages the read/write consistency within memstore. This provides
  * an interface for readers to determine what entries to ignore, and
@@ -43,15 +40,6 @@
   private final LinkedList<WriteEntry> writeQueue =
       new LinkedList<WriteEntry>();
 
-  private static final ThreadLocal<Long> perThreadReadPoint =
-      new ThreadLocal<Long>() {
-    @Override
-    protected
-    Long initialValue() {
-      return Long.MAX_VALUE;
-    }
-  };
-
   /**
    * Default constructor. Initializes the memstoreRead/Write points to 0.
    */
@@ -73,40 +61,6 @@
   }
 
-  /**
-   * Get this thread's read point. Used primarily by the memstore scanner to
-   * know which values to skip (ie: have not been completed/committed to
-   * memstore).
-   */
-  public static long getThreadReadPoint() {
-    return perThreadReadPoint.get();
-  }
-
-  /**
-   * Set the thread read point to the given value. The thread MVCC
-   * is used by the Memstore scanner so it knows which values to skip.
-   * Give it a value of 0 if you want everything.
-   */
-  public static void setThreadReadPoint(long readPoint) {
-    perThreadReadPoint.set(readPoint);
-  }
-
-  /**
-   * Set the thread MVCC read point to whatever the current read point is in
-   * this particular instance of MVCC. Returns the new thread read point value.
-   */
-  public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
-    perThreadReadPoint.set(mvcc.memstoreReadPoint());
-    return getThreadReadPoint();
-  }
-
-  /**
-   * Set the thread MVCC read point to 0 (include everything).
-   */
-  public static void resetThreadReadPoint() {
-    perThreadReadPoint.set(0L);
-  }
-
   public WriteEntry beginMemstoreInsert() {
     synchronized (writeQueue) {
       long nextWriteNumber = ++memstoreWrite;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -664,11 +664,11 @@
   /**
    * @return scanner on memstore and snapshot in this order.
    */
-  List<KeyValueScanner> getScanners() {
+  List<KeyValueScanner> getScanners(long readPt) {
     this.lock.readLock().lock();
     try {
       return Collections.singletonList(
-          new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
+          new MemStoreScanner(readPt));
     } finally {
       this.lock.readLock().unlock();
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java	(revision 1533311)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -225,6 +226,7 @@
       // take default action
       return null;
     }
-    return new StoreScanner(store, scanInfo, scan, targetCols);
+    return new StoreScanner(store, scanInfo, scan, targetCols,
+        store.getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
   }
 }
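
The removals in MultiVersionConsistencyControl are safe because no scanner consults a thread-local anymore: the read point is captured once at scanner creation and travels with the scanner objects. That is also why HRegionServer no longer calls setThreadReadPoint() per request. A closing sketch of the resulting behavior (variables illustrative, java.util.concurrent imports and error handling omitted):

// After this patch a RegionScanner can be driven from any thread, because
// its read point was fixed at getScanner() time and handed to every
// StoreScanner/StoreFileScanner underneath it.
final RegionScanner rs = region.getScanner(new Scan());
ExecutorService pool = Executors.newSingleThreadExecutor();
Future<Boolean> more = pool.submit(new Callable<Boolean>() {
  public Boolean call() throws IOException {
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    // No MultiVersionConsistencyControl.setThreadReadPoint(...) needed:
    // rs already carries the read point it was opened with.
    return rs.next(kvs);
  }
});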