commit 26e36c6084cba2d540c8ee90ff6d54229eb3e3c6 Author: stack Date: Sat Jun 25 23:43:07 2016 +0100 Has Scanner locks and alternative synchronization Add other sync removals diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 02fe1df..a690dbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -672,14 +672,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - private synchronized void doRunLoop() { + private void doRunLoop() { while (running) { try { readSelector.select(); while (adding) { - this.wait(1000); + synchronized (this) { + this.wait(0, 1000); + } } - Iterator iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c497ec0..d947802 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -325,7 +325,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); - private final ConcurrentHashMap scannerReadPoints; + /** + * Used to keep account of oldest outstanding mvcc read point. + */ + private final ConcurrentHashMap scannerReadPoints = + new ConcurrentHashMap(); /** * The sequence ID that was encountered when this region was opened. @@ -368,17 +372,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; - // We need to ensure that while we are calculating the smallestReadPoint - // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. - synchronized(scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); - - for (Long readPoint: this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } + // This is tied with registration of scanners in scannerReadPoints in the region scanner + // constructor. See note there on concurrency assumption (Trying to avoid expensive synchronize + // that ties scannerReadPoints and mvcc.getReadPoint(); it shows up when doing heavy random + // read). + long minimumReadPoint = mvcc.getReadPoint(); + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; } } return minimumReadPoint; @@ -683,7 +684,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); - this.scannerReadPoints = new ConcurrentHashMap(); this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -5632,7 +5632,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ protected Cell joinedContinuationRow = null; - private boolean filterClosed = false; + private volatile boolean scannerClosed = false; protected final int isScan; protected final byte[] stopRow; @@ -5674,13 +5674,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - // synchronize on scannerReadPoints so that nobody calculates - // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); - synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); - scannerReadPoints.put(this, this.readPt); + // Make sure that before we progress, the read point that is in scannerReadPoints for this + // scanner is the latest. We don't want read points seen later in getSmallestReadPt to not + // include where we will be reading at. It is ok if getSmallestReadPt does not see this + // scanner but it can't go to work with a read point that is larger than ours when we go to + // read. + // + // CONCURRENCY: We depend on the fact that mvcc read point is always advancing. + // Cost is two reads of an AtomicLong at a minimum which hopefully costs less than a + // synchronization block. Lets see. TODO! This code is tied with getSmallestReadPt. + // NOTE: This registration is leaking this object before it is fully instantiated but should + // be ok given only used by getSmallestReadPt. + long workingReadPt = -1; + while (true) { + long localReadPt = getReadpoint(isolationLevel); + if (localReadPt == workingReadPt) { + break; + } + // Otherwise, go around again. When we leave here, our scanner read point must be the + // latest. Can't have it such that the getSmallestReadPt can return a read point that is + // beyond what we have registered here. + workingReadPt = localReadPt; + scannerReadPoints.put(this, workingReadPt); } + this.readPt = workingReadPt; // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). @@ -5718,6 +5736,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + // Called by Constructor only! protected void initializeKVHeap(List scanners, List joinedScanners, HRegion region) throws IOException { @@ -5727,22 +5746,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + // Called by Constructor only! private IOException handleException(List instantiatedScanners, Throwable t) { - // remove scaner read point before throw the exception - scannerReadPoints.remove(this); - if (storeHeap != null) { - storeHeap.close(); - storeHeap = null; - if (joinedHeap != null) { - joinedHeap.close(); - joinedHeap = null; - } - } else { - // close all already instantiated scanners before throwing the exception - for (KeyValueScanner scanner : instantiatedScanners) { + try { + // Close all already instantiated scanners if any before throwing the exception + for (KeyValueScanner scanner: instantiatedScanners) { scanner.close(); } + } finally { + close(); } return t instanceof IOException ? (IOException) t : new IOException(t); } @@ -5781,13 +5794,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public synchronized boolean next(List outResults, ScannerContext scannerContext) + public boolean next(List outResults, ScannerContext scannerContext) throws IOException { - if (this.filterClosed) { - throw new UnknownScannerException("Scanner was closed (timed out?) " + - "after we renewed it. Could be caused by a very slow scanner " + - "or a lengthy garbage collection"); - } startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { @@ -5806,9 +5814,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public boolean nextRaw(List outResults, ScannerContext scannerContext) throws IOException { - if (storeHeap == null) { - // scanner is closed - throw new UnknownScannerException("Scanner was closed"); + if (this.scannerClosed) { + throw new UnknownScannerException("Scanner was closed (timed out?) " + + "after we renewed it. Could be caused by a very slow scanner " + + "or a lengthy garbage collection"); } boolean moreValues; if (outResults.isEmpty()) { @@ -6221,7 +6230,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public synchronized void close() { + public void close() { + this.scannerClosed = true; + scannerReadPoints.remove(this); if (storeHeap != null) { storeHeap.close(); storeHeap = null; @@ -6230,9 +6241,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi joinedHeap.close(); joinedHeap = null; } - // no need to synchronize here. - scannerReadPoints.remove(this); - this.filterClosed = true; } KeyValueHeap getStoreHeapForTesting() {