Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (working copy) @@ -70,7 +70,9 @@ snapshot.size(), store.getFamily().getCompression(), false, true, true); writer.setTimeRangeTracker(snapshotTimeRangeTracker); try { - flushed = performFlush(scanner, writer, smallestReadPoint); + synchronized(scanner) { + flushed = performFlush(scanner, writer, smallestReadPoint); + } } finally { finalizeWriter(writer, cacheFlushId, status); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -3565,7 +3565,7 @@ for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt, this); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -1742,7 +1742,7 @@ @Override public KeyValueScanner getScanner(Scan scan, - final NavigableSet targetCols, long readPt) 
throws IOException { + final NavigableSet targetCols, long readPt, Object syncObject) throws IOException { lock.readLock().lock(); try { KeyValueScanner scanner = null; @@ -1752,6 +1752,7 @@ if (scanner == null) { scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt); } + scanner.setSyncObject(syncObject); return scanner; } finally { lock.readLock().unlock(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -123,4 +123,10 @@ * assumed. */ boolean isFileScanner(); + + /** + * Set the object to lock when the scanners readers (if any) are updated + * @param obj + */ + public void setSyncObject(Object obj); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java (working copy) @@ -67,4 +67,9 @@ // Not a file by default. return false; } + + @Override + public void setSyncObject(Object obj) { + // no-op + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -72,10 +72,11 @@ * compaction. 
* @param scan Scan to apply when scanning the stores + * @param targetCols columns to scan + * @param syncObject Object to delegate synchronization to * @return a scanner over the current key values * @throws IOException on failure */ - KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt) + KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt, Object syncObject) throws IOException; /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -389,4 +389,8 @@ return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); } + @Override + public void setSyncObject(Object obj) { + // no-op + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -25,7 +25,6 @@ import java.util.List; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,9 +96,9 @@ // A flag whether use pread for scan private boolean scanUsePread = false; - private ReentrantLock lock = new ReentrantLock(); private final long readPt; + private Object syncObject; // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace {
@@ -242,7 +241,7 @@ matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); } - + this.syncObject = this; this.store.addChangedReaderObserver(this); // Filter the list of scanners using Bloom filters, time range, TTL, etc. @@ -357,15 +356,10 @@ @Override public KeyValue peek() { - lock.lock(); - try { if (this.heap == null) { return this.lastTop; } return this.heap.peek(); - } finally { - lock.unlock(); - } } @Override @@ -376,32 +370,24 @@ @Override public void close() { - lock.lock(); - try { - if (this.closing) return; - this.closing = true; - // under test, we dont have a this.store - if (this.store != null) - this.store.deleteChangedReaderObserver(this); - if (this.heap != null) - this.heap.close(); - this.heap = null; // CLOSED! - this.lastTop = null; // If both are null, we are closed. - } finally { - lock.unlock(); + synchronized (syncObject) { + if (this.closing) return; + this.closing = true; + // under test, we dont have a this.store + if (this.store != null) + this.store.deleteChangedReaderObserver(this); + if (this.heap != null) + this.heap.close(); + this.heap = null; // CLOSED! + this.lastTop = null; // If both are null, we are closed. 
} } @Override public boolean seek(KeyValue key) throws IOException { - lock.lock(); - try { // reset matcher state, in case that underlying store changed checkReseek(); return this.heap.seek(key); - } finally { - lock.unlock(); - } } /** @@ -412,8 +398,6 @@ */ @Override public boolean next(List outResult, int limit) throws IOException { - lock.lock(); - try { if (checkReseek()) { return true; } @@ -549,9 +533,6 @@ // No more keys close(); return false; - } finally { - lock.unlock(); - } } @Override @@ -562,29 +543,26 @@ // Implementation of ChangedReadersObserver @Override public void updateReaders() throws IOException { - lock.lock(); - try { - if (this.closing) return; - - // All public synchronized API calls will call 'checkReseek' which will cause - // the scanner stack to reseek if this.heap==null && this.lastTop != null. - // But if two calls to updateReaders() happen without a 'next' or 'peek' then we - // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders - // which is NOT what we want, not to mention could cause an NPE. So we early out here. - if (this.heap == null) return; - - // this could be null. - this.lastTop = this.peek(); - - //DebugPrint.println("SS updateReaders, topKey = " + lastTop); - - // close scanners to old obsolete Store files - this.heap.close(); // bubble thru and close all scanners. - this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP - - // Let the next() call handle re-creating and seeking - } finally { - lock.unlock(); + synchronized (syncObject) { + if (this.closing) return; + + // All public synchronized API calls will call 'checkReseek' which will cause + // the scanner stack to reseek if this.heap==null && this.lastTop != null. 
+ // But if two calls to updateReaders() happen without a 'next' or 'peek' then we + // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders + // which is NOT what we want, not to mention could cause an NPE. So we early out here. + if (this.heap == null) return; + + // this could be null. + this.lastTop = this.peek(); + + //DebugPrint.println("SS updateReaders, topKey = " + lastTop); + + // close scanners to old obsolete Store files + this.heap.close(); // bubble thru and close all scanners. + this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP + + // Let the next() call handle re-creating and seeking } } @@ -650,8 +628,6 @@ @Override public boolean reseek(KeyValue kv) throws IOException { - lock.lock(); - try { //Heap will not be null, if this is called from next() which. //If called from RegionScanner.reseek(...) make sure the scanner //stack is reset if needed. @@ -660,9 +636,6 @@ return heap.requestSeek(kv, true, useRowColBloom); } return heap.reseek(kv); - } finally { - lock.unlock(); - } } @Override @@ -732,5 +705,9 @@ public long getEstimatedNumberOfKvsScanned() { return this.kvsScanned; } + + @Override + public void setSyncObject(Object obj) { + this.syncObject = obj; + } } - Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (working copy) @@ -83,7 +83,9 @@ mw.init(storeScanner, factory, store.getComparator()); synchronized (flushLock) { - flushedBytes = performFlush(scanner, mw, smallestReadPoint); + synchronized(scanner) { + flushedBytes = performFlush(scanner, mw, smallestReadPoint); + } result = mw.commitWriters(cacheFlushSeqNum, false); success = true; } Index: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (working copy) @@ -73,7 +73,10 @@ // because we need record the max seq id for the store file, see HBASE-6059 writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); - boolean finished = performCompaction(scanner, writer, smallestReadPoint); + boolean finished; + synchronized(scanner) { + finished = performCompaction(scanner, writer, smallestReadPoint); + } if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java (revision 1546775) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java (working copy) @@ -122,7 +122,9 @@ // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(scanner, mw, smallestReadPoint); + synchronized(scanner) { + finished = performCompaction(scanner, mw, smallestReadPoint); + } if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + Index: hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1546775) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -2809,7 +2809,7 @@ scan.getFamilyMap().get(store.getFamily().getName()), // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set // readpoint 0. - 0); + 0, null); List result = new ArrayList(); scanner.next(result);