Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1544388)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy)
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +42,9 @@
 /**
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
  * into List for a single row.
+ * This class is not thread-safe! Callers must ensure proper exclusion.
+ * (See {@link HRegion.RegionScannerImpl})
+ * Only {@link #updateReaders()} may be called from another thread.
  */
 public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
@@ -53,8 +57,8 @@
   private String metricNamePrefix;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
-  // Doesnt need to be volatile because it's always accessed via synchronized methods
-  private boolean closing = false;
+  private volatile boolean closing = false;
+  private final AtomicBoolean shouldUpdateReaders = new AtomicBoolean(false);
   private final boolean isGet;
   private final boolean explicitColumnQuery;
   private final boolean useRowColBloom;
@@ -267,7 +271,7 @@
   }
 
   @Override
-  public synchronized KeyValue peek() {
+  public KeyValue peek() {
     if (this.heap == null) {
       return this.lastTop;
     }
@@ -281,7 +285,7 @@
   }
 
   @Override
-  public synchronized void close() {
+  public void close() {
     if (this.closing) return;
     this.closing = true;
     // under test, we dont have a this.store
@@ -294,7 +298,7 @@
   }
 
   @Override
-  public synchronized boolean seek(KeyValue key) throws IOException {
+  public boolean seek(KeyValue key) throws IOException {
     if (this.heap == null) {
 
       List scanners = getScannersNoCompaction();
@@ -312,7 +316,7 @@
    * @return true if there are more rows, false if scanner is done
    */
   @Override
-  public synchronized boolean next(List outResult, int limit) throws IOException {
+  public boolean next(List outResult, int limit) throws IOException {
     return next(outResult, limit, null);
   }
 
@@ -323,7 +327,7 @@
    * @return true if there are more rows, false if scanner is done
    */
   @Override
-  public synchronized boolean next(List outResult, int limit,
+  public boolean next(List outResult, int limit,
       String metric) throws IOException {
 
     if (checkReseek()) {
@@ -450,39 +454,43 @@
   }
 
   @Override
-  public synchronized boolean next(List outResult) throws IOException {
+  public boolean next(List outResult) throws IOException {
     return next(outResult, -1, null);
   }
 
   @Override
-  public synchronized boolean next(List outResult, String metric)
+  public boolean next(List outResult, String metric)
       throws IOException {
     return next(outResult, -1, metric);
   }
 
   // Implementation of ChangedReadersObserver
   @Override
-  public synchronized void updateReaders() throws IOException {
+  public void updateReaders() throws IOException {
     if (this.closing) return;
-
-    // All public synchronized API calls will call 'checkReseek' which will cause
-    // the scanner stack to reseek if this.heap==null && this.lastTop != null.
-    // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
-    // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
-    // which is NOT what we want, not to mention could cause an NPE. So we early out here.
-    if (this.heap == null) return;
-
-    // this could be null.
-    this.lastTop = this.peek();
-
-    //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
-
-    // close scanners to old obsolete Store files
-    this.heap.close(); // bubble thru and close all scanners.
-    this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-
-    // Let the next() call handle re-creating and seeking
+    shouldUpdateReaders.set(true);
   }
+  private void checkUpdateReaders() throws IOException {
+    if (shouldUpdateReaders.getAndSet(false)) {
+      // All public API calls will call 'checkReseek' which will cause
+      // the scanner stack to reseek if this.heap==null && this.lastTop != null.
+      // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
+      // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
+      // which is NOT what we want, not to mention could cause an NPE. So we early out here.
+      if (this.heap == null) return;
+
+      // this could be null.
+      this.lastTop = this.peek();
+
+      //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
+
+      // close scanners to old obsolete Store files
+      this.heap.close(); // bubble thru and close all scanners.
+      this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
+
+      // Let the next() call handle re-creating and seeking
+    }
+  }
 
   /**
    * @return true if top of heap has changed (and KeyValueHeap has to try the
@@ -490,6 +498,7 @@
    * @throws IOException
    */
   private boolean checkReseek() throws IOException {
+    checkUpdateReaders();
     if (this.heap == null && this.lastTop != null) {
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
@@ -539,7 +548,7 @@
   }
 
   @Override
-  public synchronized boolean reseek(KeyValue kv) throws IOException {
+  public boolean reseek(KeyValue kv) throws IOException {
     //Heap will not be null, if this is called from next() which.
     //If called from RegionScanner.reseek(...) make sure the scanner
     //stack is reset if needed.