diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 4cca2d4..5608603 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -284,15 +285,29 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // used to achieve atomic operations in the memstore. @Override public long getMvccVersion() { + if (mvcc == null) { + return 0; + } + return mvcc.longValue(); + } + + public MutableLong getMvccVersionReference() { return mvcc; } public void setMvccVersion(long mvccVersion){ + if(this.mvcc == null){ + this.mvcc = new MutableLong(); + } + this.mvcc.setValue(mvccVersion); + } + + public void setMvccVersion(MutableLong mvccVersion){ this.mvcc = mvccVersion; } // multi-version concurrency control version. default value is 0, aka do not care. - private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles) + private MutableLong mvcc; // this value is not part of a serialized KeyValue (not in HFiles) /** Dragon time over, return to normal business */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index c0a4b51..93e0359 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -269,7 +269,7 @@ public class DefaultMemStore implements MemStore { assert alloc.getBytes() != null; alloc.put(0, kv.getBuffer(), kv.getOffset(), len); KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); + newKv.setMvccVersion(kv.getMvccVersionReference()); return newKv; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90976e2..43866ce 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -814,10 +815,12 @@ public class HRegion implements HeapSize { // , Writable{ } } } - mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1); + mvcc.initialize(maxSeqId); + return maxSeqId; } @@ -1713,9 +1716,13 @@ public class HRegion implements HeapSize { // , Writable{ List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; try { - // Record the mvcc for all transactions in progress. 
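The KeyValue change above replaces the primitive mvcc field with a shared org.apache.commons.lang.mutable.MutableLong: getMvccVersionReference() exposes the holder itself, and the DefaultMemStore hunk passes that reference, rather than the value, to the copy it builds from the MSLAB allocation. Sharing the holder is what lets the write number be re-bound after the copy already exists, once the WAL sync reveals the real log sequence number. A minimal sketch of that aliasing, using a hypothetical FakeCell stand-in instead of the real KeyValue:

import org.apache.commons.lang.mutable.MutableLong;

public class SharedMvccDemo {

  /** Hypothetical stand-in for KeyValue; only the mvcc plumbing from the hunk above is modelled. */
  static class FakeCell {
    private MutableLong mvcc;

    long getMvccVersion() {
      return mvcc == null ? 0 : mvcc.longValue();   // same null guard as the patched getter
    }

    MutableLong getMvccVersionReference() {
      return mvcc;
    }

    void setMvccVersion(MutableLong mvcc) {
      this.mvcc = mvcc;
    }
  }

  public static void main(String[] args) {
    FakeCell original = new FakeCell();
    original.setMvccVersion(new MutableLong(1000000042L));   // provisional ("bumped") write number

    // The memstore copy shares the holder instead of copying the primitive value.
    FakeCell memstoreCopy = new FakeCell();
    memstoreCopy.setMvccVersion(original.getMvccVersionReference());

    // After the WAL sync the write number is re-bound to the real log sequence number.
    original.getMvccVersionReference().setValue(42L);

    System.out.println(original.getMvccVersion());      // 42
    System.out.println(memstoreCopy.getMvccVersion());  // 42: the copy sees the re-bound value
  }
}

With a primitive long, the memstore copy would have frozen the provisional number and never observed the re-bound value, which is why the DefaultMemStore change passes the reference instead of the value.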
- w = mvcc.beginMemstoreInsert(); - mvcc.advanceMemstore(w); + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); + // check if it is not closing. if (wal != null) { if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { @@ -1753,13 +1760,6 @@ public class HRegion implements HeapSize { // , Writable{ wal.sync(); } - // wait for all in-progress transactions to commit to HLog before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.waitForRead(w); - s = "Flushing stores of " + this; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); @@ -2328,6 +2328,7 @@ public class HRegion implements HeapSize { // , Writable{ int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; + MutableLong seqNumber = new MutableLong(); try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2447,13 +2448,13 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - + seqNumber.setValue(this.sequenceId.incrementAndGet()); // // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsert(); - + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = @@ -2560,6 +2561,10 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- if (hasWalAppends) { syncOrDefer(txid, durability); + // Get LogSequenceNumber from WAL Sync + if(walEdit.getLogKey() != null) { + seqNumber.setValue(walEdit.getLogKey().getLogSeqNum()); + } } doRollBackMemstore = false; // calling the post CP hook for batch mutation @@ -2574,6 +2579,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. 
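In the rewritten batch-mutation path above, the region roughly does the following: reserve a provisional number with sequenceId.incrementAndGet(), start MVCC with beginMemstoreInsertWithSeqNum, apply the edits, append to the WAL, and only after the sync read the real log sequence number back through walEdit.getLogKey().getLogSeqNum() and store it with setWriteNumber before completing MVCC. The enabling detail is that the sequence number is only known once the WAL write is processed (late binding). A small self-contained sketch of that timing, with hypothetical FakeWal/FakeWalEdit/FakeLogKey stand-ins rather than the real FSHLog, WALEdit and HLogKey, and with the moment the number is assigned collapsed into the sync call for simplicity:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

public class LateBindingWalDemo {

  /** Hypothetical stand-in for HLogKey: carries the sequence number once it is known. */
  static class FakeLogKey {
    private long logSeqNum = -1;
    long getLogSeqNum() { return logSeqNum; }
    void setLogSeqNum(long n) { logSeqNum = n; }
  }

  /** Hypothetical stand-in for WALEdit with the new logKey back-reference. */
  static class FakeWalEdit {
    private FakeLogKey logKey;
    void setLogKey(FakeLogKey key) { logKey = key; }
    FakeLogKey getLogKey() { return logKey; }
  }

  /** Hypothetical stand-in for the WAL: here the sequence number is assigned at sync time. */
  static class FakeWal {
    private final AtomicLong sequenceId = new AtomicLong(0);
    private final Deque<FakeWalEdit> unsynced = new ArrayDeque<FakeWalEdit>();

    void appendNoSync(FakeWalEdit edit) {
      edit.setLogKey(new FakeLogKey());   // bind key and edit, as FSWALEntry now does
      unsynced.add(edit);
    }

    void sync() {
      for (FakeWalEdit e; (e = unsynced.poll()) != null; ) {
        e.getLogKey().setLogSeqNum(sequenceId.incrementAndGet());  // late binding happens here
      }
    }
  }

  public static void main(String[] args) {
    FakeWal wal = new FakeWal();
    FakeWalEdit edit = new FakeWalEdit();

    wal.appendNoSync(edit);
    System.out.println(edit.getLogKey().getLogSeqNum());  // -1: number not known yet

    wal.sync();
    // Only now can the provisional mvcc write number be replaced by the real value,
    // which is what the added walEdit.getLogKey().getLogSeqNum() reads do above.
    System.out.println(edit.getLogKey().getLogSeqNum());  // 1
  }
}

The FSWALEntry hunk further below performs the real version of edit.setLogKey(key), which is what lets HRegion read the number back from the WALEdit it already holds.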
// ------------------------------------------------------------------ if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); w = null; } @@ -2606,7 +2612,10 @@ public class HRegion implements HeapSize { // , Writable{ if (doRollBackMemstore) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) mvcc.completeMemstoreInsert(w); + if (w != null) { + w.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsert(w); + } if (locked) { this.updatesLock.readLock().unlock(); @@ -2702,7 +2711,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); List result; try { result = get(get, false); @@ -2925,7 +2934,7 @@ public class HRegion implements HeapSize { // , Writable{ try { if (localizedWriteEntry == null) { - localizedWriteEntry = mvcc.beginMemstoreInsert(); + localizedWriteEntry = mvcc.beginMemstoreInsertWithSeqNum(this.sequenceId.incrementAndGet()); freemvcc = true; } @@ -4904,6 +4913,7 @@ public class HRegion implements HeapSize { // , Writable{ long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); + MutableLong seqNumber = new MutableLong(); try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); @@ -4914,6 +4924,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; + seqNumber.setValue(this.sequenceId.incrementAndGet()); long now = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -4924,7 +4935,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!mutations.isEmpty()) { // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); // 6. Apply to memstore for (KeyValue kv : mutations) { kv.setMvccVersion(writeEntry.getWriteNumber()); @@ -4952,6 +4963,9 @@ public class HRegion implements HeapSize { // , Writable{ // 10. Sync edit log if (txid != 0) { syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); + if (walEdit.getLogKey() != null){ + seqNumber.setValue(walEdit.getLogKey().getLogSeqNum()); + } } walSyncSuccessful = true; } @@ -4966,6 +4980,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 11. 
Roll mvcc forward if (writeEntry != null) { + writeEntry.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(writeEntry); writeEntry = null; } @@ -5073,17 +5088,19 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); + MutableLong seqNumber = new MutableLong(); WriteEntry w = null; - RowLock rowLock; + RowLock rowLock = null; try { rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + seqNumber.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5209,13 +5226,21 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } if (writeToWAL) { // sync the transaction log outside the rowlock syncOrDefer(txid, durability); + if(walEdits.getLogKey() != null){ + seqNumber.setValue(walEdits.getLogKey().getLogSeqNum()); + } } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.APPEND); @@ -5264,16 +5289,19 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + RowLock rowLock = null; WriteEntry w = null; + MutableLong seqNumber = new MutableLong(); try { - RowLock rowLock = getRowLock(row); + rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + seqNumber.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5403,13 +5431,21 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) { // sync the transaction log outside the rowlock syncOrDefer(txid, durability); + if(walEdits.getLogKey() != null){ + seqNumber.setValue(walEdits.getLogKey().getLogSeqNum()); + } } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.INCREMENT); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index b46d55b..6d61443 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -20,6 +20,7 @@ 
package org.apache.hadoop.hbase.regionserver; import java.util.LinkedList; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -32,9 +33,7 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - + private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); // This is the pending queue of writes. @@ -45,7 +44,7 @@ public class MultiVersionConsistencyControl { * Default constructor. Initializes the memstoreRead/Write points to 0. */ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; + memstoreRead = 0; } /** @@ -54,27 +53,39 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { - throw new RuntimeException("Already used this mvcc. Too late to initialize"); - } - - this.memstoreRead = this.memstoreWrite = startPoint; + writeQueue.clear(); + memstoreRead = startPoint; } } - /** - * Generate and return a {@link WriteEntry} with a new write number. - * To complete the WriteEntry and wait for it to be visible, - * call {@link #completeMemstoreInsert(WriteEntry)}. - */ - public WriteEntry beginMemstoreInsert() { + private WriteEntry beginMemstoreInsert(long baseVal) { + WriteEntry e = new WriteEntry(baseVal); synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } + + /** + * Starts an MVCC transaction using the current region's log sequence number. Because the real + * sequence number is assigned only when the change is synced to the WAL (late binding), the WAL + * flush order can differ from the order in which MVCC transactions were started: a change that + * begins its MVCC transaction first may complete after a change that started later. We therefore + * add a large safety offset to the passed-in sequence number so that no concurrent transaction + * can reach or reuse the number until this MVCC transaction completes (successfully or not). The + * inflated number is safe because it only has to keep the current change invisible; it is reset + * to the real sequence number (assigned during log sync) right before the MVCC transaction + * completes, so the MVCC order ends up aligned with the WAL flush order. + * @param curSeqNum the current region sequence number + * @return the {@link WriteEntry} for the started transaction + */ + public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + // 1000000000 (one billion) is an arbitrarily large offset: region sequence numbers cannot + // reach this value before the current MVCC transaction completes. + // Long.MAX_VALUE is not used because relative ordering must still be preserved when multiple + // changes touch the same row key. + return beginMemstoreInsert(curSeqNum + 1000000000); + } /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. 
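The Javadoc above explains the provisional-number scheme in prose; the sketch below walks the same queue discipline end to end in a condensed, hypothetical MiniMvcc class that collapses the real class's synchronization and exact read-point bookkeeping: begin with a bumped write number, re-bind it to the real sequence number once the WAL sync supplies one, and advance the read point only over the completed prefix of the write queue.

import java.util.LinkedList;

import org.apache.commons.lang.mutable.MutableLong;

public class MiniMvcc {

  private final LinkedList<Entry> writeQueue = new LinkedList<Entry>();
  private long readPoint = 0;

  static class Entry {
    final MutableLong writeNumber;
    boolean completed = false;

    Entry(long n) { writeNumber = new MutableLong(n); }
    void setWriteNumber(long n) { writeNumber.setValue(n); }
  }

  Entry begin(long curSeqNum) {
    Entry e = new Entry(curSeqNum + 1000000000L);   // same "bumper" idea as above
    writeQueue.add(e);
    return e;
  }

  void complete(Entry e) {
    e.completed = true;
    long next = -1;
    // Pop the completed prefix of the queue; an earlier, still-open writer holds the read point back.
    while (!writeQueue.isEmpty() && writeQueue.getFirst().completed) {
      next = Math.max(next, writeQueue.removeFirst().writeNumber.longValue());
    }
    if (next > readPoint) {
      readPoint = next;   // the read point only ever moves forward
    }
  }

  public static void main(String[] args) {
    MiniMvcc mvcc = new MiniMvcc();
    Entry a = mvcc.begin(10);   // provisional write number 1000000010
    Entry b = mvcc.begin(11);   // provisional write number 1000000011

    // b's WAL sync finishes first, so it is re-bound and completed first,
    // but the read point cannot pass the still-open a.
    b.setWriteNumber(12);
    mvcc.complete(b);
    System.out.println(mvcc.readPoint);   // 0: nothing visible yet

    a.setWriteNumber(13);
    mvcc.complete(a);
    System.out.println(mvcc.readPoint);   // 13: both edits become visible together
  }
}

The same queue doubles as a barrier in the hunks that follow: waitForPreviousTransactoinsComplete enqueues its own entry and proceeds only once that entry reaches the head of the queue, that is, once every earlier writer has completed.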
@@ -103,37 +114,26 @@ public class MultiVersionConsistencyControl { e.markCompleted(); long nextReadValue = -1; - boolean ranOnce=false; while (!writeQueue.isEmpty()) { - ranOnce=true; WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + nextReadValue = queueFirst.getWriteNumber().longValue(); writeQueue.removeFirst(); } else { break; } } - if (!ranOnce) { - throw new RuntimeException("never was a first"); - } - if (nextReadValue > 0) { synchronized (readWaiters) { - memstoreRead = nextReadValue; + if (nextReadValue > memstoreRead) { + memstoreRead = nextReadValue; + } readWaiters.notifyAll(); } } - if (memstoreRead >= e.getWriteNumber()) { + + if (memstoreRead >= e.getWriteNumber().longValue()) { return true; } return false; @@ -147,7 +147,7 @@ public class MultiVersionConsistencyControl { public void waitForRead(WriteEntry e) { boolean interrupted = false; synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { + while (memstoreRead < e.getWriteNumber().longValue()) { try { readWaiters.wait(0); } catch (InterruptedException ie) { @@ -160,16 +160,56 @@ public class MultiVersionConsistencyControl { if (interrupted) Thread.currentThread().interrupt(); } + /** + * Wait for all previous MVCC transactions complete + * @param curSeqNum + * @return + */ + public WriteEntry waitForPreviousTransactoinsComplete(long curSeqNum) { + boolean interrupted = false; + WriteEntry w = beginMemstoreInsert(curSeqNum); + + WriteEntry firstEntry = null; + do { + synchronized (writeQueue) { + // writeQueue won't be empty at this point, the following is just a safety check + if (writeQueue.isEmpty()) { + break; + } + firstEntry = writeQueue.getFirst(); + } + if (firstEntry == w) { + // all previous in-flight transactions are done + break; + } + synchronized (readWaiters) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. 
+ interrupted = true; + } + } + } while (firstEntry != null); + + advanceMemstore(w); + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return w; + } + public long memstoreReadPoint() { return memstoreRead; } - public static class WriteEntry { - private long writeNumber; + private MutableLong writeNumber; private boolean completed = false; WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; + this.writeNumber = new MutableLong(writeNumber); } void markCompleted() { this.completed = true; @@ -177,9 +217,12 @@ public class MultiVersionConsistencyControl { boolean isCompleted() { return this.completed; } - long getWriteNumber() { + MutableLong getWriteNumber() { return this.writeNumber; } + void setWriteNumber(long val){ + this.writeNumber.setValue(val); + } } public static final long FIXED_SIZE = ClassSize.align( diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index b876972..7403700 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -121,12 +121,6 @@ abstract class StoreFlusher { // set its memstoreTS to 0. This will help us save space when writing to // disk. KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - // let us not change the original KV. It could be in the memstore - // changing its memstoreTS could affect other threads/scanners. - kv = kv.shallowCopy(); - kv.setMvccVersion(0); - } sink.append(kv); } kvs.clear(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index fc9aa44..484e38b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 0d65a54..71a9f7a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -44,6 +44,8 @@ class FSWALEntry extends HLog.Entry { final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, final HTableDescriptor htd, final HRegionInfo hri) { super(key, edit); + //binding LogKey & WALEdit + edit.setLogKey(key); this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 0917c8b..5782ba7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -27,6 +27,8 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -335,7 +337,8 @@ public interface HLog { */ long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, - boolean isInMemstore, long nonceGroup, long nonce) throws IOException; + boolean isInMemstore, long nonceGroup, long nonce) + throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 9825fee..d52f321 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 6e5d250..03450b9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -86,6 +86,8 @@ public class WALEdit implements Writable, HeapSize { private final boolean isReplay; private final ArrayList kvs = new ArrayList(1); + // reference to its related HLogKey instance + private HLogKey logKey; // Only here for legacy writable deserialization @Deprecated @@ -237,7 +239,8 @@ public class WALEdit implements Writable, HeapSize { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("[#edits: " + kvs.size() + " = <"); + sb.append("[" + ((this.logKey != null) ? 
"seqnum=" + this.logKey.getLogSeqNum() + " ": "")); + sb.append("#edits: " + kvs.size() + " = <"); for (KeyValue kv : kvs) { sb.append(kv.toString()); sb.append("; "); @@ -249,6 +252,14 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } + public void setLogKey(HLogKey logKey){ + this.logKey = logKey; + } + + public HLogKey getLogKey() { + return this.logKey; + } + /** * Create a compacion WALEdit * @param c diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index cccb3ff..d214bc6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -1101,7 +1101,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()), newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setMvccVersion(newKv.getMvccVersionReference()); return rewriteKv; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 011b9a4..6e0995e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -62,6 +63,7 @@ public class TestDefaultMemStore extends TestCase { private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); private MultiVersionConsistencyControl mvcc; + private AtomicLong startSeqNum = new AtomicLong(0); @Override public void setUp() throws Exception { @@ -237,7 +239,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMvccVersion(w.getWriteNumber()); @@ -251,7 +253,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMvccVersion(w.getWriteNumber()); memstore.add(kv2); @@ -281,7 +283,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -297,7 +299,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = 
mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMvccVersion(w.getWriteNumber()); memstore.add(kv21); @@ -333,7 +335,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -349,7 +351,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMvccVersion(w.getWriteNumber()); @@ -378,6 +380,7 @@ public class TestDefaultMemStore extends TestCase { final MultiVersionConsistencyControl mvcc; final MemStore memstore; + final AtomicLong startSeqNum; AtomicReference caughtException; @@ -385,12 +388,14 @@ public class TestDefaultMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, MultiVersionConsistencyControl mvcc, - AtomicReference caughtException) + AtomicReference caughtException, + AtomicLong startSeqNum) { this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); + this.startSeqNum = startSeqNum; } public void run() { @@ -404,7 +409,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -434,7 +439,7 @@ public class TestDefaultMemStore extends TestCase { AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum); threads[i].start(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 40fafd9..472e48c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -46,8 +46,10 @@ public class TestMultiVersionConsistencyControl extends TestCase { public boolean failed = false; public void run() { + AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry e = + mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500);
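The test changes follow one pattern throughout: every place that used to call beginMemstoreInsert() now passes a monotonically increasing AtomicLong as the sequence-number source, and cells are stamped with w.getWriteNumber(), which now hands back the shared MutableLong that the new KeyValue.setMvccVersion(MutableLong) overload stores by reference. A compact usage sketch of the patched API, assuming it is compiled next to these tests (the WriteEntry accessors are package-private, so it has to live in org.apache.hadoop.hbase.regionserver); the class name MvccSeqNumUsageExample is hypothetical:

package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class MvccSeqNumUsageExample {

  public static void main(String[] args) {
    MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
    AtomicLong startSeqNum = new AtomicLong(0);   // stands in for the region's sequenceId

    // Begin the transaction with the next sequence number, as the updated tests do.
    MultiVersionConsistencyControl.WriteEntry w =
        mvcc.beginMemstoreInsertWithSeqNum(startSeqNum.incrementAndGet());

    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));
    // getWriteNumber() now returns the shared MutableLong, which the MutableLong overload
    // of setMvccVersion stores by reference, so a later re-bind reaches this cell too.
    kv.setMvccVersion(w.getWriteNumber());

    // In HRegion this is where the sequence number from the WAL sync would be bound back.
    w.setWriteNumber(startSeqNum.get());

    mvcc.completeMemstoreInsert(w);
    System.out.println("read point: " + mvcc.memstoreReadPoint()
        + ", kv mvcc: " + kv.getMvccVersion());   // both print 1
  }
}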