diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e620c60..5d519af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -332,7 +332,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); - private final ConcurrentHashMap scannerReadPoints; + private final ConcurrentMap scannerReadPoints = + new ConcurrentHashMap(); /** * The sequence ID that was encountered when this region was opened. @@ -375,17 +376,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; - // We need to ensure that while we are calculating the smallestReadPoint - // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. 
- synchronized(scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); - - for (Long readPoint: this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } + long minimumReadPoint = mvcc.getReadPoint(); + for (Entry e: this.scannerReadPoints.entrySet()) { + if (e.getValue() < minimumReadPoint) { + minimumReadPoint = e.getValue(); } } return minimumReadPoint; @@ -690,7 +684,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); - this.scannerReadPoints = new ConcurrentHashMap(); this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -5598,14 +5591,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - - // synchronize on scannerReadPoints so that nobody calculates - // getSmallestReadPoint, before scannerReadPoints is updated. - IsolationLevel isolationLevel = scan.getIsolationLevel(); - synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); - scannerReadPoints.put(this, this.readPt); - } + // Figure out our mvcc read point + this.readPt = calculateReadPoint(scan.getIsolationLevel()); // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). @@ -5634,6 +5621,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi initializeKVHeap(scanners, joinedScanners, region); } + /** + * Calculate the readPoint to run this scanner at and then stamp it into the region's + * scannerReadPoints map so others know we are running and what our readpoint is. 
+ * One such consumer (the only consumer?) of the scannerReadPoints map is pruning of too + * many or expired versions done on update. It calls the getSmallestReadPoint method. + * It tries to not prune Cells that could be inside the view of a running Scanner. + * + * The below is a check and put. We will not progress until we are sure that what is in + * scannerReadPoints as our readpoint is the current mvcc readpoint. This way, we + * avoid getSmallestReadPoint ever returning a readpoint above our readpoint (and + * so going on to remove items that should be part of our 'view'). + * + * TODO: Revisit when the in-memory compaction goes in; it will do removal of too many + * versions or expired cells. + */ + private long calculateReadPoint(final IsolationLevel level) { + long readPoint = getReadpoint(level); + while (true) { + scannerReadPoints.put(this, readPoint); + long currentReadPoint = getReadpoint(level); + if (readPoint == currentReadPoint) { + // The readPoint has not changed since our read; we can break and progress knowing that + // what got lodged into scannerReadPoints is the latest. + break; + } + // Readpoint changed since we stored to scannerReadPoints. Go around again. + readPoint = currentReadPoint; + } + return readPoint; + } + protected void initializeKVHeap(List scanners, List joinedScanners, HRegion region) throws IOException { @@ -6126,7 +6144,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi joinedHeap.close(); joinedHeap = null; } - // no need to synchronize here. scannerReadPoints.remove(this); this.filterClosed = true; }