diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 4cca2d4..800790c 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -284,15 +285,29 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // used to achieve atomic operations in the memstore. @Override public long getMvccVersion() { + if (mvcc == null) { + return 0; + } + return mvcc.longValue(); + } + + public MutableLong getMvccVersionReference() { return mvcc; } public void setMvccVersion(long mvccVersion){ + if(this.mvcc == null){ + this.mvcc = new MutableLong(); + } + this.mvcc.setValue(mvccVersion); + } + + public void setMvccVersion(MutableLong mvccVersion){ this.mvcc = mvccVersion; } // multi-version concurrency control version. default value is 0, aka do not care. - private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles) + private MutableLong mvcc; // this value is not part of a serialized KeyValue (not in HFiles) /** Dragon time over, return to normal business */ @@ -2585,7 +2600,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { sum += ClassSize.align(ClassSize.ARRAY);// "bytes" sum += ClassSize.align(length);// number of bytes of data in the "bytes" array sum += 2 * Bytes.SIZEOF_INT;// offset, length - sum += Bytes.SIZEOF_LONG;// memstoreTS + sum += ClassSize.REFERENCE;// pointer to mvcc return ClassSize.align(sum); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index c0a4b51..93e0359 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -269,7 +269,7 @@ public class DefaultMemStore implements MemStore { assert alloc.getBytes() != null; alloc.put(0, kv.getBuffer(), kv.getOffset(), len); KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); + newKv.setMvccVersion(kv.getMvccVersionReference()); return newKv; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90976e2..6d8e2c1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -814,10 +815,11 @@ public class HRegion implements HeapSize { // , Writable{ } } } - mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1); + mvcc.initialize(maxSeqId); return maxSeqId; } @@ -1712,57 +1714,64 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus("Preparing to flush by snapshotting stores"); List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; - try { - // Record the mvcc for all transactions in progress. - w = mvcc.beginMemstoreInsert(); - mvcc.advanceMemstore(w); - // check if it is not closing. - if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { - String msg = "Flush will not be started for [" - + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; - status.setStatus(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + String statusMsg = ""; + try{ + try { + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + w = mvcc.beginMemstoreInsert(this.sequenceId.incrementAndGet()); + + // check if it is not closing. + if (wal != null) { + if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + String msg = "Flush will not be started for [" + + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; + status.setStatus(msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + } + flushSeqId = this.sequenceId.incrementAndGet(); + } else { + // use the provided sequence Id as WAL is not being used for this flush. + flushSeqId = myseqid; } - flushSeqId = this.sequenceId.incrementAndGet(); - } else { - // use the provided sequence Id as WAL is not being used for this flush. - flushSeqId = myseqid; - } - - for (Store s : stores.values()) { - totalFlushableSize += s.getFlushableSize(); - storeFlushCtxs.add(s.createFlushContext(flushSeqId)); - } - - // prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs) { - flush.prepare(); - } + + for (Store s : stores.values()) { + totalFlushableSize += s.getFlushableSize(); + storeFlushCtxs.add(s.createFlushContext(flushSeqId)); + } + + // prepare flush (take a snapshot) + for (StoreFlushContext flush : storeFlushCtxs) { + flush.prepare(); + } + } finally { + this.updatesLock.writeLock().unlock(); + } + statusMsg = "Finished memstore snapshotting " + this + + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; + status.setStatus(statusMsg); + if (LOG.isTraceEnabled()) LOG.trace(statusMsg); + + // sync unflushed WAL changes when deferred log sync is enabled + // see HBASE-8208 for details + if (wal != null && !shouldSyncLog()) { + wal.sync(); + } + + mvcc.waitForPreviousTransactoinsComplete(w); + w = null; } finally { - this.updatesLock.writeLock().unlock(); - } - String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); - - // sync unflushed WAL changes when deferred log sync is enabled - // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) { - wal.sync(); + if(w != null){ + mvcc.advanceMemstore(w); + } } - - // wait for all in-progress transactions to commit to HLog before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.waitForRead(w); - - s = "Flushing stores of " + this; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); + + statusMsg = "Flushing stores of " + this; + status.setStatus(statusMsg); + if (LOG.isTraceEnabled()) LOG.trace(statusMsg); // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. @@ -2328,6 +2337,7 @@ public class HRegion implements HeapSize { // , Writable{ int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; + MutableLong seqNumber = new MutableLong(); try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2447,13 +2457,13 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - + seqNumber.setValue(this.sequenceId.incrementAndGet()); // // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsert(); - + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = @@ -2560,6 +2570,10 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- if (hasWalAppends) { syncOrDefer(txid, durability); + // Get LogSequenceNumber from WAL Sync + if(walEdit.getLogKey() != null) { + seqNumber.setValue(walEdit.getLogKey().getLogSeqNum()); + } } doRollBackMemstore = false; // calling the post CP hook for batch mutation @@ -2574,6 +2588,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); w = null; } @@ -2606,7 +2621,10 @@ public class HRegion implements HeapSize { // , Writable{ if (doRollBackMemstore) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) mvcc.completeMemstoreInsert(w); + if (w != null) { + w.setWriteNumber(seqNumber.longValue()); + mvcc.completeMemstoreInsert(w); + } if (locked) { this.updatesLock.readLock().unlock(); @@ -2702,7 +2720,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); List result; try { result = get(get, false); @@ -2925,7 +2943,7 @@ public class HRegion implements HeapSize { // , Writable{ try { if (localizedWriteEntry == null) { - localizedWriteEntry = mvcc.beginMemstoreInsert(); + localizedWriteEntry = mvcc.beginMemstoreInsert(this.sequenceId.incrementAndGet()); freemvcc = true; } @@ -4904,6 +4922,7 @@ public class HRegion implements HeapSize { // , Writable{ long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); + MutableLong seqNumber = new MutableLong(); try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); @@ -4914,6 +4933,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; + seqNumber.setValue(this.sequenceId.incrementAndGet()); long now = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -4924,7 +4944,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!mutations.isEmpty()) { // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); // 6. Apply to memstore for (KeyValue kv : mutations) { kv.setMvccVersion(writeEntry.getWriteNumber()); @@ -4952,6 +4972,9 @@ public class HRegion implements HeapSize { // , Writable{ // 10. Sync edit log if (txid != 0) { syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); + if (walEdit.getLogKey() != null){ + seqNumber.setValue(walEdit.getLogKey().getLogSeqNum()); + } } walSyncSuccessful = true; } @@ -4966,6 +4989,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 11. Roll mvcc forward if (writeEntry != null) { + writeEntry.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(writeEntry); writeEntry = null; } @@ -5073,17 +5097,19 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); + MutableLong seqNumber = new MutableLong(); WriteEntry w = null; - RowLock rowLock; + RowLock rowLock = null; try { rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + seqNumber.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5209,13 +5235,21 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } if (writeToWAL) { // sync the transaction log outside the rowlock syncOrDefer(txid, durability); + if(walEdits.getLogKey() != null){ + seqNumber.setValue(walEdits.getLogKey().getLogSeqNum()); + } } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.APPEND); @@ -5264,16 +5298,19 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + RowLock rowLock = null; WriteEntry w = null; + MutableLong seqNumber = new MutableLong(); try { - RowLock rowLock = getRowLock(row); + rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + seqNumber.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(seqNumber.longValue()); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5403,13 +5440,21 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) { // sync the transaction log outside the rowlock syncOrDefer(txid, durability); + if(walEdits.getLogKey() != null){ + seqNumber.setValue(walEdits.getLogKey().getLogSeqNum()); + } } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { + w.setWriteNumber(seqNumber.longValue()); mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.INCREMENT); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index b46d55b..751ff0a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.LinkedList; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -32,9 +33,7 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - + private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); // This is the pending queue of writes. @@ -45,7 +44,7 @@ public class MultiVersionConsistencyControl { * Default constructor. Initializes the memstoreRead/Write points to 0. */ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; + memstoreRead = 0; } /** @@ -54,27 +53,39 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { - throw new RuntimeException("Already used this mvcc. Too late to initialize"); - } - - this.memstoreRead = this.memstoreWrite = startPoint; + writeQueue.clear(); + memstoreRead = startPoint; } } - /** - * Generate and return a {@link WriteEntry} with a new write number. - * To complete the WriteEntry and wait for it to be visible, - * call {@link #completeMemstoreInsert(WriteEntry)}. - */ - public WriteEntry beginMemstoreInsert() { + WriteEntry beginMemstoreInsert(long baseVal) { + WriteEntry e = new WriteEntry(baseVal); synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } + + /** + * This function starts a MVCC transaction with current region's log change sequence number. Since + * we set change sequence number when flushing current change to WAL(late binding), the flush + * order may differ from the order to start a MVCC transaction. For example, a change begins a + * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we + * add a safe bumper to the passed in sequence number to start a MVCC in order that no other + * concurrent transactions will reuse the number till current MVCC completes(success or fail). The + * "faked" big number is safe because we only need it to prevent current change being seen and the + * number will be reset to real sequence number(set in log sync) right before we complete a MVCC + * so that MVCC align with flush sequence. + * @param curSeqNum + * @return WriteEntry a WriteEntry instance with the passed in seqnum + */ + public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + // the 1 billion is just an arbitrary big number to guard new region sequence number won't see + // this value before current MVCC completes. + // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key + return beginMemstoreInsert(curSeqNum + 1000000000); + } /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. @@ -99,45 +110,39 @@ public class MultiVersionConsistencyControl { * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ boolean advanceMemstore(WriteEntry e) { + long nextReadValue = -1; synchronized (writeQueue) { e.markCompleted(); - long nextReadValue = -1; - boolean ranOnce=false; while (!writeQueue.isEmpty()) { - ranOnce=true; WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + // Using Max because Edit complete in WAL sync order not arriving order + nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber().longValue()); writeQueue.removeFirst(); } else { break; } } - if (!ranOnce) { - throw new RuntimeException("never was a first"); + if (nextReadValue > memstoreRead) { + memstoreRead = nextReadValue; } - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - } - if (memstoreRead >= e.getWriteNumber()) { - return true; + // notify waiters on writeQueue before return + writeQueue.notifyAll(); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + readWaiters.notifyAll(); } - return false; } + + if (memstoreRead >= e.getWriteNumber().longValue()) { + return true; + } + return false; } /** @@ -147,29 +152,75 @@ public class MultiVersionConsistencyControl { public void waitForRead(WriteEntry e) { boolean interrupted = false; synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { + while (memstoreRead < e.getWriteNumber().longValue()) { try { readWaiters.wait(0); } catch (InterruptedException ie) { // We were interrupted... finish the loop -- i.e. cleanup --and then // on our way out, reset the interrupt flag. interrupted = true; + break; } } } if (interrupted) Thread.currentThread().interrupt(); } + /** + * Wait for all previous MVCC transactions complete + * @param curSeqNum + */ + public void waitForPreviousTransactoinsComplete(long curSeqNum) { + WriteEntry w = beginMemstoreInsert(curSeqNum); + waitForPreviousTransactoinsComplete(w); + } + + public void waitForPreviousTransactoinsComplete(WriteEntry waitedEntry) { + boolean interrupted = false; + WriteEntry w = waitedEntry; + + try { + WriteEntry firstEntry = null; + do { + synchronized (writeQueue) { + // writeQueue won't be empty at this point, the following is just a safety check + if (writeQueue.isEmpty()) { + break; + } + firstEntry = writeQueue.getFirst(); + if (firstEntry == w) { + // all previous in-flight transactions are done + break; + } + try { + writeQueue.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + break; + } + } + } while (firstEntry != null); + } finally { + if (w != null) { + advanceMemstore(w); + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + public long memstoreReadPoint() { return memstoreRead; } - public static class WriteEntry { - private long writeNumber; + private MutableLong writeNumber; private boolean completed = false; WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; + this.writeNumber = new MutableLong(writeNumber); } void markCompleted() { this.completed = true; @@ -177,9 +228,12 @@ public class MultiVersionConsistencyControl { boolean isCompleted() { return this.completed; } - long getWriteNumber() { + MutableLong getWriteNumber() { return this.writeNumber; } + void setWriteNumber(long val){ + this.writeNumber.setValue(val); + } } public static final long FIXED_SIZE = ClassSize.align( diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index b876972..7403700 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -121,12 +121,6 @@ abstract class StoreFlusher { // set its memstoreTS to 0. This will help us save space when writing to // disk. KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - // let us not change the original KV. It could be in the memstore - // changing its memstoreTS could affect other threads/scanners. - kv = kv.shallowCopy(); - kv.setMvccVersion(0); - } sink.append(kv); } kvs.clear(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index fc9aa44..484e38b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 0d65a54..71a9f7a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -44,6 +44,8 @@ class FSWALEntry extends HLog.Entry { final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, final HTableDescriptor htd, final HRegionInfo hri) { super(key, edit); + //binding LogKey & WALEdit + edit.setLogKey(key); this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 6e5d250..03450b9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -86,6 +86,8 @@ public class WALEdit implements Writable, HeapSize { private final boolean isReplay; private final ArrayList kvs = new ArrayList(1); + // reference to its related HLogKey instance + private HLogKey logKey; // Only here for legacy writable deserialization @Deprecated @@ -237,7 +239,8 @@ public class WALEdit implements Writable, HeapSize { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("[#edits: " + kvs.size() + " = <"); + sb.append("[" + ((this.logKey != null) ? "seqnum=" + this.logKey.getLogSeqNum() + " ": "")); + sb.append("#edits: " + kvs.size() + " = <"); for (KeyValue kv : kvs) { sb.append(kv.toString()); sb.append("; "); @@ -249,6 +252,14 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } + public void setLogKey(HLogKey logKey){ + this.logKey = logKey; + } + + public HLogKey getLogKey() { + return this.logKey; + } + /** * Create a compacion WALEdit * @param c diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index cccb3ff..d214bc6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -1101,7 +1101,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()), newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setMvccVersion(newKv.getMvccVersionReference()); return rewriteKv; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 98563d6..2e74281 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -68,7 +68,7 @@ public class TestMultiParallel { private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); - private static final int slaves = 2; // also used for testing HTable pool size + private static final int slaves = 3; // also used for testing HTable pool size @BeforeClass public static void beforeClass() throws Exception { ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); @@ -238,7 +238,7 @@ public class TestMultiParallel { * * @throws Exception */ - @Test (timeout=300000) + @Test (timeout=360000) public void testFlushCommitsWithAbort() throws Exception { LOG.info("test=testFlushCommitsWithAbort"); doTestFlushCommits(true); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 011b9a4..6e0995e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -62,6 +63,7 @@ public class TestDefaultMemStore extends TestCase { private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); private MultiVersionConsistencyControl mvcc; + private AtomicLong startSeqNum = new AtomicLong(0); @Override public void setUp() throws Exception { @@ -237,7 +239,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMvccVersion(w.getWriteNumber()); @@ -251,7 +253,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMvccVersion(w.getWriteNumber()); memstore.add(kv2); @@ -281,7 +283,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -297,7 +299,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMvccVersion(w.getWriteNumber()); memstore.add(kv21); @@ -333,7 +335,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -349,7 +351,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMvccVersion(w.getWriteNumber()); @@ -378,6 +380,7 @@ public class TestDefaultMemStore extends TestCase { final MultiVersionConsistencyControl mvcc; final MemStore memstore; + final AtomicLong startSeqNum; AtomicReference caughtException; @@ -385,12 +388,14 @@ public class TestDefaultMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, MultiVersionConsistencyControl mvcc, - AtomicReference caughtException) + AtomicReference caughtException, + AtomicLong startSeqNum) { this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); + this.startSeqNum = startSeqNum; } public void run() { @@ -404,7 +409,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -434,7 +439,7 @@ public class TestDefaultMemStore extends TestCase { AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum); threads[i].start(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 40fafd9..472e48c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -46,8 +46,10 @@ public class TestMultiVersionConsistencyControl extends TestCase { public boolean failed = false; public void run() { + AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry e = + mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500);