commit b7923a81dfcdc38ebdeeec53750cac5eb7f6441f Author: stack Date: Thu Jun 30 15:37:36 2016 -0700 HBASE-15716 HRegion#RegionScannerImpl scannerReadPoints synchronization constrains random read Implementation that makes use of Hiroshi Ikeda ScannReadPoints class. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c497ec0..5fc8073 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -325,7 +325,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); - private final ConcurrentHashMap scannerReadPoints; + private final ScannerReadPoints scannerReadPoints; /** * The sequence ID that was encountered when this region was opened. @@ -368,20 +368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; - // We need to ensure that while we are calculating the smallestReadPoint - // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. - synchronized(scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); - - for (Long readPoint: this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } - } - } - return minimumReadPoint; + return this.scannerReadPoints.getSmallestReadPoint(); } /* @@ -683,7 +670,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); - this.scannerReadPoints = new ConcurrentHashMap(); + this.scannerReadPoints = new ScannerReadPoints() { + @Override + long getMvccReadPoint() { + return mvcc.getReadPoint(); + } + }; this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -5638,10 +5630,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final byte[] stopRow; protected final HRegion region; - private final long readPt; private final long maxResultSize; private final ScannerContext defaultScannerContext; private final FilterWrapper filter; + /** + * This scanners registry in the Region Map of outstanding Scanners. + */ + private final ScannerReadPoints.Entry scannerReadPointsEntry; @Override public HRegionInfo getRegionInfo() { @@ -5674,13 +5669,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - // synchronize on scannerReadPoints so that nobody calculates - // getSmallestReadPoint, before scannerReadPoints is updated. - IsolationLevel isolationLevel = scan.getIsolationLevel(); - synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); - scannerReadPoints.put(this, this.readPt); - } + // Register this Scanner + this.scannerReadPointsEntry = scannerReadPoints.getEntry(scan.getIsolationLevel()); // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). @@ -5700,7 +5690,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Store store = stores.get(entry.getKey()); KeyValueScanner scanner; try { - scanner = store.getScanner(scan, entry.getValue(), this.readPt); + scanner = store.getScanner(scan, entry.getValue(), + this.scannerReadPointsEntry.readPoint()); } catch (FileNotFoundException e) { throw handleFileNotFound(e); } @@ -5730,7 +5721,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private IOException handleException(List instantiatedScanners, Throwable t) { // remove scaner read point before throw the exception - scannerReadPoints.remove(this); + this.scannerReadPointsEntry.release(); if (storeHeap != null) { storeHeap.close(); storeHeap = null; @@ -5754,7 +5745,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getMvccReadPoint() { - return this.readPt; + return this.scannerReadPointsEntry.readPoint(); } @Override @@ -6230,8 +6221,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi joinedHeap.close(); joinedHeap = null; } - // no need to synchronize here. - scannerReadPoints.remove(this); + this.scannerReadPointsEntry.release(); this.filterClosed = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerReadPoints.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerReadPoints.java new file mode 100644 index 0000000..743e014 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerReadPoints.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.IsolationLevel; + +/** + * Manages read points for scanners. + * Thread safe. + */ +@InterfaceAudience.Private +abstract class ScannerReadPoints { + /** + * Node of the internal queue. + */ + private static class Node { + final long readPoint; + + /** + * Reference count, or negative when this node instance is marked as expired + * (and is never restored). + * Before setting it to negative, the field {@code next} should be set. + */ + volatile long references; + + /** + * The following node, or null if this node is a tail. Set only once. + */ + volatile Node next; + + Node (long readPoint, long references) { + this.readPoint = readPoint; + this.references = references; + } + + boolean compareAndSetReferences(long expect, long update) { + return referencesUpdater.compareAndSet(this, expect, update); + } + + boolean setNextIfAbsent(Node update) { + return nextUpdater.compareAndSet(this, null, update); + } + + static final AtomicLongFieldUpdater referencesUpdater = + AtomicLongFieldUpdater.newUpdater(Node.class, "references"); + + static final AtomicReferenceFieldUpdater nextUpdater = + AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next"); + } + + /** + * Encapsulates an assigned read point. Thread safe. + */ + interface Entry { + /** + * Returns the assigned read point, which is never changed for each instance. + */ + long readPoint(); + + /** + * Notifies that the assigned read point ends its service. + * Duplicated calling this method has no effect. + */ + void release(); + } + + private static class EntryImpl implements Entry { + final Node node; + + @SuppressWarnings("unused") // used via releasedUpdater + volatile int released; // There is no boolean field updater. + + EntryImpl(Node node) { + this.node = node; + assert node.references > 0; + } + + @Override + public long readPoint() { + return node.readPoint; + } + + @Override + public void release() { + if (releasedUpdater.compareAndSet(this, 0, 1)) { + long refs; + while (true) { + refs = node.references; + assert refs > 0; + if (node.compareAndSetReferences(refs, refs - 1)) { + break; + } + } + } + } + + static final AtomicIntegerFieldUpdater releasedUpdater = + AtomicIntegerFieldUpdater.newUpdater(EntryImpl.class, "released"); + } + + private static final Entry READ_UNCOMMITTED_ENTRY = new Entry() { + @Override + public long readPoint() { + return Long.MAX_VALUE; + } + @Override + public void release() {} + }; + + /** + * Reference to the head of the internal queue, possibly staled. + * Threads trying to access the head should cooperate to update this reference. + */ + private final AtomicReference headRef; + + /** + * Reference to the tail of the internal queue, possibly staled. + * Threads trying to access the tail should cooperate to update this reference. + */ + private final AtomicReference tailRef; + + ScannerReadPoints() { + Node initNode = new Node(Long.MIN_VALUE, 0); + headRef = new AtomicReference(initNode); + tailRef = new AtomicReference(initNode); + } + + /** + * Returns an entry with assigning an read point according to its isolation level. + */ + Entry getEntry(IsolationLevel isolationLevel) { + if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { + return READ_UNCOMMITTED_ENTRY; + } + + Node tail = tailRef.get(); + while (true) { + while (true) { + Node next = tail.next; + if (next == null) { + break; + } + if (tailRef.compareAndSet(tail, next)) { + tail = next; + } else { + tail = tailRef.get(); + } + } + + long mvccReadPoint = getMvccReadPoint(); + + if (tail.readPoint < mvccReadPoint) { + Node newTail = new Node(mvccReadPoint, 1); + if (tail.setNextIfAbsent(newTail)) { + tailRef.compareAndSet(tail, newTail); + // Ignore the result; Another thread already did for you. + return new EntryImpl(newTail); + } // Otherwise tail.next is asynchronously set; Check again. + + } else { + while (true) { + long refs = tail.references; + if (refs < 0) { + // Already expired and the following node should be available; Check again. + break; + } + if (tail.compareAndSetReferences(refs, refs + 1)) { + return new EntryImpl(tail); + } + } + } + } + } + + /** + * Returns the smallest read point whether current mvcc point or one of + * the entries here which has not yet been released. + */ + long getSmallestReadPoint() { + Node head = headRef.get(); + while (true) { + long refs = head.references; + if (refs > 0) { + return head.readPoint; + } + + Node next = head.next; + if (next == null) { + return getMvccReadPoint(); + } + + if (refs < 0 || head.compareAndSetReferences(0, -1)) { + if (headRef.compareAndSet(head, next)) { + head = next; + } else { + head = headRef.get(); + } + } // Otherwise head.references is asynchronously changed from 0; Check again. + } + } + + /** + * Returns the current mvcc read point, which is supposed to be always advancing. + * Possibly simultaneously called by threads which are calling non-private methods. + */ + abstract long getMvccReadPoint(); +}