From 77484b0468670cb7f1aa91efe001e1c01fb32174 Mon Sep 17 00:00:00 2001 From: Walter Koetke Date: Thu, 31 Mar 2016 10:06:40 -0700 Subject: [PATCH] HBASE-15556: add extensible ConsistencyControl interface Summary: preserve existing functionality of MultiVersionConsistencyControl class but abstract it into a pluggable interface. --- .../hbase/regionserver/ConsistencyControl.java | 141 ++++++++++++++++++++ .../regionserver/ConsistencyControlUtils.java | 72 +++++++++++ .../apache/hadoop/hbase/regionserver/HRegion.java | 64 +++------ .../MultiVersionConsistencyControl.java | 144 +++++++++------------ .../hadoop/hbase/regionserver/WriteEntry.java | 51 ++++++++ .../hbase/regionserver/TestDefaultMemStore.java | 8 +- .../TestMultiVersionConsistencyControl.java | 2 +- 7 files changed, 350 insertions(+), 132 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java new file mode 100644 index 0000000..a44c64a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Scan; +import java.io.IOException; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +@InterfaceAudience.Private +public interface ConsistencyControl { + + /** + * Initializes the memstore read and write points appropriately. + * + * @param startPoint starting read point + */ + void initialize(long startPoint); + + /** + * Advances the current read point to be given seqNum if it is smaller than that. + * + * @param seqNumber sequence number + */ + void advanceMemstoreReadPointIfNeeded(long seqNumber); + + /** + * Starts a MVCC transaction without a specified sequence number. + * + * @return WriteEntry the new WriteEntry instance + */ + WriteEntry beginMemstoreInsert(); + + /** + * Starts a MVCC transaction with current region's log change sequence number. 
+ * + * @param curSeqNum current region log change sequence number + * @return WriteEntry a WriteEntry instance with the passed in curSeqNum + */ + WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum); + + /** + * Returns readpoint considering given IsolationLevel + * + * @param isolationLevel isolation level to take into account + * @return long readpoint + */ + long getReadpoint(IsolationLevel isolationLevel); + + /** + * Adds the specified region scanner to the set we keep track of. + * + * @param regionScanner the scanner to be added + * @param scan the scan whose isolation level should be considered + * @return current readpoint + */ + long addScan(RegionScanner regionScanner, Scan scan); + + /** + * Removes the specified region scanner from the set we keep track of. + * + * @param regionScanner the scanner to be removed + */ + void removeScan(RegionScanner regionScanner); + + /** + * @return the smallest mvcc readpoint across all the scanners in this + * region. Writes older than this readpoint are included in every + * read operation. + */ + long getSmallestReadPoint(); + + /** + * Completes a {@link WriteEntry} that was created by + * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read + * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is + * visible to MVCC readers. + * + * @param e the WriteEntry instance to be marked complete + * @param seqId the SequenceId instance from which next write number will be assigned + * @throws java.io.IOException + */ + void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) + throws IOException; + + /** + * Completes a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the + * end of this call, the global read point is at least as large as the write point of the passed + * in WriteEntry. Thus, the write is visible to MVCC readers.
+ * + * @param e the WriteEntry instance to be marked complete + */ + void completeMemstoreInsert(WriteEntry e); + + /** + * Marks the {@link WriteEntry} as complete and advances the read point as + * much as possible. + * + * @param e the WriteEntry instance to be marked complete + * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) + */ + boolean advanceMemstore(WriteEntry e); + + /** + * Waits for all previous MVCC transactions to complete, if required + */ + void waitForPreviousTransactionsComplete(); + + /** + * Waits for previous transactions to complete, if required + */ + void waitForPreviousTransactionsComplete(WriteEntry waitedEntry); + + /** + * Retrieves a readpoint for scans + */ + long memstoreReadPoint(); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java new file mode 100644 index 0000000..d0918a7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Utility functions pertaining to consistency control. + * + * @see ConsistencyControl + * @see MultiVersionConsistencyControl + */ +public class ConsistencyControlUtils { + + // Leave public so it can be used by test code + public static final String MVCC_IMPL = "hbase.mvcc.impl"; + + /** + * Creates an instance of the class configured as the implementation class for + * {@link ConsistencyControl}. + * + * @param config the configuration + * @return instance of ConsistencyControl + * @throws IllegalArgumentException if the configured class cannot be loaded or instantiated + */ + public static ConsistencyControl getConsistencyControl(Configuration config) throws IllegalArgumentException { + try { + @SuppressWarnings("unchecked") + Class clazz = config.getClass(MVCC_IMPL, MultiVersionConsistencyControl.class); + return (ConsistencyControl) clazz.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("Consistency Control could not be created"); + } + } + + /** + * Gets a mvcc write number before an actual one (its log sequence id) is assigned. + * + * @param sequenceId sequence id from which the 'fake' write number will be generated + * @return long a faked write number which is big enough not to be seen by others before a real + * one is assigned + */ + public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { + // the 1 billion is just an arbitrary big number to guard no scanner will reach it before + // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers + // because each handler could increment sequence num twice and max concurrent in-flight + // transactions is the number of RPC handlers.
+ // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key + // If for any reason, the bumped value isn't reset due to failure situations, we'll reset + // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all + return sequenceId.incrementAndGet() + 1000000000; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8aed3a6..aa8bebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -132,7 +132,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; @@ -343,8 +342,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); - private final ConcurrentHashMap scannerReadPoints; - /** * The sequence ID that was encountered when this region was opened. */ @@ -383,21 +380,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * read operation. 
*/ public long getSmallestReadPoint() { - long minimumReadPoint; - // We need to ensure that while we are calculating the smallestReadPoint - // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. - synchronized(scannerReadPoints) { - minimumReadPoint = mvcc.memstoreReadPoint(); - - for (Long readPoint: this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } - } - } - return minimumReadPoint; + return mvcc.getSmallestReadPoint(); } + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -541,8 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final MultiVersionConsistencyControl mvcc = - new MultiVersionConsistencyControl(); + private final ConsistencyControl mvcc; // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -608,7 +592,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (confParam instanceof CompoundConfiguration) { throw new IllegalArgumentException("Need original base configuration"); } - + mvcc = ConsistencyControlUtils.getConsistencyControl(confParam); this.comparator = fs.getRegionInfo().getComparator(); this.wal = wal; this.fs = fs; @@ -635,7 +619,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); - this.scannerReadPoints = new ConcurrentHashMap(); this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -1122,19 +1105,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - public MultiVersionConsistencyControl getMVCC() { + public ConsistencyControl 
getMVCC() { return mvcc; } - /* + /** * Returns readpoint considering given IsolationLevel */ public long getReadpoint(IsolationLevel isolationLevel) { - if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { - // This scan can read even uncommitted transactions - return Long.MAX_VALUE; - } - return mvcc.memstoreReadPoint(); + return mvcc.getReadpoint(isolationLevel); } public boolean isLoadingCfsOnDemandDefault() { @@ -1820,7 +1799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (this.memstoreSize.get() <= 0) { // Take an update lock because am about to change the sequence id and we want the sequence id // to be at the border of the empty memstore. - MultiVersionConsistencyControl.WriteEntry w = null; + WriteEntry w = null; this.updatesLock.writeLock().lock(); try { if (this.memstoreSize.get() <= 0) { @@ -1861,7 +1840,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush - MultiVersionConsistencyControl.WriteEntry w = null; + WriteEntry w = null; // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) @@ -2583,7 +2562,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConsistencyControl.WriteEntry w = null; + WriteEntry w = null; long txid = 0; boolean doRollBackMemstore = false; boolean locked = false; @@ -2729,7 +2708,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if(isInReplay) { mvccNum = batchOp.getReplaySequenceId(); } else { - mvccNum = 
MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId); } // // ------------------------------------ @@ -4227,13 +4206,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - // synchronize on scannerReadPoints so that nobody calculates - // getSmallestReadPoint, before scannerReadPoints is updated. - IsolationLevel isolationLevel = scan.getIsolationLevel(); - synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); - scannerReadPoints.put(this, this.readPt); - } + this.readPt = mvcc.addScan(this, scan); // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). @@ -4574,8 +4547,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // joinedHeap.close(); joinedHeap = null; } - // no need to synchronize here. - scannerReadPoints.remove(this); + mvcc.removeScan(this); this.filterClosed = true; } @@ -5385,7 +5357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return; } - MultiVersionConsistencyControl.WriteEntry writeEntry = null; + WriteEntry writeEntry = null; boolean locked; boolean walSyncSuccessful = false; List acquiredRowLocks; @@ -5406,7 +5378,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 
1 : acquiredRowLocks.size()); locked = true; // Get a mvcc write number - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId); long now = EnvironmentEdgeManager.currentTime(); try { @@ -5612,7 +5584,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId); w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family @@ -5856,7 +5828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (r != null) return r; } // Now start my own transaction - long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + long mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId); writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // Process increments a Store/family at a time. 
@@ -6092,7 +6064,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (12 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 8b9f41b..3367b08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -20,27 +20,28 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.LinkedHashSet; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; /** - * Manages the read/write consistency within memstore. This provides - * an interface for readers to determine what entries to ignore, and - * a mechanism for writers to obtain new write numbers, then "commit" - * the new writes for readers to read (thus forming atomic transactions). + * Default implementation class for {@link ConsistencyControl}. 
*/ @InterfaceAudience.Private -public class MultiVersionConsistencyControl { +public class MultiVersionConsistencyControl implements ConsistencyControl { static final long NO_WRITE_NUMBER = 0; private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); + private final ConcurrentHashMap scannerReadPoints = + new ConcurrentHashMap(); // This is the pending queue of writes. private final LinkedHashSet writeQueue = - new LinkedHashSet(); + new LinkedHashSet(); /** * Default constructor. Initializes the memstoreRead/Write points to 0. @@ -48,10 +49,7 @@ public class MultiVersionConsistencyControl { public MultiVersionConsistencyControl() { } - /** - * Initializes the memstoreRead/Write points appropriately. - * @param startPoint - */ + @Override public void initialize(long startPoint) { synchronized (writeQueue) { writeQueue.clear(); @@ -59,36 +57,14 @@ public class MultiVersionConsistencyControl { } } - /** - * - * @param initVal The value we used initially and expected it'll be reset later - * @return WriteEntry instance. - */ - WriteEntry beginMemstoreInsert() { + @Override + public WriteEntry beginMemstoreInsert() { return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); } /** - * Get a mvcc write number before an actual one(its log sequence Id) being assigned - * @param sequenceId - * @return long a faked write number which is bigger enough not to be seen by others before a real - * one is assigned - */ - public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { - // the 1 billion is just an arbitrary big number to guard no scanner will reach it before - // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers - // because each handler could increment sequence num twice and max concurrent in-flight - // transactions is the number of RPC handlers. 
- // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple - // changes touch same row key - // If for any reason, the bumped value isn't reset due to failure situations, we'll reset - // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all - return sequenceId.incrementAndGet() + 1000000000; - } - - /** - * This function starts a MVCC transaction with current region's log change sequence number. Since - * we set change sequence number when flushing current change to WAL(late binding), the flush + * Starts a MVCC transaction with current region's log change sequence number. + * Since we set change sequence number when flushing current change to WAL(late binding), the flush * order may differ from the order to start a MVCC transaction. For example, a change begins a * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent @@ -96,9 +72,11 @@ public class MultiVersionConsistencyControl { * big number is safe because we only need it to prevent current change being seen and the number * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order * for MVCC to align with flush sequence. - * @param curSeqNum + * + * @param curSeqNum current region log change sequence number * @return WriteEntry a WriteEntry instance with the passed in curSeqNum */ + @Override public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { return beginMemstoreInsertWithSeqNum(curSeqNum, false); } @@ -114,13 +92,7 @@ public class MultiVersionConsistencyControl { } } - /** - * Complete a {@link WriteEntry} that was created by - * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read - * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is - * visible to MVCC readers. 
- * @throws IOException - */ + @Override public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) throws IOException { if(e == null) return; @@ -134,11 +106,7 @@ public class MultiVersionConsistencyControl { waitForPreviousTransactionsComplete(e); } - /** - * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the - * end of this call, the global read point is at least as large as the write point of the passed - * in WriteEntry. Thus, the write is visible to MVCC readers. - */ + @Override public void completeMemstoreInsert(WriteEntry e) { waitForPreviousTransactionsComplete(e); } @@ -151,10 +119,11 @@ public class MultiVersionConsistencyControl { * Let S be the set of all write numbers that are completed and where all previous write numbers * are also completed. Then, the read point is advanced to the supremum of S. * - * @param e + * @param e the WriteEntry instance to be marked complete * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ - boolean advanceMemstore(WriteEntry e) { + @Override + public boolean advanceMemstore(WriteEntry e) { long nextReadValue = -1; synchronized (writeQueue) { e.markCompleted(); @@ -190,11 +159,8 @@ public class MultiVersionConsistencyControl { return false; } - /** - * Advances the current read point to be given seqNum if it is smaller than - * that. 
- */ - void advanceMemstoreReadPointIfNeeded(long seqNum) { + @Override + public void advanceMemstoreReadPointIfNeeded(long seqNum) { synchronized (writeQueue) { if (this.memstoreRead < seqNum) { memstoreRead = seqNum; @@ -202,14 +168,51 @@ public class MultiVersionConsistencyControl { } } - /** - * Wait for all previous MVCC transactions complete - */ public void waitForPreviousTransactionsComplete() { WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true); waitForPreviousTransactionsComplete(w); } + public long addScan(RegionScanner regionScanner, Scan scan) { + // synchronize on scannerReadPoints so that nobody calculates + // getSmallestReadPoint, before scannerReadPoints is updated. + IsolationLevel isolationLevel = scan.getIsolationLevel(); + synchronized(scannerReadPoints) { + long readPt = getReadpoint(isolationLevel); + scannerReadPoints.put(regionScanner, readPt); + return readPt; + } + } + + public void removeScan(RegionScanner regionScanner) { + scannerReadPoints.remove(regionScanner); + } + + public long getReadpoint(IsolationLevel isolationLevel) { + if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { + // This scan can read even uncommitted transactions + return Long.MAX_VALUE; + } + return memstoreReadPoint(); + } + + public long getSmallestReadPoint() { + long minimumReadPoint; + // We need to ensure that while we are calculating the smallestReadPoint + // no new RegionScanners can grab a readPoint that we are unaware of. + // We achieve this by synchronizing on the scannerReadPoints object. 
+ synchronized(scannerReadPoints) { + minimumReadPoint = memstoreReadPoint(); + + for (Long readPoint: this.scannerReadPoints.values()) { + if (readPoint < minimumReadPoint) { + minimumReadPoint = readPoint; + } + } + } + return minimumReadPoint; + } + public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { boolean interrupted = false; WriteEntry w = waitedEntry; @@ -253,27 +256,6 @@ public class MultiVersionConsistencyControl { return memstoreRead; } - public static class WriteEntry { - private long writeNumber; - private volatile boolean completed = false; - - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } - void setWriteNumber(long val){ - this.writeNumber = val; - } - } - public static final long FIXED_SIZE = ClassSize.align( ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java new file mode 100644 index 0000000..20979c0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java @@ -0,0 +1,51 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * For use by implementors of {@link ConsistencyControl}. + */ +@InterfaceAudience.Private +class WriteEntry { + private long writeNumber; + private volatile boolean completed = false; + + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + + void markCompleted() { + this.completed = true; + } + + boolean isCompleted() { + return this.completed; + } + + long getWriteNumber() { + return this.writeNumber; + } + + void setWriteNumber(long val) { + this.writeNumber = val; + } + +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 1b0af3e..505af70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -239,7 +239,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - MultiVersionConsistencyControl.WriteEntry w = + WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); @@ -283,7 +283,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - 
MultiVersionConsistencyControl.WriteEntry w = + WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); @@ -335,7 +335,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - MultiVersionConsistencyControl.WriteEntry w = + WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); @@ -409,7 +409,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - MultiVersionConsistencyControl.WriteEntry w = + WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index e876a94..81ad627 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -48,7 +48,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { public void run() { AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = + WriteEntry e = mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) -- 2.3.2 (Apple Git-55)