diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 6f800c2..75f1f82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.util.StringUtils;
/**
* Provides functionality to write ({@link BlockIndexWriter}) and read
- * ({@link BlockIndexReader})
+ * (BlockIndexReader)
* single-level and multi-level block indexes.
*
* Examples of how to use the block index writer can be found in
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2293311..d30d3c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -61,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -236,19 +235,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
- /**
- * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
- * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
- * Its default value is -1L. This default is used as a marker to indicate
- * that the region hasn't opened yet. Once it is opened, it is set to the derived
- * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
- *
- *
- * Control of this sequence is handed off to the WAL implementation. It is responsible
- * for tagging edits with the correct sequence id since it is responsible for getting the
- * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
- * OUTSIDE OF THE WAL. The value you get will not be what you think it is.
- */
- private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
* The sequence id of the last replayed open region event from the primary region. This is used
@@ -376,7 +362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// no new RegionScanners can grab a readPoint that we are unaware of.
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized(scannerReadPoints) {
- minimumReadPoint = mvcc.memstoreReadPoint();
+ minimumReadPoint = mvcc.getReadPoint();
for (Long readPoint: this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) {
@@ -970,8 +956,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
- getSequenceId());
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc);
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -985,17 +970,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
- RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
+ RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
- getSequenceId());
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc);
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
- getSequenceId().get(), 0);
+ mvcc.getReadPoint(), 0);
}
}
@@ -1267,7 +1251,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This scan can read even uncommitted transactions
return Long.MAX_VALUE;
}
- return mvcc.memstoreReadPoint();
+ return mvcc.getReadPoint();
}
@Override
@@ -1947,11 +1931,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean shouldFlushStore(Store store) {
long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
store.getFamily().getName()) - 1;
- if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
+ if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
- " is > " + this.flushPerChanges + " from current=" + sequenceId.get());
+ " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
}
return true;
}
@@ -1977,7 +1961,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
whyFlush.setLength(0);
// This is a rough measure.
if (this.maxFlushedSeqId > 0
- && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
+ && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
return true;
}
@@ -2068,10 +2052,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
protected PrepareFlushResult internalPrepareFlushCache(
- final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
- MonitoredTask status, boolean writeFlushWalMarker)
- throws IOException {
-
+ final WAL wal,
+ final long myseqid,
+ final Collection<Store> storesToFlush,
+ MonitoredTask status,
+ boolean writeFlushWalMarker)
+ throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -2092,26 +2078,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// etc.)
// wal can be null replaying edits.
if (wal != null) {
- w = mvcc.beginMemstoreInsert();
- long flushOpSeqId = getNextSequenceId(wal);
+ w = mvcc.newWriteEntry();
+ long flushOpSeqId = w.getWriteNumber();
FlushResult flushResult = new FlushResultImpl(
- FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
- writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
- w.setWriteNumber(flushOpSeqId);
- mvcc.waitForPreviousTransactionsComplete(w);
+ FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ flushOpSeqId,
+ "Nothing to flush",
+ writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
+ mvcc.completeAndWaitForRead(w);
w = null;
return new PrepareFlushResult(flushResult, myseqid);
} else {
return new PrepareFlushResult(
- new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- "Nothing to flush", false),
+ new FlushResultImpl(
+ FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ "Nothing to flush",
+ false),
myseqid);
}
}
} finally {
this.updatesLock.writeLock().unlock();
if (w != null) {
- mvcc.advanceMemstore(w);
+ mvcc.complete(w);
}
}
}
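
The hunk above introduces the WriteEntry lifecycle that recurs throughout this patch. A minimal
sketch of the contract (not part of the patch, assuming the refactored
MultiVersionConcurrencyControl shown at the end of this diff): every entry must be completed
exactly once, either published or abandoned.

    MultiVersionConcurrencyControl.WriteEntry e = mvcc.newWriteEntry();
    try {
      // ... stamp cells with e.getWriteNumber() and add them to the memstore ...
      mvcc.completeAndWaitForRead(e); // publish: waits until the read point passes e
      e = null;                       // prevent a second complete in the finally block
    } finally {
      if (e != null) {
        mvcc.complete(e);             // abandon on failure so later writers are not blocked
      }
    }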
@@ -2122,10 +2111,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder();
for (Store store: storesToFlush) {
- perCfExtras.append("; ");
- perCfExtras.append(store.getColumnFamilyName());
- perCfExtras.append("=");
- perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
+ perCfExtras.append("; ").append(store.getColumnFamilyName());
+ perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
}
}
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
@@ -2168,7 +2155,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long trxId = 0;
try {
try {
- w = mvcc.beginMemstoreInsert();
+ w = mvcc.newWriteEntry();
if (wal != null) {
Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@@ -2202,7 +2189,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
}
// Prepare flush (take a snapshot)
@@ -2216,7 +2203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(t));
@@ -2250,14 +2237,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
- w.setWriteNumber(flushOpSeqId);
- mvcc.waitForPreviousTransactionsComplete(w);
- // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
+ mvcc.completeAndWaitForRead(w);
+ // set w to null to prevent mvcc.complete from being called again inside finally block
w = null;
} finally {
if (w != null) {
// in case of failure just mark current w as complete
- mvcc.advanceMemstore(w);
+ mvcc.complete(w);
}
}
return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
@@ -2281,10 +2267,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
- getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
+ getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, true);
+ desc, true, mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2347,7 +2333,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, true);
+ desc, true, mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -2361,7 +2347,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
- desc, sequenceId, false);
+ desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
@@ -2433,8 +2419,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@VisibleForTesting
protected long getNextSequenceId(final WAL wal) throws IOException {
- WALKey key = this.appendEmptyEdit(wal, null);
- return key.getSequenceId(maxWaitForSeqId);
+ WALKey key = this.appendEmptyEdit(wal);
+ mvcc.complete(key.getWriteEntry());
+ return key.getSequenceId();
}
//////////////////////////////////////////////////////////////////////////////
@@ -2862,7 +2849,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
- List<Cell> memstoreCells = new ArrayList<Cell>();
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
@@ -2927,17 +2913,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we haven't got any rows in our batch, we should block to
// get the next one.
- boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
- rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
+ rowLock = getRowLock(mutation.getRow(), true);
} catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row="
+ Bytes.toStringBinary(mutation.getRow()), ioe);
}
if (rowLock == null) {
// We failed to grab another lock
- assert !shouldBlock : "Should never fail to get lock when blocking";
+ assert false: "Should never fail to get lock when blocking";
break; // stop acquiring more rows for this batch
} else {
acquiredRowLocks.add(rowLock);
@@ -2997,16 +2982,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
- if(isInReplay) {
- mvccNum = batchOp.getReplaySequenceId();
- } else {
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- }
- //
- // ------------------------------------
- // Acquire the latest mvcc number
- // ----------------------------------
- w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
@@ -3017,35 +2992,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// ------------------------------------
- // STEP 3. Write back to memstore
- // Write to memstore. It is ok to write to memstore
- // first without updating the WAL because we do not roll
- // forward the memstore MVCC. The MVCC will be moved up when
- // the complete operation is done. These changes are not yet
- // visible to scanners till we update the MVCC. The MVCC is
- // moved only when the sync is complete.
- // ----------------------------------
- long addedSize = 0;
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
- continue;
- }
- doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
- addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
- }
-
- // ------------------------------------
- // STEP 4. Build WAL edit
+ // STEP 3. Build WAL edit
// ----------------------------------
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
+ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
}
- batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.getMutation(i);
Durability tmpDur = getEffectiveDurability(m.getDurability());
@@ -3071,9 +3025,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
- currentNonceGroup, currentNonce);
+ currentNonceGroup, currentNonce, mvcc);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
- walEdit, getSequenceId(), true, null);
+ walEdit, true);
walEdit = new WALEdit(isInReplay);
walKey = null;
}
@@ -3092,38 +3046,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// -------------------------
- // STEP 5. Append the final edit to WAL. Do not sync wal.
+ // STEP 4. Append the final edit to WAL. Do not sync wal.
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (isInReplay) {
// use wal key from the original
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
long replaySeqId = batchOp.getReplaySequenceId();
walKey.setOrigLogSeqNum(replaySeqId);
-
- // ensure that the sequence id of the region is at least as big as orig log seq id
- while (true) {
- long seqId = getSequenceId().get();
- if (seqId >= replaySeqId) break;
- if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
- }
}
if (walEdit.size() > 0) {
if (!isInReplay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
}
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ }
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
- getSequenceId(), true, memstoreCells);
+ // ------------------------------------
+ // Acquire the latest mvcc number
+ // ----------------------------------
+ if (walKey == null) {
+ // If this is a skip wal operation just get the read point from mvcc
+ walKey = this.appendEmptyEdit(this.wal);
}
- if(walKey == null){
- // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
+ if (!isInReplay) {
+ w = walKey.getWriteEntry();
+ mvccNum = w.getWriteNumber();
+ } else {
+ mvccNum = batchOp.getReplaySequenceId();
+ }
+
+ // ------------------------------------
+ // STEP 5. Write back to memstore
+ // Write to memstore. It is ok to write to memstore
+ // first without syncing the WAL because we do not roll
+ // forward the memstore MVCC. The MVCC will be moved up when
+ // the complete operation is done. These changes are not yet
+ // visible to scanners till we update the MVCC. The MVCC is
+ // moved only when the sync is complete.
+ // ----------------------------------
+ long addedSize = 0;
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ if (batchOp.retCodeDetails[i].getOperationStatusCode()
+ != OperationStatusCode.NOT_RUN) {
+ continue;
+ }
+ doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
}
// -------------------------------
@@ -3151,13 +3125,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postBatchMutate(miniBatchOp);
}
-
// ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------
if (w != null) {
- mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
+ mvcc.completeAndWaitForRead(w);
w = null;
+ } else if (isInReplay) {
+ // ensure that the sequence id of the region is at least as big as orig log seq id
+ mvcc.advanceTo(mvccNum);
+ }
+
+ for (int i = firstIndex; i < lastIndexExclusive; i ++) {
+ if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
+ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+ }
}
// ------------------------------------
@@ -3185,10 +3167,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
+ for (int j = 0; j < familyMaps.length; j++) {
+ for (List<Cell> cells : familyMaps[j].values()) {
+ rollbackMemstore(cells);
+ }
+ }
}
if (w != null) {
- mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
+ mvcc.complete(w);
}
if (locked) {
@@ -3275,7 +3261,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.completeAndWaitForRead(mvcc.newWriteEntry());
try {
if (this.getCoprocessorHost() != null) {
Boolean processed = null;
@@ -3384,7 +3370,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.completeAndWaitForRead(mvcc.newWriteEntry());
try {
List<Cell> result = get(get, false);
@@ -3461,7 +3447,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private void doBatchMutate(Mutation mutation) throws IOException {
// Currently this is only called for puts and deletes, so no nonces.
- OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
+ OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -3646,7 +3632,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* new entries.
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
- long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+ long mvccNum, boolean isInReplay) throws IOException {
long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3657,10 +3643,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int listSize = cells.size();
for (int i=0; i < listSize; i++) {
Cell cell = cells.get(i);
- CellUtil.setSequenceId(cell, mvccNum);
+ if (cell.getSequenceId() == 0) {
+ CellUtil.setSequenceId(cell, mvccNum);
+ }
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
if(isInReplay) {
// set memstore newly added cells with replay mvcc number
CellUtil.setSequenceId(ret.getSecond(), mvccNum);
@@ -4417,12 +4404,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.maxFlushedSeqId = flush.getFlushSequenceNumber();
// advance the mvcc read point so that the new flushed file is visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already dropped via flush.
- // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
- // stores while they are still in flight because the flush commit marker will not contain
- // flushes from ALL stores.
- getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+ mvcc.advanceTo(flush.getFlushSequenceNumber());
} catch (FileNotFoundException ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -4489,15 +4471,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Drops the memstore contents after replaying a flush descriptor or region open event replay
* if the memstore edits have seqNums smaller than the given seq id
- * @param flush the flush descriptor
* @throws IOException
*/
private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
long totalFreedSize = 0;
this.updatesLock.writeLock().lock();
try {
- mvcc.waitForPreviousTransactionsComplete();
- long currentSeqId = getSequenceId().get();
+
+ long currentSeqId = mvcc.getReadPoint();
if (seqId >= currentSeqId) {
// then we can drop the memstore contents since everything is below this seqId
LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4660,9 +4641,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed file is visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already dropped via flush.
- getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+ mvcc.completeAndWaitForRead(mvcc.newWriteEntry());
// If we were waiting for observing a flush or region opening event for not showing partial
// data after a secondary region crash, we can allow reads now.
@@ -4753,7 +4732,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
- getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+ mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
}
} finally {
closeBulkRegionOperation();
@@ -4852,11 +4831,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed files are visible.
- // there may be some in-flight transactions, but they won't be made visible since they are
- // either greater than flush seq number or they were already picked up via flush.
- for (Store s : getStores()) {
- getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
- }
+ // there may be some in-flight transactions, but they won't be made visible since they are
+ // either greater than flush seq number or they were already picked up via flush.
+ for (Store s : getStores()) {
+ mvcc.advanceTo(s.getMaxMemstoreTS());
+ }
+
// smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
// skip all edits that are to be replayed in the future with that has a smaller seqId
@@ -5014,75 +4993,91 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- @Override
- public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
- startRegionOperation();
- try {
- return getRowLockInternal(row, waitForLock);
- } finally {
- closeRegionOperation();
- }
+
+ /**
+ * Get an exclusive (write) lock on a given row.
+ * @param row Which row to lock.
+ * @return A locked RowLock. The lock is exclusive and already acquired.
+ * @throws IOException
+ */
+ public RowLock getRowLock(byte[] row) throws IOException {
+ return getRowLock(row, false);
}
/**
- * A version of getRowLock(byte[], boolean) to use when a region operation has already been
+ *
+ * Get a row lock for the specified row. All locks are reentrant.
+ *
+ * Before calling this function make sure that a region operation has already been
* started (the calling thread has already acquired the region-close-guard lock).
+ * @param row The row actions will be performed against
+ * @param readLock true if a non-exclusive (read) lock is requested, false if an
+ * exclusive (write) lock is needed
*/
- protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+ public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+ // Make sure the row is inside of this region before getting the lock for it.
+ checkRow(row, "row lock");
+ // create an object to use as a key in the row lock map
HashedBytes rowKey = new HashedBytes(row);
- RowLockContext rowLockContext = new RowLockContext(rowKey);
- // loop until we acquire the row lock (unless !waitForLock)
- while (true) {
- RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
- if (existingContext == null) {
- // Row is not already locked by any thread, use newly created context.
- break;
- } else if (existingContext.ownedByCurrentThread()) {
- // Row is already locked by current thread, reuse existing context instead.
- rowLockContext = existingContext;
- break;
- } else {
- if (!waitForLock) {
- return null;
+ RowLockContext rowLockContext = null;
+ RowLockImpl result = null;
+ TraceScope traceScope = null;
+
+ // If we're tracing start a span to show how long this took.
+ if (Trace.isTracing()) {
+ traceScope = Trace.startSpan("HRegion.getRowLock");
+ traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
+ }
+
+ try {
+ // Keep trying until we have a lock or error out.
+ // TODO: do we need to add a time component here?
+ while (result == null) {
+
+ // Try adding a RowLockContext to the lockedRows.
+ // If we can add it then there's no other transactions currently running.
+ rowLockContext = new RowLockContext(rowKey);
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+
+ // if there was a running transaction then there's already a context.
+ if (existingContext != null) {
+ rowLockContext = existingContext;
}
- TraceScope traceScope = null;
- try {
- if (Trace.isTracing()) {
- traceScope = Trace.startSpan("HRegion.getRowLockInternal");
- }
- // Row is already locked by some other thread, give up or wait for it
- if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
- if(traceScope != null) {
- traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
- }
- throw new IOException("Timed out waiting for lock for row: " + rowKey);
- }
- if (traceScope != null) traceScope.close();
- traceScope = null;
- } catch (InterruptedException ie) {
- LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- } finally {
- if (traceScope != null) traceScope.close();
+
+ // Now try and get the lock.
+ //
+ // This can fail if the context has been cleaned up concurrently; a null result
+ // means we loop around and retry with a fresh context.
+ if (readLock) {
+ result = rowLockContext.newReadLock();
+ } else {
+ result = rowLockContext.newWriteLock();
+ }
+ }
+ if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
}
+ result = null;
+ // Clean up the counts just in case this was the thing keeping the context alive.
+ rowLockContext.cleanUp();
+ throw new IOException("Timed out waiting for lock for row: " + rowKey);
+ }
+ return result;
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
+ }
+ Thread.currentThread().interrupt();
+ throw iie;
+ } finally {
+ if (traceScope != null) {
+ traceScope.close();
}
}
-
- // allocate new lock for this thread
- return rowLockContext.newLock();
- }
-
- /**
- * Acquires a lock on the given row.
- * The same thread may acquire multiple locks on the same row.
- * @return the acquired row lock
- * @throws IOException if the lock could not be acquired after waiting
- */
- public RowLock getRowLock(byte[] row) throws IOException {
- return getRowLock(row, true);
}
@Override
@@ -5095,6 +5090,97 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ @VisibleForTesting
+ class RowLockContext {
+ private final HashedBytes row;
+ final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+ final AtomicBoolean usable = new AtomicBoolean(true);
+ final AtomicInteger count = new AtomicInteger(0);
+ final Object lock = new Object();
+
+ RowLockContext(HashedBytes row) {
+ this.row = row;
+ }
+
+ RowLockImpl newWriteLock() {
+ Lock l = readWriteLock.writeLock();
+ return getRowLock(l);
+ }
+ RowLockImpl newReadLock() {
+ Lock l = readWriteLock.readLock();
+ return getRowLock(l);
+ }
+
+ private RowLockImpl getRowLock(Lock l) {
+ count.incrementAndGet();
+ synchronized (lock) {
+ if (usable.get()) {
+ return new RowLockImpl(this, l);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ void cleanUp() {
+ long c = count.decrementAndGet();
+ if (c <= 0) {
+ synchronized (lock) {
+ if (count.get() <= 0) {
+ usable.set(false);
+ RowLockContext removed = lockedRows.remove(row);
+ assert removed == this: "we should never remove a different context";
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockContext{" +
+ "row=" + row +
+ ", readWriteLock=" + readWriteLock +
+ ", count=" + count +
+ '}';
+ }
+ }
+
+ /**
+ * Class used to represent a lock on a row.
+ */
+ public static class RowLockImpl implements RowLock {
+ private final RowLockContext context;
+ private final Lock lock;
+
+ public RowLockImpl(RowLockContext context, Lock lock) {
+ this.context = context;
+ this.lock = lock;
+ }
+
+ public Lock getLock() {
+ return lock;
+ }
+
+ @VisibleForTesting
+ public RowLockContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void release() {
+ lock.unlock();
+ context.cleanUp();
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockImpl{" +
+ "context=" + context +
+ ", lock=" + lock +
+ '}';
+ }
+ }
+
/**
* Determines whether multiple column families are present
* Precondition: familyPaths is not null
@@ -5240,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
- loadDescriptor, sequenceId);
+ loadDescriptor);
} catch (IOException ioe) {
if (this.rsServices != null) {
// Have to abort region server because some hfiles has been loaded but we can't write
@@ -5411,7 +5497,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
+ public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
+ throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@@ -5843,8 +5930,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
protected boolean isStopRow(Cell currentRowCell) {
- return currentRowCell == null
- || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan);
+ return currentRowCell == null ||
+ (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >=
+ isScan);
}
@Override
@@ -5992,7 +6080,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (initialize) {
// If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
// verifying the WALEdits.
- region.setSequenceId(region.initialize(null));
+ region.getMVCC().initialize(region.initialize(null));
}
return region;
}
@@ -6206,7 +6294,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Refuse to open the region if a required class cannot be loaded
checkClassLoading();
this.openSeqNum = initialize(reporter);
- this.setSequenceId(openSeqNum);
+ this.mvcc.initialize(openSeqNum);
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& !recovering) {
// Only write the region open event marker to WAL if (1) we are not read-only
@@ -6641,7 +6729,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<Mutation> mutations = new ArrayList<Mutation>();
- List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
WALKey walKey = null;
@@ -6650,13 +6737,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
+ // use a write lock for mixed reads and writes
acquiredRowLocks.add(getRowLock(row));
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
locked = true;
- // Get a mvcc write number
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTime();
try {
@@ -6666,11 +6752,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
- // 5. Start mvcc transaction
- writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
- // 6. Call the preBatchMutate hook
+
+ // 5. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
- // 7. Apply to memstore
+
+ long txid = 0;
+ // 6. Append no sync
+ if (!walEdit.isEmpty()) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ processor.getClusterIds(), nonceGroup, nonce, mvcc);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+ walKey, walEdit, false);
+ }
+ if (walKey == null) {
+ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+
+ // 7. Start mvcc transaction
+ writeEntry = walKey.getWriteEntry();
+ mvccNum = walKey.getSequenceId();
+
+ // 8. Apply to memstore
for (Mutation m : mutations) {
// Handle any tag based cell features
rewriteCellTags(m.getFamilyCellMap(), m);
@@ -6685,25 +6793,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
Pair<Long, Cell> ret = store.add(cell);
addedSize += ret.getFirst();
- memstoreCells.add(ret.getSecond());
}
}
- long txid = 0;
- // 8. Append no sync
- if (!walEdit.isEmpty()) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- processor.getClusterIds(), nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdit, getSequenceId(), true, memstoreCells);
- }
- if(walKey == null){
- // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
- // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
- }
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@@ -6735,7 +6827,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// 13. Roll mvcc forward
if (writeEntry != null) {
- mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+ mvcc.completeAndWaitForRead(writeEntry);
}
if (locked) {
this.updatesLock.readLock().unlock();
@@ -6833,28 +6925,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment();
long mvccNum = 0;
- WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry w = null;
WALKey walKey = null;
RowLock rowLock = null;
- List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false;
try {
rowLock = getRowLock(row);
+ assert rowLock != null;
try {
lock(this.updatesLock.readLock());
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.completeAndWaitForRead(mvcc.newWriteEntry());
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preAppendAfterRowLock(append);
if(r!= null) {
return r;
}
}
- // now start my own transaction
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
+
long now = EnvironmentEdgeManager.currentTime();
// Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@@ -6958,7 +7048,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
@@ -6979,42 +7068,50 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
tempMemstore.put(store, kvs);
}
+ // Actually write to WAL now
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(
+ getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(),
+ WALKey.NO_SEQUENCE_ID,
+ nonceGroup,
+ nonce,
+ mvcc);
+ txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
+ } else {
+ recordMutationWithoutWal(append.getFamilyCellMap());
+ }
+ if (walKey == null) {
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+ // now start my own transaction
+ w = walKey.getWriteEntry();
+ mvccNum = walKey.getSequenceId();
+
//Actually write to Memstore now
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
- memstoreCells.addAll(entry.getValue());
} else {
// otherwise keep older versions around
for (Cell cell: entry.getValue()) {
+ CellUtil.setSequenceId(cell, mvccNum);
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
doRollBackMemstore = true;
}
}
allKVs.addAll(entry.getValue());
}
- // Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
- this.sequenceId, true, memstoreCells);
- } else {
- recordMutationWithoutWal(append.getFamilyCellMap());
- }
- if (walKey == null) {
- // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
- }
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -7035,10 +7132,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
+ for (List<Cell> cells : tempMemstore.values()) {
+ rollbackMemstore(cells);
+ }
}
if (w != null) {
- mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
+ mvcc.completeAndWaitForRead(w);
}
closeRegionOperation(Operation.APPEND);
}
@@ -7086,10 +7185,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.INCREMENT);
this.writeRequestsCount.increment();
RowLock rowLock = null;
- WriteEntry w = null;
+ MultiVersionConcurrencyControl.WriteEntry w = null;
WALKey walKey = null;
- long mvccNum = 0;
- List<Cell> memstoreCells = new ArrayList<Cell>();
+
boolean doRollBackMemstore = false;
try {
rowLock = getRowLock(row);
@@ -7098,16 +7196,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
// wait for all prior MVCC transactions to finish - while we hold the row lock
// (so that we are guaranteed to see the latest state)
- mvcc.waitForPreviousTransactionsComplete();
+ mvcc.completeAndWaitForRead(mvcc.newWriteEntry());
if (this.coprocessorHost != null) {
Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
if (r != null) {
return r;
}
}
- // now start my own transaction
- mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
- w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
+
long now = EnvironmentEdgeManager.currentTime();
// Process each family
for (Map.Entry<byte[], List<Cell>> family:
@@ -7189,8 +7285,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
val, 0, val.length,
newTags);
- CellUtil.setSequenceId(newKV, mvccNum);
-
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
@@ -7217,6 +7311,35 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+
+ // Actually write to WAL now
+ if (walEdits != null && !walEdits.isEmpty()) {
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(),
+ WALKey.NO_SEQUENCE_ID,
+ nonceGroup,
+ nonce,
+ mvcc);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+ walKey, walEdits, true);
+ } else {
+ recordMutationWithoutWal(increment.getFamilyCellMap());
+ }
+ }
+ if (walKey == null) {
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+ walKey = this.appendEmptyEdit(this.wal);
+ }
+
+ // now start my own transaction
+ w = walKey.getWriteEntry();
+
//Actually write to Memstore now
if (!tempMemstore.isEmpty()) {
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
@@ -7224,13 +7347,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint());
- memstoreCells.addAll(entry.getValue());
} else {
// otherwise keep older versions around
for (Cell cell : entry.getValue()) {
+ CellUtil.setSequenceId(cell, w.getWriteNumber());
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
- memstoreCells.add(ret.getSecond());
doRollBackMemstore = true;
}
}
@@ -7238,26 +7360,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
}
-
- // Actually write to WAL now
- if (walEdits != null && !walEdits.isEmpty()) {
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdits, getSequenceId(), true, memstoreCells);
- } else {
- recordMutationWithoutWal(increment.getFamilyCellMap());
- }
- }
- if(walKey == null){
- // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendEmptyEdit(this.wal, memstoreCells);
- }
} finally {
this.updatesLock.readLock().unlock();
}
@@ -7276,10 +7378,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
+ for (List<Cell> cells : tempMemstore.values()) {
+ rollbackMemstore(cells);
+ }
}
if (w != null) {
- mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
+ mvcc.completeAndWaitForRead(w);
}
closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) {
@@ -7310,7 +7414,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+ 43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@@ -7456,7 +7560,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new IOException("Not a known catalog table: " + p.toString());
}
try {
- region.initialize(null);
+ region.mvcc.initialize(region.initialize(null));
if (majorCompact) {
region.compact(true);
} else {
@@ -7874,91 +7978,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
- * Do not change this sequence id. See {@link #sequenceId} comment.
+ * Do not change this sequence id. It is now backed by the mvcc read point.
* @return sequenceId
*/
@VisibleForTesting
- public AtomicLong getSequenceId() {
- return this.sequenceId;
- }
-
- /**
- * sets this region's sequenceId.
- * @param value new value
- */
- private void setSequenceId(long value) {
- this.sequenceId.set(value);
- }
-
- @VisibleForTesting class RowLockContext {
- private final HashedBytes row;
- private final CountDownLatch latch = new CountDownLatch(1);
- private final Thread thread;
- private int lockCount = 0;
-
- RowLockContext(HashedBytes row) {
- this.row = row;
- this.thread = Thread.currentThread();
- }
-
- boolean ownedByCurrentThread() {
- return thread == Thread.currentThread();
- }
-
- RowLock newLock() {
- lockCount++;
- RowLockImpl rl = new RowLockImpl();
- rl.setContext(this);
- return rl;
- }
-
- @Override
- public String toString() {
- Thread t = this.thread;
- return "Thread=" + (t == null? "null": t.getName()) + ", row=" + this.row +
- ", lockCount=" + this.lockCount;
- }
-
- void releaseLock() {
- if (!ownedByCurrentThread()) {
- throw new IllegalArgumentException("Lock held by thread: " + thread
- + " cannot be released by different thread: " + Thread.currentThread());
- }
- lockCount--;
- if (lockCount == 0) {
- // no remaining locks by the thread, unlock and allow other threads to access
- RowLockContext existingContext = lockedRows.remove(row);
- if (existingContext != this) {
- throw new RuntimeException(
- "Internal row lock state inconsistent, should not happen, row: " + row);
- }
- latch.countDown();
- }
- }
+ public long getSequenceId() {
+ return this.mvcc.getReadPoint();
}
- public static class RowLockImpl implements RowLock {
- private RowLockContext context;
- private boolean released = false;
-
- @VisibleForTesting
- public RowLockContext getContext() {
- return context;
- }
-
- @VisibleForTesting
- public void setContext(RowLockContext context) {
- this.context = context;
- }
-
- @Override
- public void release() {
- if (!released) {
- context.releaseLock();
- }
- released = true;
- }
- }
/**
* Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
@@ -7969,14 +7996,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private WALKey appendEmptyEdit(final WAL wal, List cells) throws IOException {
+ private WALKey appendEmptyEdit(final WAL wal) throws IOException {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
- WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc);
// Call append but with an empty WALEdit. The returned sequence id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
wal.append(getTableDesc(), getRegionInfo(), key,
- WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
+ WALEdit.EMPTY_WALEDIT, false);
return key;
}
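
Taken together, the HRegion hunks move mvcc write-number assignment into the WAL append: the
sequence id handed out by the WAL is the mvcc write number. A rough sketch of the resulting
write path (not part of the patch; applyToMemstore is a hypothetical stand-in for the
per-family store.add() loop shown above):

    // The WALKey now carries the region's mvcc. wal.append() assigns the sequence id,
    // which doubles as the mvcc write number for the memstore edits.
    WALKey key = new HLogKey(encodedRegionName, tableName, WALKey.NO_SEQUENCE_ID,
        now, clusterIds, nonceGroup, nonce, mvcc);
    long txid = wal.append(htd, regionInfo, key, walEdit, true);
    MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
    long seqNum = key.getSequenceId();   // same value as we.getWriteNumber()
    applyToMemstore(familyMaps, seqNum); // hypothetical helper: stamp cells, then store.add()
    wal.sync(txid);                      // HRegion actually goes through syncOrDefer(txid, durability)
    mvcc.completeAndWaitForRead(we);     // only now do the edits become visible to readers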
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 840085d..2b593e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -478,6 +478,7 @@ public class HStore implements Store {
* @param hri {@link HRegionInfo} for the region.
* @param family {@link HColumnDescriptor} describing the column family
* @return Path to family/Store home directory.
+ * @deprecated Do not use. Reveals too much about internals.
*/
@Deprecated
public static Path getStoreHomedir(final Path tabledir,
@@ -490,6 +491,7 @@ public class HStore implements Store {
* @param encodedName Encoded region name.
* @param family {@link HColumnDescriptor} describing the column family
* @return Path to family/Store home directory.
+ * @deprecated Do not use. Reveals too much about internals.
*/
@Deprecated
public static Path getStoreHomedir(final Path tabledir,
@@ -619,7 +621,8 @@ public class HStore implements Store {
if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
- HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
+ HashMap<StoreFileInfo, StoreFile> currentFilesSet =
+ new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
for (StoreFile sf : currentFiles) {
currentFilesSet.put(sf.getFileInfo(), sf);
}
@@ -650,7 +653,7 @@ public class HStore implements Store {
// readers might pick it up. This assumes that the store is not getting any writes (otherwise
// in-flight transactions might be made visible)
if (!toBeAddedFiles.isEmpty()) {
- region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
+ region.getMVCC().advanceTo(this.getMaxSequenceId());
}
// notify scanners, close file readers, and recompute store size
@@ -1306,7 +1309,7 @@ public class HStore implements Store {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
- this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
+ this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
}
@VisibleForTesting
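
For reference while reading the MultiVersionConcurrencyControl hunks that follow, the renames
this patch applies at the call sites above map roughly as:

    // old MVCC API                                  ->  new MVCC API
    // beginMemstoreInsert()                         ->  newWriteEntry()
    // beginMemstoreInsertWithSeqNum(n)              ->  walKey.getWriteEntry() (number comes from the WAL append)
    // completeMemstoreInsertWithSeqNum(w, walKey)   ->  completeAndWaitForRead(w)
    // advanceMemstore(w)                            ->  complete(w)
    // waitForPreviousTransactionsComplete()         ->  completeAndWaitForRead(newWriteEntry())
    // advanceMemstoreReadPointIfNeeded(seqId)       ->  advanceTo(seqId)
    // memstoreReadPoint()                           ->  getReadPoint()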
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 028d81a..a70bb56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,122 +18,102 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
- * Manages the read/write consistency within memstore. This provides
+ * Manages the read/write consistency. This provides
* an interface for readers to determine what entries to ignore, and
* a mechanism for writers to obtain new write numbers, then "commit"
* the new writes for readers to read (thus forming atomic transactions).
*/
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
- private static final long NO_WRITE_NUMBER = 0;
- private volatile long memstoreRead = 0;
+ final AtomicLong readPoint = new AtomicLong(0);
+ final AtomicLong writePoint = new AtomicLong(0);
+
private final Object readWaiters = new Object();
// This is the pending queue of writes.
- private final LinkedList<WriteEntry> writeQueue =
- new LinkedList<WriteEntry>();
+ //
+ // TODO(eclark): Should this be an array of fixed size to
+ // reduce the number of allocations on the write path?
+ // This could be equal to the number of handlers + a small number.
+ private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
/**
- * Default constructor. Initializes the memstoreRead/Write points to 0.
+ * Default constructor. Initializes the Read/Write points to 0.
*/
public MultiVersionConcurrencyControl() {
}
+
+
/**
- * Initializes the memstoreRead/Write points appropriately.
+ * Initializes the Read/Write points appropriately.
* @param startPoint
*/
- public void initialize(long startPoint) {
- synchronized (writeQueue) {
- writeQueue.clear();
- memstoreRead = startPoint;
- }
+ public boolean initialize(long startPoint) {
+ return tryAdvanceTo(startPoint, -1);
}
- /**
- *
- * @param initVal The value we used initially and expected it'll be reset later
- * @return WriteEntry instance.
- */
- WriteEntry beginMemstoreInsert() {
- return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+ public void advanceTo(long mvccNum) {
+ while (true) {
+ long seqId = this.getWritePoint();
+ if (seqId >= mvccNum) break;
+ if (this.tryAdvanceTo(/* newSeqId = */ mvccNum, /* expected = */ seqId)) break;
+ }
}
- /**
- * Get a mvcc write number before an actual one(its log sequence Id) being assigned
- * @param sequenceId
- * @return long a faked write number which is bigger enough not to be seen by others before a real
- * one is assigned
- */
- public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
- // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
- // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
- // because each handler could increment sequence num twice and max concurrent in-flight
- // transactions is the number of RPC handlers.
- // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
- // changes touch same row key
- // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
- // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
- return sequenceId.incrementAndGet() + 1000000000;
+ public boolean tryAdvanceTo(long startPoint, long expected) {
+ synchronized (writeQueue) {
+ long currentRead = this.readPoint.get();
+ long currentWrite = this.writePoint.get();
+ if (currentRead != currentWrite) {
+ throw new RuntimeException("Already used this mvcc. Too late to tryAdvanceTo");
+ }
+ if (expected != -1 && expected != currentRead) {
+ return false;
+ }
+
+ if (startPoint < currentRead) {
+ return false;
+ }
+
+ readPoint.set(startPoint);
+ writePoint.set(startPoint);
+ }
+ return true;
}
/**
- * This function starts a MVCC transaction with current region's log change sequence number. Since
- * we set change sequence number when flushing current change to WAL(late binding), the flush
- * order may differ from the order to start a MVCC transaction. For example, a change begins a
- * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
- * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
- * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
- * big number is safe because we only need it to prevent current change being seen and the number
- * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
- * for MVCC to align with flush sequence.
- * @param curSeqNum
- * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+ * Generate and return a {@link WriteEntry} with a new write number.
+ * To complete the WriteEntry and wait for it to be visible,
+ * call {@link #completeAndWaitForRead(WriteEntry)}.
*/
- public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
- WriteEntry e = new WriteEntry(curSeqNum);
+ public WriteEntry newWriteEntry() {
synchronized (writeQueue) {
+ long nextWriteNumber = writePoint.incrementAndGet();
+ WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
/**
- * Complete a {@link WriteEntry} that was created by
- * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
- * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
- * visible to MVCC readers.
- * @throws IOException
- */
- public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
- throws IOException {
- if(e == null) return;
- if (seqId != null) {
- e.setWriteNumber(seqId.getSequenceId());
- } else {
- // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
- // function beginMemstoreInsertWithSeqNum in case of failures
- e.setWriteNumber(NO_WRITE_NUMBER);
- }
- waitForPreviousTransactionsComplete(e);
- }
-
- /**
- * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
- * end of this call, the global read point is at least as large as the write point of the passed
- * in WriteEntry. Thus, the write is visible to MVCC readers.
+ * Complete a {@link WriteEntry} that was created by {@link #newWriteEntry()}.
+ *
+ * At the end of this call, the global read point is at least as large as the write point
+ * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
*/
- public void completeMemstoreInsert(WriteEntry e) {
- waitForPreviousTransactionsComplete(e);
+ public void completeAndWaitForRead(WriteEntry e) {
+ complete(e);
+ waitForRead(e);
}
/**
@@ -145,94 +125,63 @@ public class MultiVersionConcurrencyControl {
* are also completed. Then, the read point is advanced to the supremum of S.
*
* @param e
+ *
* @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
*/
- boolean advanceMemstore(WriteEntry e) {
- long nextReadValue = -1;
+ public boolean complete(WriteEntry e) {
synchronized (writeQueue) {
e.markCompleted();
+ long nextReadValue = -1;
+ boolean ranOnce = false;
while (!writeQueue.isEmpty()) {
+ ranOnce = true;
WriteEntry queueFirst = writeQueue.getFirst();
+
+ if (nextReadValue > 0) {
+ if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+ throw new RuntimeException("invariant in complete() violated, prev: "
+ + nextReadValue + " next: " + queueFirst.getWriteNumber());
+ }
+ }
+
if (queueFirst.isCompleted()) {
- // Using Max because Edit complete in WAL sync order not arriving order
- nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+ nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
} else {
break;
}
}
- if (nextReadValue > memstoreRead) {
- memstoreRead = nextReadValue;
+ if (!ranOnce) {
+ throw new RuntimeException("Completed a WriteEntry but the write queue was empty");
}
- // notify waiters on writeQueue before return
- writeQueue.notifyAll();
- }
-
- if (nextReadValue > 0) {
- synchronized (readWaiters) {
- readWaiters.notifyAll();
- }
- }
-
- if (memstoreRead >= e.getWriteNumber()) {
- return true;
- }
- return false;
- }
-
- /**
- * Advances the current read point to be given seqNum if it is smaller than
- * that.
- */
- void advanceMemstoreReadPointIfNeeded(long seqNum) {
- synchronized (writeQueue) {
- if (this.memstoreRead < seqNum) {
- memstoreRead = seqNum;
+ if (nextReadValue > 0) {
+ synchronized (readWaiters) {
+ readPoint.set(nextReadValue);
+ readWaiters.notifyAll();
+ }
}
+ return readPoint.get() >= e.getWriteNumber();
}
}
/**
- * Wait for all previous MVCC transactions complete
+ * Wait for the global readPoint to advance up to
+ * the write number of the given WriteEntry.
*/
- public void waitForPreviousTransactionsComplete() {
- WriteEntry w = beginMemstoreInsert();
- waitForPreviousTransactionsComplete(w);
- }
-
- public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+ public void waitForRead(WriteEntry e) {
boolean interrupted = false;
- WriteEntry w = waitedEntry;
-
- try {
- WriteEntry firstEntry = null;
- do {
- synchronized (writeQueue) {
- // writeQueue won't be empty at this point, the following is just a safety check
- if (writeQueue.isEmpty()) {
- break;
- }
- firstEntry = writeQueue.getFirst();
- if (firstEntry == w) {
- // all previous in-flight transactions are done
- break;
- }
- try {
- writeQueue.wait(0);
- } catch (InterruptedException ie) {
- // We were interrupted... finish the loop -- i.e. cleanup --and then
- // on our way out, reset the interrupt flag.
- interrupted = true;
- break;
- }
+ synchronized (readWaiters) {
+ while (readPoint.get() < e.getWriteNumber()) {
+ try {
+ readWaiters.wait(0);
+ } catch (InterruptedException ie) {
+ // We were interrupted... finish the loop -- i.e. cleanup -- and then
+ // on our way out, reset the interrupt flag.
+ interrupted = true;
}
- } while (firstEntry != null);
- } finally {
- if (w != null) {
- advanceMemstore(w);
}
}
if (interrupted) {
@@ -240,14 +189,19 @@ public class MultiVersionConcurrencyControl {
}
}
- public long memstoreReadPoint() {
- return memstoreRead;
+ public long getReadPoint() {
+ return readPoint.get();
}
- public static class WriteEntry {
- private long writeNumber;
- private volatile boolean completed = false;
+ @VisibleForTesting
+ public long getWritePoint() {
+ return writePoint.get();
+ }
+ @InterfaceAudience.Private
+ public static class WriteEntry {
+ private final long writeNumber;
+ private boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
@@ -257,12 +211,9 @@ public class MultiVersionConcurrencyControl {
boolean isCompleted() {
return this.completed;
}
- long getWriteNumber() {
+ public long getWriteNumber() {
return this.writeNumber;
}
- void setWriteNumber(long val){
- this.writeNumber = val;
- }
}
public static final long FIXED_SIZE = ClassSize.align(
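Taken together, the reworked class exposes a small surface: newWriteEntry/complete/waitForRead for writers, getReadPoint for readers, and initialize/advanceTo for bootstrap. A minimal usage sketch on a standalone instance (the seed value of 100 is illustrative):

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    mvcc.initialize(100L);                 // e.g. seeded from the region's open sequence id

    // Writer: start a transaction, apply the edit, then commit it.
    MultiVersionConcurrencyControl.WriteEntry e = mvcc.newWriteEntry(); // write point -> 101
    try {
      // ... apply the edit to the memstore, tagging cells with e.getWriteNumber() ...
    } finally {
      mvcc.completeAndWaitForRead(e);      // read point catches up to 101
    }

    // Reader: anything at or below the read point is visible.
    assert mvcc.getReadPoint() >= e.getWriteNumber();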
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index fa69d63..c4fb294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,7 +62,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -1021,27 +1019,12 @@ public class FSHLog implements WAL {
}
}
}
-
- /**
- * @param now
- * @param encodedRegionName Encoded name of the region as returned by
- * HRegionInfo#getEncodedNameAsBytes().
- * @param tableName
- * @param clusterIds that have consumed the change
- * @return New log key.
- */
- protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
- long now, List clusterIds, long nonceGroup, long nonce) {
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
- }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
- final List<Cell> memstoreCells) throws IOException {
+ final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1055,9 +1038,9 @@ public class FSHLog implements WAL {
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
- // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
- // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
+ // edit with its edit/sequence id.
+ // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+ entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@@ -1083,9 +1066,9 @@ public class FSHLog implements WAL {
private class SyncRunner extends HasThread {
private volatile long sequence;
private final BlockingQueue<SyncFuture> syncFutures;
-
+
/**
- * UPDATE!
+ * UPDATE!
* @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
* we will put the result of the actual hdfs sync call as the result.
* @param sequence The sequence number on the ring buffer when this thread was set running.
@@ -1133,12 +1116,13 @@ public class FSHLog implements WAL {
// This function releases one sync future only.
return 1;
}
-
+
/**
* Release all SyncFutures whose sequence is <= currentSequence.
* @param currentSequence
* @param t May be non-null if we are processing SyncFutures because an exception was thrown.
* @return Count of SyncFutures we let go.
+ *
*/
private int releaseSyncFutures(final long currentSequence, final Throwable t) {
int syncCount = 0;
@@ -1566,7 +1550,7 @@ public class FSHLog implements WAL {
* 'safe point' while the orchestrating thread does some work that requires the first thread
* paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
* thread.
- *
+ *
* <p>Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A waits until
Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until
* Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
* Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused,
@@ -1574,7 +1558,7 @@ public class FSHLog implements WAL {
* it flags B and then Thread A and Thread B continue along on their merry way. Pause and
* signalling 'zigzags' between the two participating threads. We use two latches -- one the
* inverse of the other -- pausing and signaling when states are achieved.
- *
+ *
* <p>To start up the drama, Thread A creates an instance of this class each time it would do
To start up the drama, Thread A creates an instance of this class each time it would do
* this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
* only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1596,7 +1580,7 @@ public class FSHLog implements WAL {
* Latch to wait on. Will be released when we can proceed.
*/
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
-
+
/**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1605,7 +1589,7 @@ public class FSHLog implements WAL {
* @throws InterruptedException
* @throws ExecutionException
* @return The passed syncFuture
- * @throws FailedSyncBeforeLogCloseException
+ * @throws FailedSyncBeforeLogCloseException
*/
SyncFuture waitSafePoint(final SyncFuture syncFuture)
throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1617,7 +1601,7 @@ public class FSHLog implements WAL {
}
return syncFuture;
}
-
+
/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1811,8 +1795,7 @@ public class FSHLog implements WAL {
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
regionSequenceId = entry.stampRegionSequenceId();
-
- // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
+ // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
// region sequence id only, a region edit/sequence id that is not associated with an actual
// edit. It has to go through all the rigmarole to be sure we have the right ordering.
if (entry.getEdit().isEmpty()) {
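The SafePointZigZagLatch machinery described in the javadoc above boils down to two one-shot latches. A standalone sketch of the pattern, not the patch's class itself (names are illustrative):

    import java.util.concurrent.CountDownLatch;

    class ZigZagSketch {
      private final CountDownLatch attained = new CountDownLatch(1);
      private final CountDownLatch released = new CountDownLatch(1);

      // Thread A: block here until Thread B reaches the safe point.
      void waitSafePoint() throws InterruptedException {
        attained.await();
      }

      // Thread B: announce the safe point, then hold until released.
      void safePointAttained() throws InterruptedException {
        attained.countDown();
        released.await();
      }

      // Thread A: after the critical work (e.g. swapping the WAL writer), let B go.
      void releaseSafePoint() {
        released.countDown();
      }
    }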
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 1ea9d4f..9cc7eed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -51,23 +50,18 @@ class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
- private final transient AtomicLong regionSequenceIdReference;
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
- private final transient List<Cell> memstoreCells;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
- final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
- final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
+ final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
- this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
- this.memstoreCells = memstoreCells;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
ArrayList<Cell> cells = this.getEdit().getCells();
@@ -110,25 +104,26 @@ class FSWALEntry extends Entry {
return this.sequence;
}
- /**
- * Stamp this edit with a region edit/sequence id.
- * Call when safe to do so: i.e. the context is such that the increment on the passed in
- * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
- * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
- * method to be called.
- * @return The region edit/sequence id we set for this edit.
- * @throws IOException
- * @see #getRegionSequenceId()
- */
long stampRegionSequenceId() throws IOException {
- long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
- for (Cell cell : this.memstoreCells) {
- CellUtil.setSequenceId(cell, regionSequenceId);
+ long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+ MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
+ MultiVersionConcurrencyControl.WriteEntry we = null;
+
+ if (mvcc != null) {
+ we = mvcc.newWriteEntry();
+ regionSequenceId = we.getWriteNumber();
+ }
+
+ if (!this.getEdit().isReplay() && inMemstore) {
+ for (Cell c : getEdit().getCells()) {
+ CellUtil.setSequenceId(c, regionSequenceId);
}
}
+
+ // This has to stay in this order: setLogSeqNum must run before setWriteEntry,
+ // because setWriteEntry releases the latch that getWriteEntry() blocks on.
WALKey key = getKey();
key.setLogSeqNum(regionSequenceId);
+ key.setWriteEntry(we);
return regionSequenceId;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 8caf8df..e8392ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.Writable;
@@ -73,6 +74,13 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, now);
}
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, mvcc);
+ }
+
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@@ -86,9 +94,16 @@ public class HLogKey extends WALKey implements Writable {
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(
+ final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -104,9 +119,14 @@ public class HLogKey extends WALKey implements Writable {
* @param nonceGroup
* @param nonce
*/
- public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ public HLogKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -122,10 +142,13 @@ public class HLogKey extends WALKey implements Writable {
* @param nonce
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
+ long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc);
}
+ /**
+ * @deprecated Do not use these old Writables serialization methods
+ */
@Override
@Deprecated
public void write(DataOutput out) throws IOException {
@@ -138,7 +161,8 @@ public class HLogKey extends WALKey implements Writable {
Compressor.writeCompressed(this.encodedRegionName, 0,
this.encodedRegionName.length, out,
compressionContext.regionDict);
- Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
+ Compressor.writeCompressed(this.tablename.getName(), 0,
+ this.tablename.getName().length, out,
compressionContext.tableDict);
}
out.writeLong(this.logSeqNum);
@@ -200,10 +224,11 @@ public class HLogKey extends WALKey implements Writable {
}
} else {
try {
- // dummy read (former byte cluster id)
+ // Dummy read (former byte cluster id)
in.readByte();
} catch(EOFException e) {
// Means it's a very old key, just continue
+ if (LOG.isTraceEnabled()) LOG.trace("Old key", e);
}
}
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
index cb89346..22e9054 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
/**
* An HLogKey specific to WalEdits coming from replay.
@@ -32,13 +33,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class ReplayHLogKey extends HLogKey {
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
}
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 399623f..3d25a27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,12 +20,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -34,6 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -53,14 +51,18 @@ public class WALUtil {
* This provides info to the HMaster to allow it to recover the compaction if
* this regionserver dies in the middle (This part is not yet implemented). It also prevents
* the compaction from finishing if this regionserver has already lost its lease on the log.
- * @param sequenceId Used by WAL to get sequence Id for the waledit.
+ * @param mvcc Used by the WAL to assign a sequence id to the WALEdit.
*/
- public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
+ public static void writeCompactionMarker(WAL log,
+ HTableDescriptor htd,
+ HRegionInfo info,
+ final CompactionDescriptor c,
+ MultiVersionConcurrencyControl mvcc) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+ log.append(htd, info, key, WALEdit.createCompaction(info, c), false);
+ mvcc.complete(key.getWriteEntry());
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
@@ -70,13 +72,17 @@ public class WALUtil {
/**
* Write a flush marker indicating a start / abort or a complete of a region flush
*/
- public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+ public static long writeFlushMarker(WAL log,
+ HTableDescriptor htd,
+ HRegionInfo info,
+ final FlushDescriptor f,
+ boolean sync,
+ MultiVersionConcurrencyControl mvcc) throws IOException {
TableName tn = TableName.valueOf(f.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
- null);
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+ long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false);
+ mvcc.complete(key.getWriteEntry());
if (sync) log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@@ -88,12 +94,11 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened
*/
public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
- final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
+ final RegionEventDescriptor r) throws IOException {
TableName tn = TableName.valueOf(r.getTableName().toByteArray());
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
- sequenceId, false, null);
+ long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false);
log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
@@ -108,27 +113,22 @@ public class WALUtil {
* @param htd A description of the table that we are bulk loading into.
* @param info A description of the region in the table that we are bulk loading into.
* @param descriptor A protocol buffers based description of the client's bulk loading request
- * @param sequenceId The current sequenceId in the log at the time when we were to write the
- * bulk load marker.
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static long writeBulkLoadMarkerAndSync(final WAL wal,
final HTableDescriptor htd,
final HRegionInfo info,
- final WALProtos.BulkLoadDescriptor descriptor,
- final AtomicLong sequenceId) throws IOException {
+ final WALProtos.BulkLoadDescriptor descriptor)
+ throws IOException {
TableName tn = info.getTable();
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
// Add it to the log but the false specifies that we don't need to add it to the memstore
long trx = wal.append(htd,
- info,
- key,
- WALEdit.createBulkLoadEvent(info, descriptor),
- sequenceId,
- false,
- new ArrayList<Cell>());
+ info,
+ key,
+ WALEdit.createBulkLoadEvent(info, descriptor), false);
wal.sync(trx);
if (LOG.isTraceEnabled()) {
@@ -136,5 +136,4 @@ public class WALUtil {
}
return trx;
}
-
}
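A hedged sketch of what a call site for the reworked marker helpers looks like after this change; all handles (`wal`, `htd`, `hri`, `flushDesc`, `mvcc`) are assumed to exist and the wrapper method is hypothetical:

    static void markFlush(WAL wal, HTableDescriptor htd, HRegionInfo hri,
        FlushDescriptor flushDesc, MultiVersionConcurrencyControl mvcc) throws IOException {
      // The helper builds the mvcc-carrying HLogKey, appends the marker edit,
      // retires the MVCC transaction, and syncs because sync=true. No AtomicLong
      // is threaded through any more.
      WALUtil.writeFlushMarker(wal, htd, hri, flushDesc, true, mvcc);
    }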
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
index f628cee..84d6128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
@@ -55,7 +55,7 @@ public class HashedBytes {
if (obj == null || getClass() != obj.getClass())
return false;
HashedBytes other = (HashedBytes) obj;
- return Arrays.equals(bytes, other.bytes);
+ return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes);
}
@Override
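The equals change above short-circuits on the precomputed hash before the byte comparison. A sketch of the idea in isolation, not the HBase class itself:

    // Two arrays with different hashes can never be equal, so the cheap
    // int comparison filters most mismatches before Arrays.equals runs.
    static boolean equalsFast(int hashA, byte[] a, int hashB, byte[] b) {
      return hashA == hashB && java.util.Arrays.equals(a, b);
    }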
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 52becbe..831d88c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -156,7 +156,7 @@ class DisabledWALProvider implements WALProvider {
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+ boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 4844487..edc8993 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,17 +21,13 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -115,18 +111,14 @@ public interface WAL {
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param htd used to give scope for replication TODO refactor out in favor of table name and info
- * @param sequenceId A reference to the atomic long the info region is using as
- * source of its incrementing edits sequence id. Inside in this call we will increment it and
- * attach the sequence to the edit we apply the WAL.
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
- * @param memstoreKVs list of KVs added into memstore
* @return Returns a 'transaction id' and key will have the region edit/sequence id
* in it.
*/
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+ boolean inMemstore)
throws IOException;
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 69c2aec..da21110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,27 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
public class WALKey implements SequenceId, Comparable<WALKey> {
private static final Log LOG = LogFactory.getLog(WALKey.class);
+ public MultiVersionConcurrencyControl getMvcc() {
+ return mvcc;
+ }
+
+ public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
+ try {
+ this.seqNumAssignedLatch.await();
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for write entry");
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ return writeEntry;
+ }
+
+ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+ this.writeEntry = writeEntry;
+ this.seqNumAssignedLatch.countDown();
+ }
+
// should be < 0 (@see HLogKey#readFields(DataInput))
// version 2 supports WAL compression
// public members here are only public because of HLogKey
@@ -149,7 +171,9 @@ public class WALKey implements SequenceId, Comparable {
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
- static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+ private MultiVersionConcurrencyControl mvcc;
+ private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+ public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey
@InterfaceAudience.Private
@@ -157,16 +181,17 @@ public class WALKey implements SequenceId, Comparable {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
@VisibleForTesting
- public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
@@ -174,8 +199,28 @@ public class WALKey implements SequenceId, Comparable {
}
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
- EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ null);
+ }
+
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ final long now,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ NO_SEQUENCE_ID,
+ now,
+ EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ mvcc);
}
/**
@@ -185,15 +230,21 @@ public class WALKey implements SequenceId, Comparable {
*
Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
- * HRegionInfo#getEncodedNameAsBytes().
- * @param tablename - name of table
- * @param logSeqNum - log sequence number
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * HRegionInfo#getEncodedNameAsBytes().
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change (used in Replication)
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -202,17 +253,18 @@ public class WALKey implements SequenceId, Comparable {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * HRegionInfo#getEncodedNameAsBytes().
+ * HRegionInfo#getEncodedNameAsBytes().
* @param tablename
- * @param now Time at which this edit was written.
- * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change (used in Replication)
* @param nonceGroup
* @param nonce
+ * @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename,
- final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup,
+ final long nonce, final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
@@ -221,21 +273,37 @@ public class WALKey implements SequenceId, Comparable {
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
- * HRegionInfo#getEncodedNameAsBytes().
+ * HRegionInfo#getEncodedNameAsBytes().
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
- public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
- long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
- EMPTY_UUIDS, nonceGroup, nonce);
+ public WALKey(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ long nonceGroup,
+ long nonce,
+ final MultiVersionConcurrencyControl mvcc) {
+ init(encodedRegionName,
+ tablename,
+ logSeqNum,
+ EnvironmentEdgeManager.currentTime(),
+ EMPTY_UUIDS,
+ nonceGroup,
+ nonce,
+ mvcc);
}
@InterfaceAudience.Private
- protected void init(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ protected void init(final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@@ -243,6 +311,7 @@ public class WALKey implements SequenceId, Comparable {
this.tablename = tablename;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
+ this.mvcc = mvcc;
}
/**
@@ -268,15 +337,14 @@ public class WALKey implements SequenceId, Comparable {
}
/**
- * Allow that the log sequence id to be set post-construction and release all waiters on assigned
- * sequence number.
+ * Allow the log sequence id to be set post-construction.
* Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
* @param sequence
*/
@InterfaceAudience.Private
public void setLogSeqNum(final long sequence) {
this.logSeqNum = sequence;
- this.seqNumAssignedLatch.countDown();
+
}
/**
@@ -484,21 +552,22 @@ public class WALKey implements SequenceId, Comparable {
this.encodedRegionName = encodedRegionName;
}
- public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
- throws IOException {
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
+ WALCellCodec.ByteStringCompressor compressor) throws IOException {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
- compressionContext.regionDict));
+ compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
- compressionContext.tableDict));
+ compressionContext.tableDict));
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
- if(this.origLogSeqNum > 0) {
+ if (this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
@@ -524,8 +593,9 @@ public class WALKey implements SequenceId, Comparable {
return builder;
}
- public void readFieldsFromPb(
- org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+ public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
+ WALCellCodec.ByteStringUncompressor uncompressor)
+ throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
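With the write entry riding on the key, call sites that used to thread an AtomicLong through append can fetch the entry after the fact. A minimal sketch, assuming `wal`, `htd`, `hri`, `edits`, and `mvcc` handles exist in scope:

    WALKey key = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(),
        System.currentTimeMillis(), mvcc);
    long txid = wal.append(htd, hri, key, edits, true);
    wal.sync(txid);
    // Blocks until the ring-buffer consumer has stamped the key with its sequence id.
    MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
    mvcc.completeAndWaitForRead(we);   // the edit is now visible to scanners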
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 51043e4..3741cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2301,7 +2301,7 @@ public class WALSplitter {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
- clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+ clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
logEntry.setFirst(key);
logEntry.setSecond(val);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 7d644bd..474f61b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -285,7 +285,7 @@ public class TestIOFencing {
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
- oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+ oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index a3c106d..d4a5780 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,7 +29,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -188,7 +188,6 @@ public class TestWALObserver {
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
- final AtomicLong sequenceId = new AtomicLong(0);
// TEST_FAMILY[0] shall be removed from WALEdit.
// TEST_FAMILY[1] value shall be changed.
@@ -237,7 +236,7 @@ public class TestWALObserver {
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- edit, sequenceId, true, null);
+ edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@@ -273,7 +272,7 @@ public class TestWALObserver {
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
final HRegionInfo hri = new HRegionInfo(tableName, null, null);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
@@ -300,7 +299,7 @@ public class TestWALObserver {
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -324,7 +323,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
- final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+ final long txid = wal.append(htd, hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +348,7 @@ public class TestWALObserver {
public void testEmptyWALEditAreNotSeen() throws Exception {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
- final AtomicLong sequenceId = new AtomicLong(0);
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
WAL log = wals.getWAL(UNSPECIFIED_REGION);
try {
@@ -361,8 +360,8 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- new WALEdit(), sequenceId, true, null);
+ long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+ new WALEdit(), true);
log.sync(txid);
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@@ -381,7 +380,7 @@ public class TestWALObserver {
// ultimately called by HRegion::initialize()
TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
- final AtomicLong sequenceId = new AtomicLong(0);
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
// final HRegionInfo hri =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
// final HRegionInfo hri1 =
@@ -405,10 +404,9 @@ public class TestWALObserver {
// for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
}
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
- true, null);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
// sync to fs.
wal.sync();
@@ -528,7 +526,7 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@@ -539,7 +537,7 @@ public class TestWALObserver {
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- ee.currentTime()), edit, sequenceId, true, null);
+ ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {
wal.sync(txid);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index d82f36b..5fa588b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -32,8 +32,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
- protected WALKey getWalKey(final long sequenceid) {
- return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 1fcb366..962ebe6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -75,6 +76,7 @@ public class TestWALRecordReader {
private static final byte [] value = Bytes.toBytes("value");
private static HTableDescriptor htd;
private static Path logDir;
+ protected MultiVersionConcurrencyControl mvcc;
private static String getName() {
return "TestWALRecordReader";
@@ -82,6 +84,7 @@ public class TestWALRecordReader {
@Before
public void setUp() throws Exception {
+ mvcc = new MultiVersionConcurrencyControl();
FileStatus[] entries = fs.listStatus(hbaseDir);
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
@@ -124,13 +127,11 @@ public class TestWALRecordReader {
// being millisecond based.
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
- final AtomicLong sequenceId = new AtomicLong(0);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+ log.append(htd, info, getWalKey(ts), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts+1), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@@ -141,12 +142,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+1), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
- true, null);
+ log.append(htd, info, getWalKey(ts1+2), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@@ -188,8 +187,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
- long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -199,8 +197,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
- txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
- null);
+ txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@@ -239,8 +236,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
- protected WALKey getWalKey(final long sequenceid) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+ protected WALKey getWalKey(final long time) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
}
protected WALRecordReader getReader() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 58ffb86..a174e3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1301,7 +1301,7 @@ public class TestDistributedLogSplitting {
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
wal.append(htd, curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
- e, sequenceId, true, null);
+ e, true);
}
wal.sync();
wal.shutdown();
@@ -1396,7 +1396,7 @@ public class TestDistributedLogSplitting {
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
- tableName, System.currentTimeMillis()), e, sequenceId, true, null);
+ tableName, System.currentTimeMillis()), e, true);
}
wal.sync();
wal.shutdown();
@@ -1609,7 +1609,7 @@ public class TestDistributedLogSplitting {
e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
log.append(htd, curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
- System.currentTimeMillis()), e, sequenceId, true, null);
+ System.currentTimeMillis()), e, true);
if (0 == i % syncEvery) {
log.sync();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index b4bc764..3a77046 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -316,7 +316,6 @@ public class TestAtomicOperation {
*/
@Test
public void testRowMutationMultiThreads() throws IOException {
-
LOG.info("Starting test testRowMutationMultiThreads");
initHRegion(tableName, name.getMethodName(), fam1);
@@ -616,30 +615,33 @@ public class TestAtomicOperation {
}
@Override
- public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
+ public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
- return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
+ return new WrappedRowLock(super.getRowLock(row, readLock));
}
- public class WrappedRowLock extends RowLockImpl {
+ public class WrappedRowLock implements RowLock {
+
+ private final RowLock rowLock;
private WrappedRowLock(RowLock rowLock) {
- setContext(((RowLockImpl)rowLock).getContext());
+ this.rowLock = rowLock;
}
+
@Override
public void release() {
if (testStep == TestStep.INIT) {
- super.release();
+ this.rowLock.release();
return;
}
if (testStep == TestStep.PUT_STARTED) {
try {
testStep = TestStep.PUT_COMPLETED;
- super.release();
+ this.rowLock.release();
// put has been written to the memstore and the row lock has been released, but the
// MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
// operations would cause the non-atomicity to show up:
@@ -657,7 +659,7 @@ public class TestAtomicOperation {
}
}
else if (testStep == TestStep.CHECKANDPUT_STARTED) {
- super.release();
+ this.rowLock.release();
}
}
}
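Since getRowLockInternal is gone and getRowLock(row, readLock) may return a lock implementation the test cannot subclass, WrappedRowLock above switches from extending RowLockImpl to plain delegation over the RowLock interface. The same pattern in isolation, with the test's step machinery elided (DelegatingRowLock is a hypothetical name for illustration):

    // Hedged sketch of the delegation pattern adopted above: hold the real
    // lock and forward release(), adding any test hooks around the call.
    public class DelegatingRowLock implements RowLock {
      private final RowLock delegate;

      DelegatingRowLock(RowLock delegate) {
        this.delegate = delegate;
      }

      @Override
      public void release() {
        // test-specific hooks would run here
        delegate.release();
      }
    }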
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 4ce228f..34278c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -96,7 +96,7 @@ public class TestBulkLoad {
{
oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
with(any(WALKey.class)), with(bulkLogWalEditType(WALEdit.BULK_LOAD)),
- with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
+ with(any(boolean.class)));
will(returnValue(0l));
oneOf(log).sync(with(any(long.class)));
}
@@ -122,8 +122,7 @@ public class TestBulkLoad {
Expectations expection = new Expectations() {
{
oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
- with(any(WALKey.class)), with(bulkEventMatcher),
- with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
+ with(any(WALKey.class)), with(bulkEventMatcher), with(any(boolean.class)));
will(returnValue(0l));
oneOf(log).sync(with(any(long.class)));
}
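Both jMock expectations above now match the five-parameter append. The Mockito stubs and verifications in the TestHRegion and TestHRegionReplayEvents hunks below shrink the same way; the matcher list loses its AtomicLong and List arguments:

    // Hedged sketch mirroring the verifications below: five matchers, one per
    // parameter of the new append signature.
    verify(wal, times(1)).append((HTableDescriptor) any(), (HRegionInfo) any(),
        (WALKey) any(), (WALEdit) any(), anyBoolean());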
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index e50260f..6133f0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -124,7 +124,7 @@ public class TestDefaultMemStore extends TestCase {
scanner.close();
}
- memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
+ memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
@@ -149,7 +149,7 @@ public class TestDefaultMemStore extends TestCase {
for (KeyValueScanner scanner : memstorescanners) {
scanner.close();
}
- memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
+ memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@@ -214,7 +214,7 @@ public class TestDefaultMemStore extends TestCase {
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException {
- List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.memstoreReadPoint());
+ List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
assertEquals(1, memstorescanners.size());
final KeyValueScanner scanner = memstorescanners.get(0);
scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
@@ -249,31 +249,31 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v = Bytes.toBytes("value");
MultiVersionConcurrencyControl.WriteEntry w =
- mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ mvcc.newWriteEntry();
KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setSequenceId(w.getWriteNumber());
memstore.add(kv1);
- KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{});
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv1});
- w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ w = mvcc.newWriteEntry();
KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setSequenceId(w.getWriteNumber());
memstore.add(kv2);
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv1});
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv1, kv2});
}
@@ -293,7 +293,7 @@ public class TestDefaultMemStore extends TestCase {
// INSERT 1: Write both columns val1
MultiVersionConcurrencyControl.WriteEntry w =
- mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ mvcc.newWriteEntry();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setSequenceId(w.getWriteNumber());
@@ -302,14 +302,14 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
- KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2
- w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ w = mvcc.newWriteEntry();
KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setSequenceId(w.getWriteNumber());
memstore.add(kv21);
@@ -319,16 +319,16 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE INSERT 2
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
// NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
// See HBASE-1485 for discussion about what we should do with
// the duplicate-TS inserts
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
}
@@ -345,7 +345,7 @@ public class TestDefaultMemStore extends TestCase {
final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1
MultiVersionConcurrencyControl.WriteEntry w =
- mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ mvcc.newWriteEntry();
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setSequenceId(w.getWriteNumber());
@@ -354,28 +354,28 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS
- KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns
- w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ w = mvcc.newWriteEntry();
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
kvDel.setSequenceId(w.getWriteNumber());
memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE DELETE
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
// NOW WE SHOULD SEE DELETE
- s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
}
@@ -419,7 +419,7 @@ public class TestDefaultMemStore extends TestCase {
private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
MultiVersionConcurrencyControl.WriteEntry w =
- mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
+ mvcc.newWriteEntry();
// Insert the sequence value (i)
byte[] v = Bytes.toBytes(i);
@@ -427,10 +427,10 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv = new KeyValue(row, f, q1, i, v);
kv.setSequenceId(w.getWriteNumber());
memstore.add(kv);
- mvcc.completeMemstoreInsert(w);
+ mvcc.completeAndWaitForRead(w);
// Assert that we can read back
- KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
+ KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
s.seek(kv);
Cell ret = s.next();
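The renames in TestDefaultMemStore trace the MVCC write protocol end to end: newWriteEntry() replaces beginMemstoreInsertWithSeqNum(n), so callers no longer invent sequence numbers; completeAndWaitForRead(w) replaces completeMemstoreInsert(w); and getReadPoint() replaces memstoreReadPoint(). One visibility round-trip under the new names, condensed from the tests above:

    // Hedged sketch of the write-visibility protocol exercised above.
    MultiVersionConcurrencyControl.WriteEntry w = mvcc.newWriteEntry();
    KeyValue kv = new KeyValue(row, f, q1, v);
    kv.setSequenceId(w.getWriteNumber()); // stamp the cell with its write number
    memstore.add(kv);                     // not yet visible at mvcc.getReadPoint()
    mvcc.completeAndWaitForRead(w);       // blocks until readers can see the write
    KeyValueScanner s = memstore.getScanners(mvcc.getReadPoint()).get(0); // sees kv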
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 826c9b3..4f679cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -835,7 +836,7 @@ public class TestHRegion {
.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
- this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1));
+ this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@@ -1061,9 +1062,8 @@ public class TestHRegion {
IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH);
// throw exceptions if the WalEdit is a start flush action
- when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
- (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
- (List)any()))
+ when(wal.append((HTableDescriptor) any(), (HRegionInfo) any(), (WALKey) any(),
+ (WALEdit) argThat(isFlushWALMarker), Mockito.anyBoolean()))
.thenThrow(new IOException("Fail to append flush marker"));
// start cache flush will throw exception
@@ -1445,14 +1445,19 @@ public class TestHRegion {
LOG.info("batchPut will have to break into four batches to avoid row locks");
RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
- RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4"));
- RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6"));
+ RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
+ RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
+ RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
+
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
+ final CountDownLatch startingPuts = new CountDownLatch(1);
+ final CountDownLatch startingClose = new CountDownLatch(1);
TestThread putter = new TestThread(ctx) {
@Override
public void doWork() throws IOException {
+ startingPuts.countDown();
retFromThread.set(region.batchMutate(puts));
}
};
@@ -1460,43 +1465,38 @@ public class TestHRegion {
ctx.addThread(putter);
ctx.startThreads();
- LOG.info("...waiting for put thread to sync 1st time");
- waitForCounter(source, "syncTimeNumOps", syncs + 1);
-
// Now attempt to close the region from another thread. Prior to HBASE-12565
// this would cause the in-progress batchMutate operation to fail with
// exception because it used to release and re-acquire the close-guard lock
// between batches. Caller then didn't get status indicating which writes succeeded.
// We now expect this thread to block until the batchMutate call finishes.
- Thread regionCloseThread = new Thread() {
+ Thread regionCloseThread = new TestThread(ctx) {
@Override
- public void run() {
+ public void doWork() {
try {
+ startingPuts.await();
+ // Give some time for the batch mutate to get in.
+ // We don't want to race with the mutate.
+ Thread.sleep(10);
+ startingClose.countDown();
HBaseTestingUtility.closeRegionAndWAL(region);
} catch (IOException e) {
throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
};
regionCloseThread.start();
+ startingClose.await();
+ startingPuts.await();
+ Thread.sleep(100);
LOG.info("...releasing row lock 1, which should let put thread continue");
rowLock1.release();
-
- LOG.info("...waiting for put thread to sync 2nd time");
- waitForCounter(source, "syncTimeNumOps", syncs + 2);
-
- LOG.info("...releasing row lock 2, which should let put thread continue");
rowLock2.release();
-
- LOG.info("...waiting for put thread to sync 3rd time");
- waitForCounter(source, "syncTimeNumOps", syncs + 3);
-
- LOG.info("...releasing row lock 3, which should let put thread continue");
rowLock3.release();
-
- LOG.info("...waiting for put thread to sync 4th time");
- waitForCounter(source, "syncTimeNumOps", syncs + 4);
+ waitForCounter(source, "syncTimeNumOps", syncs + 1);
LOG.info("...joining on put thread");
ctx.stop();
@@ -1507,6 +1507,7 @@ public class TestHRegion {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
codes[i].getOperationStatusCode());
}
+ rowLock4.release();
} finally {
HBaseTestingUtility.closeRegionAndWAL(this.region);
this.region = null;
@@ -4684,7 +4685,7 @@ public class TestHRegion {
//verify append called or not
verify(wal, expectAppend ? times(1) : never())
.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
- (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List)any());
+ (WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {
@@ -5846,7 +5847,7 @@ public class TestHRegion {
TEST_UTIL.getConfiguration(), rss, null);
verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
- , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+ , editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
assertNotNull(edit);
@@ -5960,7 +5961,7 @@ public class TestHRegion {
// verify that we have not appended region open event to WAL because this region is still
// recovering
verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
- , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+ , editCaptor.capture(), anyBoolean());
// now put the region out of recovering state
new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
@@ -5968,7 +5969,7 @@ public class TestHRegion {
// now we should have put the entry
verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
- , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+ , editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
assertNotNull(edit);
@@ -6032,7 +6033,7 @@ public class TestHRegion {
// 2 times, one for region open, the other close region
verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
- editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+ editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1);
assertNotNull(edit);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index c065ee7..261fcc6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -759,7 +758,7 @@ public class TestHRegionReplayEvents {
// ensure all files are visible in secondary
for (Store store : secondaryRegion.getStores()) {
- assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get());
+ assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId());
}
LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
@@ -1056,7 +1055,7 @@ public class TestHRegionReplayEvents {
// TODO: what to do with this?
// assert that the newly picked up flush file is visible
- long readPoint = secondaryRegion.getMVCC().memstoreReadPoint();
+ long readPoint = secondaryRegion.getMVCC().getReadPoint();
assertEquals(flushSeqId, readPoint);
// after replay verify that everything is still visible
@@ -1074,7 +1073,7 @@ public class TestHRegionReplayEvents {
HRegion region = initHRegion(tableName, method, family);
try {
// replay an entry that is bigger than current read point
- long readPoint = region.getMVCC().memstoreReadPoint();
+ long readPoint = region.getMVCC().getReadPoint();
long origSeqId = readPoint + 100;
Put put = new Put(row).add(family, row, row);
@@ -1085,7 +1084,7 @@ public class TestHRegionReplayEvents {
assertGet(region, family, row);
// region seqId should have advanced at least to this seqId
- assertEquals(origSeqId, region.getSequenceId().get());
+ assertEquals(origSeqId, region.getSequenceId());
// replay an entry that is smaller than current read point
// caution: adding an entry below current read point might cause partial dirty reads. Normal
@@ -1114,7 +1113,7 @@ public class TestHRegionReplayEvents {
// test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
- (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+ (WALKey)any(), (WALEdit)any(), anyBoolean());
// test for replay prepare flush
putDataByReplay(secondaryRegion, 0, 10, cq, families);
@@ -1128,11 +1127,11 @@ public class TestHRegionReplayEvents {
.build());
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
- (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+ (WALKey)any(), (WALEdit)any(), anyBoolean());
secondaryRegion.close();
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
- (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any());
+ (WALKey)any(), (WALEdit)any(), anyBoolean());
}
/**
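Alongside the append changes, TestHRegionReplayEvents picks up the slimmed region accessors: getSequenceId() now returns a plain long rather than an AtomicLong, and the read point is reached through getMVCC().getReadPoint(). A before/after sketch:

    // Hedged sketch of the accessor changes assumed by the assertions above.
    long seq = region.getSequenceId();                // was region.getSequenceId().get()
    long readPoint = region.getMVCC().getReadPoint(); // was getMVCC().memstoreReadPoint()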
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
index 9286e0d..d19d709 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -246,6 +247,14 @@ public class TestKeepDeletes {
Put p = new Put(T1, ts);
p.add(c0, c0, T1);
region.put(p);
+
+ Get gOne = new Get(T1);
+ gOne.setMaxVersions();
+ gOne.setTimeRange(0L, ts + 1);
+ Result rOne = region.get(gOne);
+ assertFalse(rOne.isEmpty());
+
Delete d = new Delete(T1, ts+2);
d.deleteColumn(c0, c0, ts);
region.delete(d);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
index 7b6e7b3..7032b3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
@@ -50,7 +50,7 @@ public class TestMultiVersionConcurrencyControl extends TestCase {
AtomicLong startPoint = new AtomicLong();
while (!finished.get()) {
MultiVersionConcurrencyControl.WriteEntry e =
- mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
+ mvcc.newWriteEntry();
// System.out.println("Begin write: " + e.getWriteNumber());
// 10 usec - 500usec (including 0)
int sleepTime = rnd.nextInt(500);
@@ -61,7 +61,7 @@ public class TestMultiVersionConcurrencyControl extends TestCase {
} catch (InterruptedException e1) {
}
try {
- mvcc.completeMemstoreInsert(e);
+ mvcc.completeAndWaitForRead(e);
} catch (RuntimeException ex) {
// got failure
System.out.println(ex.toString());
@@ -84,9 +84,9 @@ public class TestMultiVersionConcurrencyControl extends TestCase {
final AtomicLong failedAt = new AtomicLong();
Runnable reader = new Runnable() {
public void run() {
- long prev = mvcc.memstoreReadPoint();
+ long prev = mvcc.getReadPoint();
while (!finished.get()) {
- long newPrev = mvcc.memstoreReadPoint();
+ long newPrev = mvcc.getReadPoint();
if (newPrev < prev) {
// serious problem.
System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 33ae07b..5bcf6e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index ed0ac25..ad085f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -156,7 +156,7 @@ public class TestStoreFileRefresherChore {
}
}
- @Test (timeout = 60000)
+ @Test
public void testIsStale() throws IOException {
int period = 0;
byte[][] families = new byte[][] {Bytes.toBytes("cf")};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 28ae46a..f86bdd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -29,7 +29,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
@@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -152,15 +151,21 @@ public class TestFSHLog {
}
}
- protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
- AtomicLong sequenceId) throws IOException {
+ protected void addEdits(WAL log,
+ HRegionInfo hri,
+ HTableDescriptor htd,
+ int times,
+ MultiVersionConcurrencyControl mvcc)
+ throws IOException {
final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
- log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
- cols, sequenceId, true, null);
+ WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
+ WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, mvcc);
+ log.append(htd, hri, key, cols, true);
}
log.sync();
}
@@ -253,15 +258,13 @@ public class TestFSHLog {
new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HRegionInfo hri2 =
new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- // variables to mock region sequenceIds
- final AtomicLong sequenceId1 = new AtomicLong(1);
- final AtomicLong sequenceId2 = new AtomicLong(1);
// add edits and roll the wal
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
try {
- addEdits(wal, hri1, t1, 2, sequenceId1);
+ addEdits(wal, hri1, t1, 2, mvcc);
wal.rollWriter();
// add some more edits and roll the wal. This would reach the log number threshold
- addEdits(wal, hri1, t1, 2, sequenceId1);
+ addEdits(wal, hri1, t1, 2, mvcc);
wal.rollWriter();
// with above rollWriter call, the max logs limit is reached.
assertTrue(wal.getNumRolledLogFiles() == 2);
@@ -272,7 +275,7 @@ public class TestFSHLog {
assertEquals(1, regionsToFlush.length);
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
// insert edits in second region
- addEdits(wal, hri2, t2, 2, sequenceId2);
+ addEdits(wal, hri2, t2, 2, mvcc);
// get the regions to flush, it should still read region1.
regionsToFlush = wal.findRegionsToForceFlush();
assertEquals(regionsToFlush.length, 1);
@@ -289,12 +292,12 @@ public class TestFSHLog {
// no wal should remain now.
assertEquals(0, wal.getNumRolledLogFiles());
// add edits both to region 1 and region 2, and roll.
- addEdits(wal, hri1, t1, 2, sequenceId1);
- addEdits(wal, hri2, t2, 2, sequenceId2);
+ addEdits(wal, hri1, t1, 2, mvcc);
+ addEdits(wal, hri2, t2, 2, mvcc);
wal.rollWriter();
// add edits and roll the writer, to reach the max logs limit.
assertEquals(1, wal.getNumRolledLogFiles());
- addEdits(wal, hri1, t1, 2, sequenceId1);
+ addEdits(wal, hri1, t1, 2, mvcc);
wal.rollWriter();
// it should return two regions to flush, as the oldest wal file has entries
// for both regions.
@@ -306,7 +309,7 @@ public class TestFSHLog {
wal.rollWriter(true);
assertEquals(0, wal.getNumRolledLogFiles());
// Add an edit to region1, and roll the wal.
- addEdits(wal, hri1, t1, 2, sequenceId1);
+ addEdits(wal, hri1, t1, 2, mvcc);
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
wal.rollWriter();
@@ -399,18 +402,18 @@ public class TestFSHLog {
for (int i = 0; i < countPerFamily; i++) {
final HRegionInfo info = region.getRegionInfo();
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), clusterIds, -1, -1);
- wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
+ System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
+ wal.append(htd, info, logkey, edits, true);
}
region.flush(true);
// FlushResult.flushSequenceId is not visible here so go get the current sequence id.
- long currentSequenceId = region.getSequenceId().get();
+ long currentSequenceId = region.getSequenceId();
// Now release the appends
goslow.setValue(false);
synchronized (goslow) {
goslow.notifyAll();
}
- assertTrue(currentSequenceId >= region.getSequenceId().get());
+ assertTrue(currentSequenceId >= region.getSequenceId());
} finally {
region.close(true);
wal.close();
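Two things are worth noting in the TestFSHLog hunks above: the two per-region AtomicLongs collapse into a single shared mvcc, since sequence-id assignment now lives in the WAL, and addEdits spells out the long-form WALKey constructor with the sequence-id slot deliberately left unassigned. A commented sketch of that constructor call:

    // Hedged sketch of the fully-specified WALKey built in addEdits above.
    WALKey key = new WALKey(
        hri.getEncodedNameAsBytes(), // encoded region name
        htd.getTableName(),          // table name
        WALKey.NO_SEQUENCE_ID,       // the WAL assigns the real sequence id at append
        System.currentTimeMillis(),  // write time
        WALKey.EMPTY_UUIDS,          // no replication cluster ids
        HConstants.NO_NONCE,         // nonce group
        HConstants.NO_NONCE,         // nonce
        mvcc);                       // region mvcc, shared across appends
    log.append(htd, hri, key, cols, true);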
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index 4e07040..f123b05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Assert;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -191,8 +190,9 @@ public class TestLogRollAbort {
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(regioninfo.getEncodedNameAsBytes());
-
- final AtomicLong sequenceId = new AtomicLong(1);
+
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ mvcc.initialize(1);
final int total = 20;
for (int i = 0; i < total; i++) {
@@ -201,7 +201,7 @@ public class TestLogRollAbort {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), kvs, sequenceId, true, null);
+ System.currentTimeMillis(), mvcc), kvs, true);
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();
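Where a test previously seeded a fresh AtomicLong(1), it now seeds the mvcc with initialize(). A tiny sketch of the substitution, assuming initialize(n) sets the starting read/write point as the hunks here and in TestLogRollingNoCluster below imply:

    // Hedged sketch: seed the mvcc the way new AtomicLong(1) used to seed
    // the region sequence id, so the first append starts from a known point.
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    mvcc.initialize(1); // starting point for subsequent appends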
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 41e05ae..40c96a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -123,7 +123,8 @@ public class TestLogRollingNoCluster {
@Override
public void run() {
this.log.info(getName() +" started");
- final AtomicLong sequenceId = new AtomicLong(1);
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ mvcc.initialize(0);
try {
for (int i = 0; i < this.count; i++) {
long now = System.currentTimeMillis();
@@ -137,7 +138,7 @@ public class TestLogRollingNoCluster {
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor();
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
- TableName.META_TABLE_NAME, now), edit, sequenceId, true, null);
+ TableName.META_TABLE_NAME, now, mvcc), edit, true);
wal.sync(txid);
}
String msg = getName() + " finished";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 69482d1..0351430 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -88,7 +88,6 @@ public class TestWALActionsListener {
list.add(observer);
final WALFactory wals = new WALFactory(conf, list, "testActionListener");
DummyWALActionsListener laterobserver = new DummyWALActionsListener();
- final AtomicLong sequenceId = new AtomicLong(1);
HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
SOME_BYTES, SOME_BYTES, false);
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
@@ -102,7 +101,7 @@ public class TestWALActionsListener {
htd.addFamily(new HColumnDescriptor(b));
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
- TableName.valueOf(b), 0), edit, sequenceId, true, null);
+ TableName.valueOf(b), 0), edit, true);
wal.sync(txid);
if (i == 10) {
wal.registerWALActionsListener(laterobserver);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index c943d12..1c97a2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,17 +64,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -281,6 +270,8 @@ public class TestWALReplay {
// Ensure edits are replayed properly.
final TableName tableName =
TableName.valueOf("test2727");
+
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
deleteDir(basedir);
@@ -293,10 +284,10 @@ public class TestWALReplay {
WAL wal1 = createWAL(this.conf);
// Add 1k to each family.
final int countPerFamily = 1000;
- final AtomicLong sequenceId = new AtomicLong(1);
+
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
- wal1, htd, sequenceId);
+ wal1, htd, mvcc);
}
wal1.shutdown();
runWALSplit(this.conf);
@@ -305,7 +296,7 @@ public class TestWALReplay {
// Add 1k to each family.
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal2, htd, sequenceId);
+ ee, wal2, htd, mvcc);
}
wal2.shutdown();
runWALSplit(this.conf);
@@ -316,10 +307,10 @@ public class TestWALReplay {
long seqid = region.getOpenSeqNum();
// The region opens with sequenceId 1. With 6k edits, its sequence number reaches 6k + 1.
// When opened, this region would apply 6k edits, and increment the sequenceId by 1
- assertTrue(seqid > sequenceId.get());
- assertEquals(seqid - 1, sequenceId.get());
+ assertTrue(seqid > mvcc.getWritePoint());
+ assertEquals(seqid - 1, mvcc.getWritePoint());
LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
- + sequenceId.get());
+ + mvcc.getReadPoint());
// TODO: Scan all.
region.close();
@@ -771,6 +762,7 @@ public class TestWALReplay {
public void testReplayEditsWrittenIntoWAL() throws Exception {
final TableName tableName =
TableName.valueOf("testReplayEditsWrittenIntoWAL");
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
deleteDir(basedir);
@@ -781,14 +773,13 @@ public class TestWALReplay {
final WAL wal = createWAL(this.conf);
final byte[] rowName = tableName.getName();
final byte[] regionName = hri.getEncodedNameAsBytes();
- final AtomicLong sequenceId = new AtomicLong(1);
// Add 1k to each family.
final int countPerFamily = 1000;
Set<byte[]> familyNames = new HashSet<byte[]>();
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal, htd, sequenceId);
+ ee, wal, htd, mvcc);
familyNames.add(hcd.getName());
}
@@ -801,16 +792,13 @@ public class TestWALReplay {
long now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
- true, null);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTime();
- edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
- KeyValue.Type.DeleteFamily));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
- true, null);
+ edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
// Sync.
wal.sync();
@@ -842,12 +830,17 @@ public class TestWALReplay {
Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
flushcount.incrementAndGet();
return fs;
- };
+ }
};
+ // The sequence id this region opened with.
long seqid = region.initialize();
+
+ // The mvcc write point accumulated while inserting the data.
+ long writePoint = mvcc.getWritePoint();
+
// We flushed during init.
assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
- assertTrue(seqid - 1 == sequenceId.get());
+ assertTrue((seqid - 1) == writePoint);
Get get = new Get(rowName);
Result result = region.get(get);
@@ -889,7 +882,7 @@ public class TestWALReplay {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
}
- long lastestSeqNumber = region.getSequenceId().get();
+ long lastestSeqNumber = region.getSequenceId();
// get the current seq no
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
@@ -992,7 +985,7 @@ public class TestWALReplay {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final AtomicLong sequenceId)
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
@@ -1001,8 +994,8 @@ public class TestWALReplay {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes,
ee.currentTime(), columnBytes));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()),
- edit, sequenceId, true, null);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc),
+ edit, true);
}
wal.sync();
}
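test2727 above stops owning a sequence counter and instead derives its expectations from the mvcc: after replaying all edits, the region's open sequence id should sit exactly one past the accumulated write point. The assertion pattern, condensed:

    // Hedged sketch of the replay assertion used in test2727 above.
    long seqid = region.getOpenSeqNum();    // open point after replaying edits
    long writePoint = mvcc.getWritePoint(); // highest write number handed out
    assertTrue(seqid > writePoint);
    assertEquals(seqid - 1, writePoint);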
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 571be26..882ed2c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -189,9 +190,10 @@ public class TestReplicationSourceManager {
@Test
public void testLogRoll() throws Exception {
- long seq = 0;
long baseline = 1000;
long time = baseline;
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ mvcc.initialize(0);
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
@@ -211,8 +213,11 @@ public class TestReplicationSourceManager {
wal.rollWriter();
}
LOG.info(i);
- final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
- System.currentTimeMillis()), edit, sequenceId, true ,null);
+ final long txid = wal.append(htd,
+ hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ edit,
+ true);
wal.sync(txid);
}
@@ -224,8 +229,10 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
- System.currentTimeMillis()), edit, sequenceId, true, null);
+ wal.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ edit,
+ true);
}
wal.sync();
@@ -236,8 +243,10 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
- System.currentTimeMillis()), edit, sequenceId, true, null);
+ wal.append(htd, hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ edit,
+ true);
wal.sync();
assertEquals(1, manager.getWALs().size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
index 577f0ba..abe8403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
@@ -74,7 +75,7 @@ public class TestReplicationWALReaderManager {
private PathWatcher pathWatcher;
private int nbRows;
private int walEditKVs;
- private final AtomicLong sequenceId = new AtomicLong(1);
+ private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
@Parameters
public static Collection