diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 709e890..312c1ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -142,7 +142,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -585,8 +585,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final MultiVersionConsistencyControl mvcc = - new MultiVersionConsistencyControl(); + private final MultiVersionConcurrencyControl mvcc = + new MultiVersionConcurrencyControl(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -1254,7 +1254,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - public MultiVersionConsistencyControl getMVCC() { + public MultiVersionConcurrencyControl getMVCC() { return mvcc; } @@ -2083,7 +2083,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.memstoreSize.get() <= 0) { // Take an update lock because am about to change the sequence id and we want the sequence id 
// to be at the border of the empty memstore. - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConcurrencyControl.WriteEntry w = null; this.updatesLock.writeLock().lock(); try { if (this.memstoreSize.get() <= 0) { @@ -2139,7 +2139,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConcurrencyControl.WriteEntry w = null; // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); @@ -2882,7 +2882,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConcurrencyControl.WriteEntry w = null; long txid = 0; boolean doRollBackMemstore = false; boolean locked = false; @@ -3029,7 +3029,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if(isInReplay) { mvccNum = batchOp.getReplaySequenceId(); } else { - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); } // // ------------------------------------ @@ -6717,7 +6717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } - MultiVersionConsistencyControl.WriteEntry writeEntry = null; + MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean locked; boolean walSyncSuccessful = false; List acquiredRowLocks; @@ -6738,7 +6738,7 @@ 
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); locked = true; // Get a mvcc write number - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); long now = EnvironmentEdgeManager.currentTime(); try { @@ -6935,7 +6935,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family @@ -7199,7 +7199,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family @@ -7425,7 +7425,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock - MultiVersionConsistencyControl.FIXED_SIZE // mvcc + MultiVersionConcurrencyControl.FIXED_SIZE // mvcc + ClassSize.TREEMAP // maxSeqIdInStores + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress ; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java new file mode 100644 index 0000000..028d81a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -0,0 +1,273 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +@InterfaceAudience.Private +public class MultiVersionConcurrencyControl { + private static final long NO_WRITE_NUMBER = 0; + private volatile long memstoreRead = 0; + private final Object readWaiters = new Object(); + + // This is the pending queue of writes. 
+ private final LinkedList writeQueue = + new LinkedList(); + + /** + * Default constructor. Initializes the memstoreRead/Write points to 0. + */ + public MultiVersionConcurrencyControl() { + } + + /** + * Initializes the memstoreRead/Write points appropriately. + * @param startPoint + */ + public void initialize(long startPoint) { + synchronized (writeQueue) { + writeQueue.clear(); + memstoreRead = startPoint; + } + } + + /** + * Begins a write entry with NO_WRITE_NUMBER, an initial value expected to be reset later. + * + * @return WriteEntry instance. + */ + WriteEntry beginMemstoreInsert() { + return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); + } + + /** + * Get a mvcc write number before an actual one (its log sequence Id) is assigned + * @param sequenceId + * @return long a faked write number which is big enough not to be seen by others before a real + * one is assigned + */ + public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { + // the 1 billion is just an arbitrary big number to guard no scanner will reach it before + // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers + // because each handler could increment sequence num twice and max concurrent in-flight + // transactions is the number of RPC handlers. + // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key + // If for any reason, the bumped value isn't reset due to failure situations, we'll reset + // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all + return sequenceId.incrementAndGet() + 1000000000; + } + + /** + * This function starts a MVCC transaction with current region's log change sequence number. Since + * we set change sequence number when flushing current change to WAL(late binding), the flush + * order may differ from the order to start a MVCC transaction.
For example, a change begins a + * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we + * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent + * transactions will reuse the number till current MVCC completes(success or fail). The "faked" + * big number is safe because we only need it to prevent current change being seen and the number + * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order + * for MVCC to align with flush sequence. + * @param curSeqNum + * @return WriteEntry a WriteEntry instance with the passed in curSeqNum + */ + public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + WriteEntry e = new WriteEntry(curSeqNum); + synchronized (writeQueue) { + writeQueue.add(e); + return e; + } + } + + /** + * Complete a {@link WriteEntry} that was created by + * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read + * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is + * visible to MVCC readers. + * @throws IOException + */ + public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) + throws IOException { + if(e == null) return; + if (seqId != null) { + e.setWriteNumber(seqId.getSequenceId()); + } else { + // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside + // function beginMemstoreInsertWithSeqNum in case of failures + e.setWriteNumber(NO_WRITE_NUMBER); + } + waitForPreviousTransactionsComplete(e); + } + + /** + * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the + * end of this call, the global read point is at least as large as the write point of the passed + * in WriteEntry. Thus, the write is visible to MVCC readers. 
+ */ + public void completeMemstoreInsert(WriteEntry e) { + waitForPreviousTransactionsComplete(e); + } + + /** + * Mark the {@link WriteEntry} as complete and advance the read point as + * much as possible. + * + * How much is the read point advanced? + * Let S be the set of all write numbers that are completed and where all previous write numbers + * are also completed. Then, the read point is advanced to the supremum of S. + * + * @param e + * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) + */ + boolean advanceMemstore(WriteEntry e) { + long nextReadValue = -1; + synchronized (writeQueue) { + e.markCompleted(); + + while (!writeQueue.isEmpty()) { + WriteEntry queueFirst = writeQueue.getFirst(); + if (queueFirst.isCompleted()) { + // Using Max because Edit complete in WAL sync order not arriving order + nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (nextReadValue > memstoreRead) { + memstoreRead = nextReadValue; + } + + // notify waiters on writeQueue before return + writeQueue.notifyAll(); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + readWaiters.notifyAll(); + } + } + + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; + } + + /** + * Advances the current read point to be given seqNum if it is smaller than + * that. 
+ */ + void advanceMemstoreReadPointIfNeeded(long seqNum) { + synchronized (writeQueue) { + if (this.memstoreRead < seqNum) { + memstoreRead = seqNum; + } + } + } + + /** + * Wait for all previous MVCC transactions complete + */ + public void waitForPreviousTransactionsComplete() { + WriteEntry w = beginMemstoreInsert(); + waitForPreviousTransactionsComplete(w); + } + + public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { + boolean interrupted = false; + WriteEntry w = waitedEntry; + + try { + WriteEntry firstEntry = null; + do { + synchronized (writeQueue) { + // writeQueue won't be empty at this point, the following is just a safety check + if (writeQueue.isEmpty()) { + break; + } + firstEntry = writeQueue.getFirst(); + if (firstEntry == w) { + // all previous in-flight transactions are done + break; + } + try { + writeQueue.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. 
+ interrupted = true; + break; + } + } + } while (firstEntry != null); + } finally { + if (w != null) { + advanceMemstore(w); + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + public long memstoreReadPoint() { + return memstoreRead; + } + + public static class WriteEntry { + private long writeNumber; + private volatile boolean completed = false; + + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + void setWriteNumber(long val){ + this.writeNumber = val; + } + } + + public static final long FIXED_SIZE = ClassSize.align( + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + + 2 * ClassSize.REFERENCE); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java deleted file mode 100644 index 96af2c3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; - -/** - * Manages the read/write consistency within memstore. This provides - * an interface for readers to determine what entries to ignore, and - * a mechanism for writers to obtain new write numbers, then "commit" - * the new writes for readers to read (thus forming atomic transactions). - */ -@InterfaceAudience.Private -public class MultiVersionConsistencyControl { - private static final long NO_WRITE_NUMBER = 0; - private volatile long memstoreRead = 0; - private final Object readWaiters = new Object(); - - // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); - - /** - * Default constructor. Initializes the memstoreRead/Write points to 0. - */ - public MultiVersionConsistencyControl() { - } - - /** - * Initializes the memstoreRead/Write points appropriately. - * @param startPoint - */ - public void initialize(long startPoint) { - synchronized (writeQueue) { - writeQueue.clear(); - memstoreRead = startPoint; - } - } - - /** - * - * @param initVal The value we used initially and expected it'll be reset later - * @return WriteEntry instance. 
- */ - WriteEntry beginMemstoreInsert() { - return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); - } - - /** - * Get a mvcc write number before an actual one(its log sequence Id) being assigned - * @param sequenceId - * @return long a faked write number which is bigger enough not to be seen by others before a real - * one is assigned - */ - public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { - // the 1 billion is just an arbitrary big number to guard no scanner will reach it before - // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers - // because each handler could increment sequence num twice and max concurrent in-flight - // transactions is the number of RPC handlers. - // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple - // changes touch same row key - // If for any reason, the bumped value isn't reset due to failure situations, we'll reset - // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all - return sequenceId.incrementAndGet() + 1000000000; - } - - /** - * This function starts a MVCC transaction with current region's log change sequence number. Since - * we set change sequence number when flushing current change to WAL(late binding), the flush - * order may differ from the order to start a MVCC transaction. For example, a change begins a - * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we - * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent - * transactions will reuse the number till current MVCC completes(success or fail). The "faked" - * big number is safe because we only need it to prevent current change being seen and the number - * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order - * for MVCC to align with flush sequence. 
- * @param curSeqNum - * @return WriteEntry a WriteEntry instance with the passed in curSeqNum - */ - public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { - WriteEntry e = new WriteEntry(curSeqNum); - synchronized (writeQueue) { - writeQueue.add(e); - return e; - } - } - - /** - * Complete a {@link WriteEntry} that was created by - * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read - * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is - * visible to MVCC readers. - * @throws IOException - */ - public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) - throws IOException { - if(e == null) return; - if (seqId != null) { - e.setWriteNumber(seqId.getSequenceId()); - } else { - // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside - // function beginMemstoreInsertWithSeqNum in case of failures - e.setWriteNumber(NO_WRITE_NUMBER); - } - waitForPreviousTransactionsComplete(e); - } - - /** - * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the - * end of this call, the global read point is at least as large as the write point of the passed - * in WriteEntry. Thus, the write is visible to MVCC readers. - */ - public void completeMemstoreInsert(WriteEntry e) { - waitForPreviousTransactionsComplete(e); - } - - /** - * Mark the {@link WriteEntry} as complete and advance the read point as - * much as possible. - * - * How much is the read point advanced? - * Let S be the set of all write numbers that are completed and where all previous write numbers - * are also completed. Then, the read point is advanced to the supremum of S. 
- * - * @param e - * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) - */ - boolean advanceMemstore(WriteEntry e) { - long nextReadValue = -1; - synchronized (writeQueue) { - e.markCompleted(); - - while (!writeQueue.isEmpty()) { - WriteEntry queueFirst = writeQueue.getFirst(); - if (queueFirst.isCompleted()) { - // Using Max because Edit complete in WAL sync order not arriving order - nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); - writeQueue.removeFirst(); - } else { - break; - } - } - - if (nextReadValue > memstoreRead) { - memstoreRead = nextReadValue; - } - - // notify waiters on writeQueue before return - writeQueue.notifyAll(); - } - - if (nextReadValue > 0) { - synchronized (readWaiters) { - readWaiters.notifyAll(); - } - } - - if (memstoreRead >= e.getWriteNumber()) { - return true; - } - return false; - } - - /** - * Advances the current read point to be given seqNum if it is smaller than - * that. - */ - void advanceMemstoreReadPointIfNeeded(long seqNum) { - synchronized (writeQueue) { - if (this.memstoreRead < seqNum) { - memstoreRead = seqNum; - } - } - } - - /** - * Wait for all previous MVCC transactions complete - */ - public void waitForPreviousTransactionsComplete() { - WriteEntry w = beginMemstoreInsert(); - waitForPreviousTransactionsComplete(w); - } - - public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { - boolean interrupted = false; - WriteEntry w = waitedEntry; - - try { - WriteEntry firstEntry = null; - do { - synchronized (writeQueue) { - // writeQueue won't be empty at this point, the following is just a safety check - if (writeQueue.isEmpty()) { - break; - } - firstEntry = writeQueue.getFirst(); - if (firstEntry == w) { - // all previous in-flight transactions are done - break; - } - try { - writeQueue.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. 
cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; - break; - } - } - } while (firstEntry != null); - } finally { - if (w != null) { - advanceMemstore(w); - } - } - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - - public long memstoreReadPoint() { - return memstoreRead; - } - - public static class WriteEntry { - private long writeNumber; - private volatile boolean completed = false; - - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } - void setWriteNumber(long val){ - this.writeNumber = val; - } - } - - public static final long FIXED_SIZE = ClassSize.align( - ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + - 2 * ClassSize.REFERENCE); - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 1bc6546..a3c6459 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -63,7 +63,7 @@ public interface RegionScanner extends InternalScanner { long getMaxResultSize(); /** - * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl} + * @return The Scanner's MVCC readPt see {@link MultiVersionConcurrencyControl} */ long getMvccReadPoint(); @@ -94,7 +94,7 @@ public interface RegionScanner extends InternalScanner { * close a region operation, an synchronize on the scanner object. Example: * HRegion region = ...; * RegionScanner scanner = ... 
- * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); + * MultiVersionConcurrencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); * region.startRegionOperation(); * try { * synchronized(scanner) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index c59b62b..cffa90b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3452,7 +3452,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName()), - // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set + // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set // readpoint 0. 
0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index d0c3b91..08419ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -66,13 +66,13 @@ public class TestDefaultMemStore extends TestCase { private static final int ROW_COUNT = 10; private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); - private MultiVersionConsistencyControl mvcc; + private MultiVersionConcurrencyControl mvcc; private AtomicLong startSeqNum = new AtomicLong(0); @Override public void setUp() throws Exception { super.setUp(); - this.mvcc = new MultiVersionConsistencyControl(); + this.mvcc = new MultiVersionConcurrencyControl(); this.memstore = new DefaultMemStore(); } @@ -243,7 +243,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - MultiVersionConsistencyControl.WriteEntry w = + MultiVersionConcurrencyControl.WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); @@ -287,7 +287,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - MultiVersionConsistencyControl.WriteEntry w = + MultiVersionConcurrencyControl.WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); @@ -339,7 +339,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - MultiVersionConsistencyControl.WriteEntry w = + 
MultiVersionConcurrencyControl.WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); @@ -383,7 +383,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] f = Bytes.toBytes("family"); final byte[] q1 = Bytes.toBytes("q1"); - final MultiVersionConsistencyControl mvcc; + final MultiVersionConcurrencyControl mvcc; final MemStore memstore; final AtomicLong startSeqNum; @@ -392,7 +392,7 @@ public class TestDefaultMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, - MultiVersionConsistencyControl mvcc, + MultiVersionConcurrencyControl mvcc, AtomicReference caughtException, AtomicLong startSeqNum) { @@ -413,7 +413,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - MultiVersionConsistencyControl.WriteEntry w = + MultiVersionConcurrencyControl.WriteEntry w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java new file mode 100644 index 0000000..a816b23 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This is a hammer test that verifies MultiVersionConcurrencyControl in a + * multiple writer single reader scenario. + */ +@Category(SmallTests.class) +public class TestMultiVersionConcurrencyControl extends TestCase { + static class Writer implements Runnable { + final AtomicBoolean finished; + final MultiVersionConcurrencyControl mvcc; + final AtomicBoolean status; + + Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) { + this.finished = finished; + this.mvcc = mvcc; + this.status = status; + } + + private Random rnd = new Random(); + public boolean failed = false; + + public void run() { + AtomicLong startPoint = new AtomicLong(); + while (!finished.get()) { + MultiVersionConcurrencyControl.WriteEntry e = + mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); + // System.out.println("Begin write: " + e.getWriteNumber()); + // 0 usec - 499 usec (nextInt(500) excludes 500) + int sleepTime = rnd.nextInt(500); + // 500 * 1000 = 500,000ns = 500 usec + // 1 * 1000 = 1,000ns = 1 usec + try { + if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000); + } catch (InterruptedException e1) { + } + try { + mvcc.completeMemstoreInsert(e); + } catch (RuntimeException ex) { + // got failure +
System.out.println(ex.toString()); + ex.printStackTrace(); + status.set(false); + return; + // Report failure if possible. + } + } + } + } + + public void testParallelism() throws Exception { + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + + final AtomicBoolean finished = new AtomicBoolean(false); + + // fail flag for the reader thread + final AtomicBoolean readerFailed = new AtomicBoolean(false); + final AtomicLong failedAt = new AtomicLong(); + Runnable reader = new Runnable() { + public void run() { + long prev = mvcc.memstoreReadPoint(); + while (!finished.get()) { + long newPrev = mvcc.memstoreReadPoint(); + if (newPrev < prev) { + // serious problem. + System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev); + readerFailed.set(true); + // might as well give up + failedAt.set(newPrev); + return; + } + } + } + }; + + // writer thread parallelism. + int n = 20; + Thread[] writers = new Thread[n]; + AtomicBoolean[] statuses = new AtomicBoolean[n]; + Thread readThread = new Thread(reader); + + for (int i = 0; i < n; ++i) { + statuses[i] = new AtomicBoolean(true); + writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); + writers[i].start(); + } + readThread.start(); + + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ex) { + } + + finished.set(true); + + readThread.join(); + for (int i = 0; i < n; ++i) { + writers[i].join(); + } + + // check failure. 
+ assertFalse(readerFailed.get()); + for (int i = 0; i < n; ++i) { + assertTrue(statuses[i].get()); + } + + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java deleted file mode 100644 index e876a94..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import junit.framework.TestCase; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.experimental.categories.Category; - -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * This is a hammer test that verifies MultiVersionConsistencyControl in a - * multiple writer single reader scenario. 
- */ -@Category(SmallTests.class) -public class TestMultiVersionConsistencyControl extends TestCase { - static class Writer implements Runnable { - final AtomicBoolean finished; - final MultiVersionConsistencyControl mvcc; - final AtomicBoolean status; - - Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) { - this.finished = finished; - this.mvcc = mvcc; - this.status = status; - } - - private Random rnd = new Random(); - public boolean failed = false; - - public void run() { - AtomicLong startPoint = new AtomicLong(); - while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = - mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); - // System.out.println("Begin write: " + e.getWriteNumber()); - // 10 usec - 500usec (including 0) - int sleepTime = rnd.nextInt(500); - // 500 * 1000 = 500,000ns = 500 usec - // 1 * 100 = 100ns = 1usec - try { - if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000); - } catch (InterruptedException e1) { - } - try { - mvcc.completeMemstoreInsert(e); - } catch (RuntimeException ex) { - // got failure - System.out.println(ex.toString()); - ex.printStackTrace(); - status.set(false); - return; - // Report failure if possible. - } - } - } - } - - public void testParallelism() throws Exception { - final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); - - final AtomicBoolean finished = new AtomicBoolean(false); - - // fail flag for the reader thread - final AtomicBoolean readerFailed = new AtomicBoolean(false); - final AtomicLong failedAt = new AtomicLong(); - Runnable reader = new Runnable() { - public void run() { - long prev = mvcc.memstoreReadPoint(); - while (!finished.get()) { - long newPrev = mvcc.memstoreReadPoint(); - if (newPrev < prev) { - // serious problem. 
- System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev); - readerFailed.set(true); - // might as well give up - failedAt.set(newPrev); - return; - } - } - } - }; - - // writer thread parallelism. - int n = 20; - Thread[] writers = new Thread[n]; - AtomicBoolean[] statuses = new AtomicBoolean[n]; - Thread readThread = new Thread(reader); - - for (int i = 0; i < n; ++i) { - statuses[i] = new AtomicBoolean(true); - writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); - writers[i].start(); - } - readThread.start(); - - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException ex) { - } - - finished.set(true); - - readThread.join(); - for (int i = 0; i < n; ++i) { - writers[i].join(); - } - - // check failure. - assertFalse(readerFailed.get()); - for (int i = 0; i < n; ++i) { - assertTrue(statuses[i].get()); - } - - } - -} diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc index bee1c16..0aac442 100644 --- a/src/main/asciidoc/_chapters/architecture.adoc +++ b/src/main/asciidoc/_chapters/architecture.adoc @@ -1504,7 +1504,7 @@ The minimum flush unit is per region, not at individual MemStore level. * The `RegionScanner` object contains a list of `StoreScanner` objects, one per column family. * Each `StoreScanner` object further contains a list of `StoreFileScanner` objects, corresponding to each StoreFile and HFile of the corresponding column family, and a list of `KeyValueScanner` objects for the MemStore. * The two lists are merged into one, which is sorted in ascending order with the scan object for the MemStore at the end of the list. -* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConsistencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point. 
+* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConcurrencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point. [[hfile]] ==== StoreFile (HFile)