diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f5695a8..d237e4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -147,11 +147,13 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; +import org.joni.ScanEnvironment; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multiset.Entry; import com.google.common.io.Closeables; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; @@ -315,7 +317,14 @@ public class HRegion implements HeapSize { // , Writable{ static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); - private final ConcurrentHashMap scannerReadPoints; + /** + * Accessing this HashMap must be synchronized since it is accessed by multiple threads. It + * contains mvcc read points, so updates must hold synchronization spanning the read of mvcc and + * the update of the Map (until we find a better way). Hiroshi Ikeda notes that the Map key is + * TMI!!! It means this Map is holding a reference to a Scanner... if we fail a close or some + * such, we are preventing Scanner GC. Fix. + */ + private final Map scannerReadPoints = new HashMap(); /** * The sequence ID that was encountered when this region was opened. @@ -359,10 +368,9 @@ public class HRegion implements HeapSize { // , Writable{ // We achieve this by synchronizing on the scannerReadPoints object. 
synchronized(scannerReadPoints) { minimumReadPoint = mvcc.memstoreReadPoint(); - - for (Long readPoint: this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; + for (Map.Entry e: this.scannerReadPoints.entrySet()) { + if (e.getValue() < minimumReadPoint) { + minimumReadPoint = e.getValue(); } } } @@ -604,7 +612,6 @@ public class HRegion implements HeapSize { // , Writable{ this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); - this.scannerReadPoints = new ConcurrentHashMap(); this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -3980,7 +3987,7 @@ public class HRegion implements HeapSize { // , Writable{ private int batch; protected int isScan; private boolean filterClosed = false; - private long readPt; + private final long readPt; private long maxResultSize; protected HRegion region; @@ -4010,9 +4017,14 @@ public class HRegion implements HeapSize { // , Writable{ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - // synchronize on scannerReadPoints so that nobody calculates + // Synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. + // This is a heavily contended spot when there are lots of random reads. IsolationLevel isolationLevel = scan.getIsolationLevel(); + // If the isolation level is READ_UNCOMMITTED, should we just not bother registering? + // This was suggested by Hiroshi Ikeda. I was thinking that the READ_UNCOMMITTED applied to + // the frontier of updates, not the rear where Cells might be dropped because too many + // versions or beyond the TTL. synchronized(scannerReadPoints) { this.readPt = getReadpoint(isolationLevel); scannerReadPoints.put(this, this.readPt);