diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index a65b27d..594a824 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -231,7 +231,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { // take default action return null; } - return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + return new StoreScanner(store, scanInfo, scan, targetCols, ((HStore) store).getHRegion() + .getReadpoint(IsolationLevel.READ_COMMITTED), s.getReaderLock()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6c9c31c..015993e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5336,7 +5336,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt, this); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 686df49..5fb2de4 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1992,8 +1992,8 @@ public class HStore implements Store { ////////////////////////////////////////////////////////////////////////////// @Override - public KeyValueScanner getScanner(Scan scan, - final NavigableSet targetCols, long readPt) throws IOException { + public KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt, + Object readerLock) throws IOException { lock.readLock().lock(); try { KeyValueScanner scanner = null; @@ -2002,8 +2002,8 @@ public class HStore implements Store { } if (scanner == null) { scanner = scan.isReversed() ? new ReversedStoreScanner(this, - getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, - getScanInfo(), scan, targetCols, readPt); + getScanInfo(), scan, targetCols, readPt, readerLock) : new StoreScanner(this, + getScanInfo(), scan, targetCols, readPt, readerLock); } return scanner; } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 76a9d0f..100bb8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -162,4 +162,11 @@ public interface KeyValueScanner { * if known, or null otherwise */ public Cell getNextIndexedKey(); + + /** + * Get the object to lock when the scanner's readers (if any) are updated (this is here so that the + * coprocessors creating {@link StoreScanner}s do not have to change) + * @return the lock object to use, or null if this scanner has none + */ + public Object getReaderLock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 957f417..96f78fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -71,4 +71,7 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { public Cell getNextIndexedKey() { return null; } + + @Override + public Object getReaderLock() {return null;} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index e319f90..1c5ccf9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -48,9 +48,9 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { * @throws IOException */ ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, - NavigableSet columns, long readPt) + NavigableSet columns, long readPt, Object readerLock) throws IOException { - super(store, scanInfo, scan, columns, readPt); + super(store, scanInfo, scan, columns, readPt, readerLock); } /** Constructor for testing. 
*/ @@ -124,24 +124,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { @Override public boolean seekToPreviousRow(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.seekToPreviousRow(key); - } finally { - lock.unlock(); - } - + checkReseek(); + return this.heap.seekToPreviousRow(key); } @Override public boolean backwardSeek(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.backwardSeek(key); - } finally { - lock.unlock(); - } + checkReseek(); + return this.heap.backwardSeek(key); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index b638a8f..7aa1554 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -75,11 +75,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * compaction. 
* @param scan Scan to apply when scanning the stores * @param targetCols columns to scan + * @param readerLock object to lock when the scanner's readers (if any) are updated * @return a scanner over the current key values * @throws IOException on failure */ - KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt) - throws IOException; + KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt, + Object readerLock) throws IOException; /** * Get all scanners with no filtering based on TTL (that happens further down diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index a8ee091..0fafadf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -489,4 +489,7 @@ public class StoreFileScanner implements KeyValueScanner { public Cell getNextIndexedKey() { return hfs.getNextIndexedKey(); } + + @Override + public Object getReaderLock() {return null;} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7ce4e0b..67e9555 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,10 +102,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // A flag whether use pread for scan private boolean 
scanUsePread = false; - protected ReentrantLock lock = new ReentrantLock(); private final long readPt; + // lock to use for updateReaders + // creator needs to ensure that: + // 1. *all* calls to public methods (except updateReaders and close) are locked with this lock + // (this can be done by passing the RegionScannerImpl object down) + // OR + // 2. updateReaders is *never* called (such as in flushes or compactions) + private Object readerLock; + // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace { BEFORE_SEEK, @@ -169,7 +175,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @throws IOException */ public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet columns, - long readPt) + long readPt, Object readerLock) throws IOException { this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(), scanInfo.getMinVersions(), readPt); @@ -181,6 +187,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, store.getCoprocessorHost()); + this.readerLock = readerLock; this.store.addChangedReaderObserver(this); // Pass columns to try to filter out unnecessary StoreFiles. @@ -395,15 +402,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public Cell peek() { - lock.lock(); - try { if (this.heap == null) { return this.lastTop; } return this.heap.peek(); - } finally { - lock.unlock(); - } } @Override @@ -414,8 +416,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public void close() { - lock.lock(); - try { if (this.closing) return; this.closing = true; // under test, we dont have a this.store @@ -425,21 +425,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap.close(); this.heap = null; // CLOSED! 
this.lastTop = null; // If both are null, we are closed. - } finally { - lock.unlock(); - } } @Override public boolean seek(Cell key) throws IOException { - lock.lock(); - try { // reset matcher state, in case that underlying store changed checkReseek(); return this.heap.seek(key); - } finally { - lock.unlock(); - } } /** @@ -464,8 +456,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public NextState next(List outResult, int limit, long remainingResultSize) throws IOException { - lock.lock(); - try { if (checkReseek()) { return NextState.makeState(NextState.State.MORE_VALUES, 0); } @@ -617,9 +607,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // No more keys close(); return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize); - } finally { - lock.unlock(); - } } /* @@ -660,11 +647,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return next(outResult, -1); } + @Override + public Object getReaderLock() { + return readerLock; + } + // Implementation of ChangedReadersObserver @Override public void updateReaders() throws IOException { - lock.lock(); - try { + synchronized(readerLock) { if (this.closing) return; // All public synchronized API calls will call 'checkReseek' which will cause @@ -684,8 +675,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP // Let the next() call handle re-creating and seeking - } finally { - lock.unlock(); } } @@ -776,8 +765,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean reseek(Cell kv) throws IOException { - lock.lock(); - try { //Heap will not be null, if this is called from next() which. //If called from RegionScanner.reseek(...) make sure the scanner //stack is reset if needed. 
@@ -786,9 +773,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return heap.requestSeek(kv, true, useRowColBloom); } return heap.reseek(kv); - } finally { - lock.unlock(); - } } @Override @@ -864,4 +848,3 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return this.heap.getNextIndexedKey(); } } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 22fd326..d2193a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3308,7 +3308,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { scan.getFamilyMap().get(store.getFamily().getName()), // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set // readpoint 0. - 0); + 0, null); List result = new ArrayList(); scanner.next(result); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 6c7552a..f5e6023 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -106,8 +106,8 @@ public class TestRegionObserverScannerOpenHook { Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s) throws IOException { scan.setFilter(new NoDataFilter()); - return new StoreScanner(store, store.getScanInfo(), scan, targetCols, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + return new StoreScanner(store, store.getScanInfo(), scan, targetCols, ((HStore) store) + .getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), 
s.getReaderLock()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 36493cd..9a9916c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -77,8 +77,8 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver { HRegion r = c.getEnvironment().getRegion(); return scan.isReversed() ? new ReversedStoreScanner(store, store.getScanInfo(), scan, targetCols, r.getReadpoint(scan - .getIsolationLevel())) : new StoreScanner(store, + .getIsolationLevel()), s.getReaderLock()) : new StoreScanner(store, store.getScanInfo(), scan, targetCols, r.getReadpoint(scan - .getIsolationLevel())); + .getIsolationLevel()), s.getReaderLock()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index 7600388..939ffd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -294,8 +294,8 @@ public class TestCoprocessorScanPolicy { newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + return new StoreScanner(store, scanInfo, scan, targetCols, ((HStore) store).getHRegion() + .getReadpoint(IsolationLevel.READ_COMMITTED), s.getReaderLock()); } else { return s; }