diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b616176..e2ae36c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -666,9 +666,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final Durability durability;
private final boolean regionStatsEnabled;
- // flag and lock for MVCC preassign
- private final boolean mvccPreAssign;
- private final ReentrantLock preAssignMvccLock;
/**
* HRegion constructor. This constructor should only be used for testing and
@@ -819,14 +816,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
-
- // get mvcc pre-assign flag and lock
- this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
- if (this.mvccPreAssign) {
- this.preAssignMvccLock = new ReentrantLock();
- } else {
- this.preAssignMvccLock = null;
- }
}
void setHTableSpecificConf() {
@@ -2669,9 +2658,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
// Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
// so if an abort or stop, there is no way to call them in.
- WALKey key = this.appendEmptyEdit(wal, null);
+ WALKey key = this.appendEmptyEdit(wal);
mvcc.complete(key.getWriteEntry());
- return key.getSequenceId(this.maxWaitForSeqId);
+ return key.getSequenceId();
}
//////////////////////////////////////////////////////////////////////////////
@@ -3413,29 +3402,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
}
} else {
- try {
- if (mvccPreAssign) {
- preAssignMvccLock.lock();
- writeEntry = mvcc.begin();
- }
- if (walEdit.size() > 0) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
- if (mvccPreAssign) {
- walKey.setPreAssignedWriteEntry(writeEntry);
- }
- txid =
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
- } else {
- // If this is a skip wal operation just get the read point from mvcc
- walKey = this.appendEmptyEdit(this.wal, writeEntry);
- }
- } finally {
- if (mvccPreAssign) {
- preAssignMvccLock.unlock();
- }
+ if (walEdit.size() > 0) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+ txid = this.wal
+ .append(this.htableDescriptor, this.getRegionInfo(), walKey,
+ walEdit, true);
+ } else {
+ walKey = appendEmptyEdit(wal);
}
}
// ------------------------------------
@@ -3473,7 +3449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// before apply to memstore to avoid scan return incorrect value.
// we use durability of the original mutation for the mutation passed by CP.
boolean updateSeqId = isInReplay
- || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
+ || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
updateSequenceId(familyMaps[i].values(), mvccNum);
}
@@ -7412,7 +7388,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if(walKey == null){
// since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendEmptyEdit(this.wal, null);
+ walKey = this.appendEmptyEdit(this.wal);
}
// 7. Start mvcc transaction
@@ -7737,7 +7713,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean updateSeqId = false;
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendEmptyEdit(this.wal, null);
+ walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
}
@@ -7974,7 +7950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
} else {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendEmptyEdit(this.wal, null);
+ walKey = this.appendEmptyEdit(this.wal);
// If no WAL, FSWALEntry won't be used and no update for sequence id
updateSeqId = true;
}
@@ -8204,9 +8180,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
- 6 * Bytes.SIZEOF_BOOLEAN);
+ 5 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
// 1 x HashMap - coprocessorServiceHandlers
@@ -8792,16 +8768,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException {
+ private WALKey appendEmptyEdit(final WAL wal) throws IOException {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
@SuppressWarnings("deprecation")
WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
- if (writeEntry != null) {
- key.setPreAssignedWriteEntry(writeEntry);
- }
-
// Call append but with an empty WALEdit. The returned sequence id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d65af00..57d6356 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,12 +18,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.annotations.VisibleForTesting;
-
-import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -111,20 +111,34 @@ public class MultiVersionConcurrencyControl {
}
/**
+ * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
+ */
+ public WriteEntry begin() {
+ return begin(new Runnable() {
+ @Override public void run() {
+
+ }
+ });
+ }
+
+ /**
* Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
- * to our queue of ongoing writes. Return this WriteEntry instance.
- * To complete the write transaction and wait for it to be visible, call
- * {@link #completeAndWait(WriteEntry)}. If the write failed, call
- * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
- * transaction.
+ * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
+ * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
+ * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
+ * the failed write transaction.
+ *
+ * The {@code action} will be executed under the lock which means it can keep the same order with
+ * mvcc.
* @see #complete(WriteEntry)
* @see #completeAndWait(WriteEntry)
*/
- public WriteEntry begin() {
+ public WriteEntry begin(Runnable action) {
synchronized (writeQueue) {
long nextWriteNumber = writePoint.incrementAndGet();
WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
+ action.run();
return e;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index d3f302a..8d97b64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import com.lmax.disruptor.*;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -89,11 +92,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@@ -1112,21 +1110,22 @@ public class FSHLog implements WAL {
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
-
- // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
- // all this to make a key and then below to append the edit, we need to carry htd, info,
- // etc. all over the ring buffer.
- FSWALEntry entry = null;
- long sequence = this.disruptor.getRingBuffer().next();
+ final MutableLong txidHolder = new MutableLong();
+ final RingBuffer ringBuffer = disruptor.getRingBuffer();
+ MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(new Runnable() {
+ @Override public void run() {
+ txidHolder.setValue(ringBuffer.next());
+ }
+ });
+ long txid = txidHolder.longValue();
try {
- RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
- // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
- truck.loadPayload(entry, scope.detach());
+ FSWALEntry entry = new FSWALEntry(txid, key, edits, htd, hri, inMemstore);
+ entry.stampRegionSequenceId(we);
+ ringBuffer.get(txid).loadPayload(entry, scope.detach());
} finally {
- this.disruptor.getRingBuffer().publish(sequence);
+ ringBuffer.publish(txid);
}
- return sequence;
+ return txid;
}
/**
@@ -1814,12 +1813,6 @@ public class FSHLog implements WAL {
try {
FSWALEntry entry = truck.unloadFSWALEntryPayload();
if (this.exception != null) {
- // We got an exception on an earlier attempt at append. Do not let this append
- // go through. Fail it but stamp the sequenceid into this append though failed.
- // We need to do this to close the latch held down deep in WALKey...that is waiting
- // on sequenceid assignment otherwise it will just hang out (The #append method
- // called below does this also internally).
- entry.stampRegionSequenceId();
// Return to keep processing events coming off the ringbuffer
return;
}
@@ -1940,10 +1933,8 @@ public class FSHLog implements WAL {
byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
long regionSequenceId = WALKey.NO_SEQUENCE_ID;
try {
- // We are about to append this edit; update the region-scoped sequence number. Do it
- // here inside this single appending/writing thread. Events are ordered on the ringbuffer
- // so region sequenceids will also be in order.
- regionSequenceId = entry.stampRegionSequenceId();
+
+ regionSequenceId = entry.getKey().getSequenceId();
// Edits are empty, there is nothing to append. Maybe empty when we are looking for a
// region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index f55e185..69d1c59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -106,34 +106,19 @@ class FSWALEntry extends Entry {
/**
* Here is where a WAL edit gets its sequenceid.
+ * @param we after HBASE-17471 we already get the mvcc number
+ * in WriteEntry, just stamp the writenumber to cells and walkey
* @return The sequenceid we stamped on this edit.
* @throws IOException
*/
- long stampRegionSequenceId() throws IOException {
- long regionSequenceId = WALKey.NO_SEQUENCE_ID;
- WALKey key = getKey();
- MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
- boolean preAssigned = (we != null);
- if (!preAssigned) {
- MultiVersionConcurrencyControl mvcc = key.getMvcc();
- if (mvcc != null) {
- we = mvcc.begin();
- }
- }
- if (we != null) {
- regionSequenceId = we.getWriteNumber();
- }
-
+ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
+ long regionSequenceId = we.getWriteNumber();
if (!this.getEdit().isReplay() && inMemstore) {
- for (Cell c:getEdit().getCells()) {
+ for (Cell c : getEdit().getCells()) {
CellUtil.setSequenceId(c, regionSequenceId);
}
}
-
- // This has to stay in this order
- if (!preAssigned) {
- key.setWriteEntry(we);
- }
+ getKey().setWriteEntry(we);
return regionSequenceId;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 585c8f4..01420d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -95,37 +95,16 @@ public class WALKey implements SequenceId, Comparable {
*/
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
- if (this.preAssignedWriteEntry != null) {
- // don't wait for seqNumAssignedLatch if writeEntry is preassigned
- return this.preAssignedWriteEntry;
- }
- try {
- this.seqNumAssignedLatch.await();
- } catch (InterruptedException ie) {
- // If interrupted... clear out our entry else we can block up mvcc.
- MultiVersionConcurrencyControl mvcc = getMvcc();
- LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
- if (mvcc != null) {
- if (this.writeEntry != null) {
- mvcc.complete(this.writeEntry);
- }
- }
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
+ assert this.writeEntry != null;
return this.writeEntry;
}
@InterfaceAudience.Private // For internal use only.
public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
- assert this.writeEntry == null : "Non-null writeEntry when trying to set one";
+ assert this.writeEntry == null;
this.writeEntry = writeEntry;
// Set our sequenceid now using WriteEntry.
- if (this.writeEntry != null) {
- this.logSeqNum = this.writeEntry.getWriteNumber();
- }
- this.seqNumAssignedLatch.countDown();
+ this.logSeqNum = writeEntry.getWriteNumber();
}
// should be < 0 (@see HLogKey#readFields(DataInput))
@@ -189,7 +168,6 @@ public class WALKey implements SequenceId, Comparable {
@InterfaceAudience.Private
protected long logSeqNum;
private long origLogSeqNum = 0;
- private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
// visible for deprecated HLogKey
@InterfaceAudience.Private
@@ -206,7 +184,6 @@ public class WALKey implements SequenceId, Comparable {
private long nonce = HConstants.NO_NONCE;
private MultiVersionConcurrencyControl mvcc;
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
- private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList());
// visible for deprecated HLogKey
@@ -393,36 +370,6 @@ public class WALKey implements SequenceId, Comparable {
*/
@Override
public long getSequenceId() throws IOException {
- return getSequenceId(-1);
- }
-
- /**
- * Wait for sequence number to be assigned & return the assigned value.
- * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
- * @return long the new assigned sequence number
- * @throws IOException
- */
- public long getSequenceId(final long maxWaitForSeqId) throws IOException {
- // TODO: This implementation waiting on a latch is problematic because if a higher level
- // determines we should stop or abort, there is no global list of all these blocked WALKeys
- // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
- //
- // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
- // even those that have failed (previously we were not... so they would just hang out...).
- // St.Ack 20150910
- try {
- if (maxWaitForSeqId < 0) {
- this.seqNumAssignedLatch.await();
- } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
- throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
- "ms; WAL system stuck or has gone away?");
- }
- } catch (InterruptedException ie) {
- LOG.warn("Thread interrupted waiting for next log sequence number");
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
return this.logSeqNum;
}
@@ -667,23 +614,4 @@ public class WALKey implements SequenceId, Comparable {
}
}
- /**
- * @return The preassigned writeEntry, if any
- */
- @InterfaceAudience.Private // For internal use only.
- public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
- return this.preAssignedWriteEntry;
- }
-
- /**
- * Preassign writeEntry
- * @param writeEntry the entry to assign
- */
- @InterfaceAudience.Private // For internal use only.
- public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
- if (writeEntry != null) {
- this.preAssignedWriteEntry = writeEntry;
- this.logSeqNum = writeEntry.getWriteNumber();
- }
- }
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index aca2978..2acafb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -242,7 +242,9 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
+ long txid = log.append(htd, hri,
+ new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now,
+ new MultiVersionConcurrencyControl()),
edit, true);
log.sync(txid);
@@ -326,7 +328,7 @@ public class TestWALObserver {
LOG.debug("write a log edit that supports legacy cps.");
final long now = EnvironmentEdgeManager.currentTime();
- final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
+ final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc);
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e1af83a..20e56af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6272,11 +6272,8 @@ public class TestHRegion {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
WALKey key = invocation.getArgumentAt(2, WALKey.class);
- MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
- if (we == null) {
- we = key.getMvcc().begin();
- key.setWriteEntry(we);
- }
+ MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
+ key.setWriteEntry(we);
return 1L;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index d82d1df..0a0393a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -216,13 +216,15 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
try {
// First get something into memstore. Make a Put and then pull the Cell out of it. Will
// manage append and sync carefully in below to manufacture hang. We keep adding same
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
- WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
+ WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
+ htd.getTableName(), System.currentTimeMillis(), mvcc);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@@ -388,12 +390,12 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
byte[] bytes = Bytes.toBytes(getName());
-
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
try {
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
- htd.getTableName());
+ htd.getTableName(), System.currentTimeMillis(), mvcc);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@@ -425,7 +427,7 @@ public class TestWALLockup {
// make RingBufferEventHandler sleep 1s, so the following sync
// endOfBatch=false
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
- TableName.valueOf("sleep"));
+ TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc);
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
Thread t = new Thread("Sync") {
@@ -449,7 +451,7 @@ public class TestWALLockup {
}
// make append throw DamagedWALException
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
- TableName.valueOf("DamagedWALException"));
+ TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc);
dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
while (latch.getCount() > 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 1fcb241..3a23a05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -416,6 +416,7 @@ public class TestFSHLog {
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
wal.append(htd, info, logkey, edits, true);
+ region.getMVCC().completeAndWait(logkey.getWriteEntry());
}
region.flush(true);
// FlushResult.flushSequenceId is not visible here so go get the current sequence id.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index bac1b6f..b1eccdf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -101,7 +102,7 @@ public class TestWALActionsListener {
HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
SOME_BYTES, SOME_BYTES, false);
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
-
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < 20; i++) {
byte[] b = Bytes.toBytes(i+"");
KeyValue kv = new KeyValue(b,b,b);
@@ -111,7 +112,7 @@ public class TestWALActionsListener {
htd.addFamily(new HColumnDescriptor(b));
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
- TableName.valueOf(b), 0), edit, true);
+ TableName.valueOf(b), 0, mvcc), edit, true);
wal.sync(txid);
if (i == 10) {
wal.registerWALActionsListener(laterobserver);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 2622f6d..bf28bca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -1189,7 +1189,7 @@ public class TestWALReplay {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
rowName, family, ee, index), htd, hri, true);
- entry.stampRegionSequenceId();
+ entry.stampRegionSequenceId(mvcc.begin());
return entry;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index d9454bb..b405698 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
@@ -95,11 +96,12 @@ public class TestSecureWAL {
final WAL wal =
wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), kvs, true);
+ System.currentTimeMillis(), mvcc), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 9b4a968..621d092 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -370,12 +370,12 @@ public class TestWALFactory {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(tableName.getName()));
-
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), kvs, true);
+ System.currentTimeMillis(), mvcc), kvs, true);
}
// Now call sync to send the data to HDFS datanodes
wal.sync();