diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 4cca2d4..9857f64 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -284,15 +285,29 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // used to achieve atomic operations in the memstore. @Override public long getMvccVersion() { + if (mvcc == null) { + return 0; + } + return mvcc.longValue(); + } + + public MutableLong getMvccVersionReference() { return mvcc; } public void setMvccVersion(long mvccVersion){ + if(this.mvcc == null){ + this.mvcc = new MutableLong(); + } + this.mvcc.setValue(mvccVersion); + } + + public void setMvccVersion(MutableLong mvccVersion){ this.mvcc = mvccVersion; } // multi-version concurrency control version. default value is 0, aka do not care. 
- private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles) + private MutableLong mvcc; // this value is not part of a serialized KeyValue (not in HFiles) /** Dragon time over, return to normal business */ @@ -1083,7 +1098,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // Important to clone the memstoreTS as well - otherwise memstore's // update-in-place methods (eg increment) will end up creating // new entries - ret.setMvccVersion(mvcc); + ret.setMvccVersion(this.getMvccVersion()); return ret; } @@ -1094,7 +1109,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { */ public KeyValue shallowCopy() { KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length); - shallowCopy.setMvccVersion(this.mvcc); + shallowCopy.setMvccVersion(this.getMvccVersion()); return shallowCopy; } @@ -1109,7 +1124,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { return "empty"; } return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + - "/vlen=" + getValueLength() + "/mvcc=" + mvcc; + "/vlen=" + getValueLength() + "/mvcc=" + this.getMvccVersion(); } /** @@ -2585,7 +2600,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { sum += ClassSize.align(ClassSize.ARRAY);// "bytes" sum += ClassSize.align(length);// number of bytes of data in the "bytes" array sum += 2 * Bytes.SIZEOF_INT;// offset, length - sum += Bytes.SIZEOF_LONG;// memstoreTS + sum += ClassSize.REFERENCE;// pointer to mvcc return ClassSize.align(sum); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 51f22d8..90ebfff 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -269,7 +269,7 @@ public class DefaultMemStore implements MemStore { 
assert alloc.getBytes() != null; alloc.put(0, kv.getBuffer(), kv.getOffset(), len); KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); + newKv.setMvccVersion(kv.getMvccVersionReference()); return newKv; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9b0c0e3..2e786ca 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -825,10 +826,11 @@ public class HRegion implements HeapSize { // , Writable{ } } } - mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1); + mvcc.initialize(maxSeqId); return maxSeqId; } @@ -1689,7 +1691,7 @@ public class HRegion implements HeapSize { // , Writable{ // wal can be null replaying edits. return wal != null? 
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - getNextSequenceId(wal, startTime), "Nothing to flush"): + getNextSequenceId(wal), "Nothing to flush"): new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); } } finally { @@ -1719,58 +1721,68 @@ public class HRegion implements HeapSize { // , Writable{ getRegionInfo().getEncodedName()); List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; - try { - // Record the mvcc for all transactions in progress. - w = mvcc.beginMemstoreInsert(); - mvcc.advanceMemstore(w); - if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { - // This should never happen. - String msg = "Flush will not be started for [" - + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; - status.setStatus(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); - } - // Get a sequence id that we can use to denote the flush. It will be one beyond the last - // edit that made it into the hfile (the below does not add an edit, it just asks the - // WAL system to return next sequence edit). - flushSeqId = getNextSequenceId(wal, startTime); - } else { - // use the provided sequence Id as WAL is not being used for this flush. - flushSeqId = myseqid; - } - - for (Store s : stores.values()) { - totalFlushableSize += s.getFlushableSize(); - storeFlushCtxs.add(s.createFlushContext(flushSeqId)); - } - // Prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs) { - flush.prepare(); + try{ + try { + // Record the mvcc for all transactions in progress. + w = mvcc.beginMemstoreInsert(this.sequenceId.incrementAndGet()); + + if (wal != null) { + if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + // This should never happen. 
+ String msg = "Flush will not be started for [" + + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; + status.setStatus(msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + } + // Get a sequence id that we can use to denote the flush. It will be one beyond the last + // edit that made it into the hfile (the below does not add an edit, it just asks the + // WAL system to return next sequence edit). + flushSeqId = getNextSequenceId(wal); + } else { + // use the provided sequence Id as WAL is not being used for this flush. + flushSeqId = myseqid; + } + + for (Store s : stores.values()) { + totalFlushableSize += s.getFlushableSize(); + storeFlushCtxs.add(s.createFlushContext(flushSeqId)); + } + + // Prepare flush (take a snapshot) + for (StoreFlushContext flush : storeFlushCtxs) { + flush.prepare(); + } + } finally { + this.updatesLock.writeLock().unlock(); } + String s = "Finished memstore snapshotting " + this + + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; + status.setStatus(s); + if (LOG.isTraceEnabled()) LOG.trace(s); + + // sync unflushed WAL changes when deferred log sync is enabled + // see HBASE-8208 for details + if (wal != null && !shouldSyncLog()) wal.sync(); + + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. 
+ mvcc.waitForPreviousTransactoinsComplete(w); + w = null; + + s = "Flushing stores of " + this; + status.setStatus(s); + if (LOG.isTraceEnabled()) LOG.trace(s); + } finally { - this.updatesLock.writeLock().unlock(); + if(w != null){ + // in case of failure just mark current w as complete + mvcc.advanceMemstore(w); + } } - String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); - - // sync unflushed WAL changes when deferred log sync is enabled - // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) wal.sync(); - - // wait for all in-progress transactions to commit to HLog before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.waitForRead(w); - - s = "Flushing stores of " + this; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. @@ -1857,13 +1869,9 @@ public class HRegion implements HeapSize { // , Writable{ * @return Next sequence number unassociated with any actual edit. * @throws IOException */ - private long getNextSequenceId(final HLog wal, final long now) throws IOException { - HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable()); - // Call append but with an empty WALEdit. The returned seqeunce id will not be associated - // with any edit and we can be sure it went in after all outstanding appends. 
- wal.appendNoSync(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false); - return key.getLogSeqNum(); + private long getNextSequenceId(final HLog wal) throws IOException { + HLogKey key = this.appendNoSyncFakedWALEdit(wal, new MutableLong(HLog.NO_SEQUENCE_ID)); + return key.waitForLogSeqNumAssigned(); } ////////////////////////////////////////////////////////////////////////////// @@ -2352,6 +2360,8 @@ public class HRegion implements HeapSize { // , Writable{ int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; + HLogKey walKey = null; + MutableLong mvccNum = new MutableLong(); try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2471,13 +2481,13 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - + mvccNum.setValue(this.sequenceId.incrementAndGet()); // // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsert(); - + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = @@ -2502,13 +2512,12 @@ public class HRegion implements HeapSize { // , Writable{ continue; } doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], w); + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum); } // ------------------------------------ // STEP 4. 
Build WAL edit // ---------------------------------- - boolean hasWalAppends = false; Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing @@ -2539,13 +2548,13 @@ public class HRegion implements HeapSize { // , Writable{ throw new IOException("Multiple nonces per batch and not in replay"); } // txid should always increase, so having the one from the last call is ok. - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, - currentNonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, m.getClusterIds(), + currentNonceGroup, currentNonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true); - hasWalAppends = true; walEdit = new WALEdit(isInReplay); + walKey = null; } currentNonceGroup = nonceGroup; currentNonce = nonce; @@ -2566,12 +2575,15 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), mvccNum, now, mutation.getClusterIds(), currentNonceGroup, currentNonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit, + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true); - hasWalAppends = true; + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to 
get mvcc assigned + walKey = this.appendNoSyncFakedWALEdit(this.log, mvccNum); } // ------------------------------- @@ -2586,9 +2598,10 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- // STEP 7. Sync wal. // ------------------------- - if (hasWalAppends) { + if (txid != 0) { syncOrDefer(txid, durability); } + doRollBackMemstore = false; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2602,7 +2615,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); w = null; } @@ -2634,7 +2647,9 @@ public class HRegion implements HeapSize { // , Writable{ if (doRollBackMemstore) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) mvcc.completeMemstoreInsert(w); + if (w != null) { + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + } if (locked) { this.updatesLock.readLock().unlock(); @@ -2730,7 +2745,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); List result; try { result = get(get, false); @@ -2900,30 +2915,18 @@ public class HRegion implements HeapSize { // , Writable{ * new entries. 
*/ private long applyFamilyMapToMemstore(Map> familyMap, - MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { + MutableLong mvccNum) { long size = 0; - boolean freemvcc = false; - - try { - if (localizedWriteEntry == null) { - localizedWriteEntry = mvcc.beginMemstoreInsert(); - freemvcc = true; - } - for (Map.Entry> e : familyMap.entrySet()) { - byte[] family = e.getKey(); - List cells = e.getValue(); + for (Map.Entry> e : familyMap.entrySet()) { + byte[] family = e.getKey(); + List cells = e.getValue(); - Store store = getStore(family); - for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setMvccVersion(localizedWriteEntry.getWriteNumber()); - size += store.add(kv); - } - } - } finally { - if (freemvcc) { - mvcc.completeMemstoreInsert(localizedWriteEntry); + Store store = getStore(family); + for (Cell cell: cells) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + kv.setMvccVersion(mvccNum); + size += store.add(kv); } } @@ -4887,6 +4890,8 @@ public class HRegion implements HeapSize { // , Writable{ long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); + MutableLong mvccNum = new MutableLong(); + HLogKey walKey = null; try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); @@ -4897,6 +4902,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; + mvccNum.setValue(this.sequenceId.incrementAndGet()); long now = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -4907,10 +4913,10 @@ public class HRegion implements HeapSize { // , Writable{ if (!mutations.isEmpty()) { // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // 6. 
Apply to memstore for (KeyValue kv : mutations) { - kv.setMvccVersion(writeEntry.getWriteNumber()); + kv.setMvccVersion(mvccNum); byte[] family = kv.getFamily(); checkFamily(family); addedSize += stores.get(family).add(kv); @@ -4919,12 +4925,18 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; // 7. Append no sync if (!walEdit.isEmpty()) { - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup, - nonce); + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), mvccNum, now, processor.getClusterIds(), + nonceGroup, nonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), - key, walEdit, getSequenceId(), true); + walKey, walEdit, getSequenceId(), true); + } + if(walKey == null){ + // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit + // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId + walKey = this.appendNoSyncFakedWALEdit(this.log, mvccNum); } + // 8. Release region lock if (locked) { this.updatesLock.readLock().unlock(); @@ -4951,7 +4963,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 11. 
Roll mvcc forward if (writeEntry != null) { - mvcc.completeMemstoreInsert(writeEntry); + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); writeEntry = null; } if (locked) { @@ -5058,17 +5070,20 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); + MutableLong mvccNum = new MutableLong(); WriteEntry w = null; - RowLock rowLock; + HLogKey walKey = null; + RowLock rowLock = null; try { rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + mvccNum.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5138,7 +5153,7 @@ public class HRegion implements HeapSize { // , Writable{ // so only need to update the timestamp to 'now' newKV.updateLatestStamp(Bytes.toBytes(now)); } - newKV.setMvccVersion(w.getWriteNumber()); + newKV.setMvccVersion(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -5164,13 +5179,17 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. 
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), nonceGroup, nonce); - txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits, + walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), mvccNum, nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits, this.sequenceId, true); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendNoSyncFakedWALEdit(this.log, mvccNum); + } //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -5194,14 +5213,18 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } - if (writeToWAL) { - // sync the transaction log outside the rowlock + // sync the transaction log outside the rowlock + if(txid != 0){ syncOrDefer(txid, durability); } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); } closeRegionOperation(Operation.APPEND); } @@ -5249,16 +5272,20 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + RowLock rowLock = null; WriteEntry w = null; + HLogKey walKey = null; + MutableLong mvccNum = new MutableLong(); try { - RowLock rowLock = getRowLock(row); + rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactoinsComplete(this.sequenceId.incrementAndGet()); // now start my own transaction - w 
= mvcc.beginMemstoreInsert(); + mvccNum.setValue(this.sequenceId.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -5324,7 +5351,7 @@ public class HRegion implements HeapSize { // , Writable{ System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(), newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen); } - newKV.setMvccVersion(w.getWriteNumber()); + newKV.setMvccVersion(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -5357,14 +5384,18 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), nonceGroup, nonce); + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), mvccNum, nonceGroup, nonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), - key, walEdits, getSequenceId(), true); + walKey, walEdits, getSequenceId(), true); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendNoSyncFakedWALEdit(this.log, mvccNum); + } //Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -5388,14 +5419,18 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } - if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) { - // sync the transaction log outside the rowlock + // sync the transaction log outside the rowlock + 
if(txid != 0){ syncOrDefer(txid, durability); } } finally { + if (rowLock != null) { + rowLock.release(); + } if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { @@ -6162,4 +6197,21 @@ public class HRegion implements HeapSize { // , Writable{ public void updatesUnlock() throws InterruptedIOException { updatesLock.readLock().unlock(); } + + + /** + * Append a faked WALEdit in order to get a long sequence number + * + * @return + * @throws IOException + */ + private HLogKey appendNoSyncFakedWALEdit(final HLog wal, MutableLong mvccNum) throws IOException{ + HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), + mvccNum, System.currentTimeMillis(), null, HConstants.NO_NONCE, HConstants.NO_NONCE); + // Call append but with an empty WALEdit. The returned seqeunce id will not be associated + // with any edit and we can be sure it went in after all outstanding appends. 
+ wal.appendNoSync(getTableDesc(), getRegionInfo(), key, + WALEdit.EMPTY_WALEDIT, this.sequenceId, false); + return key; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index b46d55b..807fc45 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -18,9 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.util.LinkedList; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -32,9 +35,7 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - + private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); // This is the pending queue of writes. @@ -45,7 +46,7 @@ public class MultiVersionConsistencyControl { * Default constructor. Initializes the memstoreRead/Write points to 0. */ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; + memstoreRead = 0; } /** @@ -54,37 +55,84 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { - throw new RuntimeException("Already used this mvcc. 
Too late to initialize"); - } - - this.memstoreRead = this.memstoreWrite = startPoint; + writeQueue.clear(); + memstoreRead = startPoint; } } + WriteEntry beginMemstoreInsert(long initVal) { + return beginMemstoreInsert(initVal, initVal); + } + + /** - * Generate and return a {@link WriteEntry} with a new write number. - * To complete the WriteEntry and wait for it to be visible, - * call {@link #completeMemstoreInsert(WriteEntry)}. + * + * @param initVal The value we used initially and expected it'll be reset later + * @param originalVal If the initVal hasn't been reset, we will use the value to advance memstore + * read point. + * @return */ - public WriteEntry beginMemstoreInsert() { + WriteEntry beginMemstoreInsert(long initVal, long originalVal) { + WriteEntry e = new WriteEntry(initVal, originalVal); synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } + + /** + * This function starts a MVCC transaction with current region's log change sequence number. Since + * we set change sequence number when flushing current change to WAL(late binding), the flush + * order may differ from the order to start a MVCC transaction. For example, a change begins a + * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we + * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent + * transactions will reuse the number till current MVCC completes(success or fail). The "faked" + * big number is safe because we only need it to prevent current change being seen and the number + * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order + * for MVCC to align with flush sequence. 
+ * @param curSeqNum + * @return WriteEntry a WriteEntry instance with the passed in curSeqNum + */ + public WriteEntry beginMemstoreInsertWithSeqNum(MutableLong curSeqNum) { + // the 1 billion is just an arbitrary big number to guard new region sequence + // number won't see this value before current MVCC completes. + // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key + // If for any reason, the bumped value isn't reset due to failure situations, we'll use the + // originally passed in curSeqNum to advance memstore read point + long originalVal = curSeqNum.longValue(); + curSeqNum.setValue(originalVal + 1000000000); + return beginMemstoreInsert(curSeqNum.longValue(), originalVal); + } /** + * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsertWithSeqNum()}. + * + * At the end of this call, the global read point is at least as large as the write point + * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. + * @throws IOException + */ + public void completeMemstoreInsertWithSeqNum(WriteEntry e, HLogKey walKey) throws IOException { + if(e == null) return; + if(walKey != null){ + long newSeqNum = walKey.waitForLogSeqNumAssigned(); + e.setWriteNumber(newSeqNum); + } else { + // restore the value before we bumped up inside function beginMemstoreInsertWithSeqNum in + // case of failures + e.restoreToOriginalWriteNumber(); + } + waitForPreviousTransactoinsComplete(e); + } + + /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. * * At the end of this call, the global read point is at least as large as the write point * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. 
*/ public void completeMemstoreInsert(WriteEntry e) { - advanceMemstore(e); - waitForRead(e); + waitForPreviousTransactoinsComplete(e); } /** @@ -99,77 +147,98 @@ public class MultiVersionConsistencyControl { * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ boolean advanceMemstore(WriteEntry e) { + long nextReadValue = -1; synchronized (writeQueue) { e.markCompleted(); - long nextReadValue = -1; - boolean ranOnce=false; while (!writeQueue.isEmpty()) { - ranOnce=true; WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + // Using Max because Edit complete in WAL sync order not arriving order + nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); writeQueue.removeFirst(); } else { break; } } - if (!ranOnce) { - throw new RuntimeException("never was a first"); + if (nextReadValue > memstoreRead) { + memstoreRead = nextReadValue; } - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - } - if (memstoreRead >= e.getWriteNumber()) { - return true; + // notify waiters on writeQueue before return + writeQueue.notifyAll(); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + readWaiters.notifyAll(); } - return false; } + + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; } /** - * Wait for the global readPoint to advance upto - * the specified transaction number. 
+ * Wait for all previous MVCC transactions to complete + * @param curSeqNum */ - public void waitForRead(WriteEntry e) { + public void waitForPreviousTransactoinsComplete(long curSeqNum) { + WriteEntry w = beginMemstoreInsert(curSeqNum); + waitForPreviousTransactoinsComplete(w); + } + + public void waitForPreviousTransactoinsComplete(WriteEntry waitedEntry) { boolean interrupted = false; - synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { - try { - readWaiters.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; + WriteEntry w = waitedEntry; + + try { + WriteEntry firstEntry = null; + do { + synchronized (writeQueue) { + // writeQueue won't be empty at this point, the following is just a safety check + if (writeQueue.isEmpty()) { + break; + } + firstEntry = writeQueue.getFirst(); + if (firstEntry == w) { + // all previous in-flight transactions are done + break; + } + try { + writeQueue.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. 
+ interrupted = true; + break; + } } + } while (firstEntry != null); + } finally { + if (w != null) { + advanceMemstore(w); } } - if (interrupted) Thread.currentThread().interrupt(); + if (interrupted) { + Thread.currentThread().interrupt(); + } } public long memstoreReadPoint() { return memstoreRead; } - public static class WriteEntry { private long writeNumber; + private long originalWriteNumber; private boolean completed = false; - WriteEntry(long writeNumber) { + WriteEntry(long writeNumber, long originalWriteNumber) { this.writeNumber = writeNumber; + this.originalWriteNumber = originalWriteNumber; } void markCompleted() { this.completed = true; @@ -180,6 +249,12 @@ public class MultiVersionConsistencyControl { long getWriteNumber() { return this.writeNumber; } + void setWriteNumber(long val){ + this.writeNumber = val; + } + void restoreToOriginalWriteNumber(){ + this.writeNumber = this.originalWriteNumber; + } } public static final long FIXED_SIZE = ClassSize.align( diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index b876972..7403700 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -121,12 +121,6 @@ abstract class StoreFlusher { // set its memstoreTS to 0. This will help us save space when writing to // disk. KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - // let us not change the original KV. It could be in the memstore - // changing its memstoreTS could affect other threads/scanners. 
- kv = kv.shallowCopy(); - kv.setMvccVersion(0); - } sink.append(kv); } kvs.clear(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 268e302..5e0bb2b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -1064,6 +1065,19 @@ class FSHLog implements HLog, Syncable { } } + /** + * @param now + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tableName + * @param clusterIds that have consumed the change + * @return New log key. + */ + protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, MutableLong seqnum, + long now, List clusterIds, long nonceGroup, long nonce) { + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); + } + @Override @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, @@ -1114,6 +1128,7 @@ class FSHLog implements HLog, Syncable { // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); + // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need // all this to make a key and then below to append the edit, we need to carry htd, info, // etc. all over the ring buffer. 
@@ -1128,15 +1143,6 @@ class FSHLog implements HLog, Syncable { truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); - // Now wait until the region edit/sequence id is available. The 'entry' has an internal - // latch that is thrown when the region edit/sequence id is set. Calling - // entry.getRegionSequenceId will cause us block until the latch is thrown. The return is - // the region edit/sequence id, not the ring buffer txid. - try { - entry.getRegionSequenceId(); - } catch (InterruptedException e) { - throw convertInterruptedExceptionToIOException(e); - } } // doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after // all edits on a handler have been added. @@ -1894,6 +1900,14 @@ class FSHLog implements HLog, Syncable { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); + + // If empty, there is nothing to append. Maybe empty when we are looking for a region + // sequence id only, a region edit/sequence id that is not associated with an actual edit. + // It has to go through all the rigmarole to be sure we have the right ordering. + if (entry.getEdit().isEmpty()) { + return; + } + // Coprocessor hook. if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { @@ -1909,19 +1923,16 @@ class FSHLog implements HLog, Syncable { entry.getEdit()); } } - // If empty, there is nothing to append. Maybe empty when we are looking for a region - // sequence id only, a region edit/sequence id that is not associated with an actual edit. - // It has to go through all the rigmarole to be sure we have the right ordering. 
- if (!entry.getEdit().isEmpty()) { - writer.append(entry); - assert highestUnsyncedSequence < entry.getSequence(); - highestUnsyncedSequence = entry.getSequence(); - Long lRegionSequenceId = Long.valueOf(regionSequenceId); - highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); - if (entry.isInMemstore()) { - oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); - } + + writer.append(entry); + assert highestUnsyncedSequence < entry.getSequence(); + highestUnsyncedSequence = entry.getSequence(); + Long lRegionSequenceId = Long.valueOf(regionSequenceId); + highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); + if (entry.isInMemstore()) { + oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); } + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 9799269..6660468 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -41,9 +41,6 @@ class FSWALEntry extends HLog.Entry { private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; - // Latch that is set on creation and then is undone on the other side of the ring buffer by the - // consumer thread just after it sets the region edit/sequence id in here. 
- private final transient CountDownLatch latch = new CountDownLatch(1); FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, @@ -91,14 +88,6 @@ class FSWALEntry extends HLog.Entry { long stampRegionSequenceId() { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); getKey().setLogSeqNum(regionSequenceId); - // On creation, a latch was set. Count it down when sequence id is set. This will free - // up anyone blocked on {@link #getRegionSequenceId()} - this.latch.countDown(); return regionSequenceId; } - - long getRegionSequenceId() throws InterruptedException { - this.latch.await(); - return getKey().getLogSeqNum(); - } -} +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index f591f4e..f93865d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,7 +32,13 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import com.google.protobuf.HBaseZeroCopyByteString; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -49,7 +56,6 @@ import org.apache.hadoop.io.WritableUtils; import com.google.common.annotations.VisibleForTesting; import 
com.google.protobuf.ByteString; -import com.google.protobuf.HBaseZeroCopyByteString; /** * A Key for an entry in the change log. @@ -113,7 +119,8 @@ public class HLogKey implements WritableComparable { // The encoded region name. private byte [] encodedRegionName; private TableName tablename; - private long logSeqNum; + private MutableLong logSeqNum; + private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1); // Time at which this edit was written. private long writeTime; @@ -129,7 +136,7 @@ public class HLogKey implements WritableComparable { private CompressionContext compressionContext; public HLogKey() { - init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + init(null, null, new MutableLong(0L), HConstants.LATEST_TIMESTAMP, new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -138,7 +145,7 @@ public class HLogKey implements WritableComparable { final long now, UUID clusterId) { List clusterIds = new ArrayList(); clusterIds.add(clusterId); - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, + init(encodedRegionName, tablename, new MutableLong(logSeqNum), now, clusterIds, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -147,7 +154,7 @@ public class HLogKey implements WritableComparable { } public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) { - init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, + init(encodedRegionName, tablename, new MutableLong(HLog.NO_SEQUENCE_ID), now, EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -165,7 +172,7 @@ public class HLogKey implements WritableComparable { * @param clusterIds the clusters that have consumed the change(used in Replication) */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { + MutableLong logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { init(encodedRegionName, tablename, logSeqNum, now, 
clusterIds, nonceGroup, nonce); } @@ -184,7 +191,8 @@ public class HLogKey implements WritableComparable { */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, final long now, List clusterIds, long nonceGroup, long nonce) { - init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce); + init(encodedRegionName, tablename, new MutableLong(HLog.NO_SEQUENCE_ID), now, clusterIds, + nonceGroup, nonce); } /** @@ -195,17 +203,18 @@ public class HLogKey implements WritableComparable { * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). * @param tablename + * @param logSeqNum * @param nonceGroup * @param nonce */ - public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup, - long nonce) { - init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, - EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce); + public HLogKey(final byte [] encodedRegionName, final TableName tablename, MutableLong logSeqNum, + long nonceGroup, long nonce) { + init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(), + EMPTY_UUIDS, nonceGroup, nonce); } protected void init(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { + MutableLong logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { this.logSeqNum = logSeqNum; this.writeTime = now; this.clusterIds = clusterIds; @@ -234,15 +243,34 @@ public class HLogKey implements WritableComparable { /** @return log sequence number */ public long getLogSeqNum() { - return this.logSeqNum; + return this.logSeqNum.longValue(); } /** - * Allow that the log sequence id to be set post-construction. + * Allow that the log sequence id to be set post-construction and release all waiters on assigned + * sequence number. 
 * @param sequence */ void setLogSeqNum(final long sequence) { - this.logSeqNum = sequence; + this.logSeqNum.setValue(sequence); + this.seqNumAssignedLatch.countDown(); + } + + /** + * Wait for the sequence number to be assigned and return the assigned value + * @return long the new assigned sequence number + * @throws IOException an InterruptedIOException if interrupted while waiting + */ + public long waitForLogSeqNumAssigned() throws IOException { + try { + this.seqNumAssignedLatch.await(); + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for next log sequence number"); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + return this.logSeqNum.longValue(); } /** @@ -348,7 +376,7 @@ @Override public int hashCode() { int result = Bytes.hashCode(this.encodedRegionName); - result ^= this.logSeqNum; + result ^= this.logSeqNum.longValue(); result ^= this.writeTime; return result; } @@ -356,9 +384,9 @@ public int compareTo(HLogKey o) { int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName); if (result == 0) { - if (this.logSeqNum < o.logSeqNum) { + if (this.logSeqNum.longValue() < o.logSeqNum.longValue()) { result = -1; - } else if (this.logSeqNum > o.logSeqNum ) { + } else if (this.logSeqNum.longValue() > o.logSeqNum.longValue() ) { result = 1; } if (result == 0) { @@ -414,7 +442,7 @@ Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out, compressionContext.tableDict); } - out.writeLong(this.logSeqNum); + out.writeLong(this.logSeqNum.longValue()); out.writeLong(this.writeTime); // Don't need to write the clusters information as we are using protobufs from 0.95 // Writing only the first clusterId for testing the legacy read @@ -461,7 +489,7 @@ tablenameBytes = 
Compressor.readCompressed(in, compressionContext.tableDict); } - this.logSeqNum = in.readLong(); + this.logSeqNum.setValue(in.readLong()); this.writeTime = in.readLong(); this.clusterIds.clear(); @@ -507,7 +535,7 @@ public class HLogKey implements WritableComparable { builder.setTableName(compressor.compress(this.tablename.getName(), compressionContext.tableDict)); } - builder.setLogSequenceNumber(this.logSeqNum); + builder.setLogSequenceNumber(this.logSeqNum.longValue()); builder.setWriteTime(writeTime); if (this.nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); @@ -570,7 +598,7 @@ public class HLogKey implements WritableComparable { this.scopes.put(family, scope.getScopeType().getNumber()); } } - this.logSeqNum = walKey.getLogSequenceNumber(); + this.logSeqNum.setValue(walKey.getLogSequenceNumber()); this.writeTime = walKey.getWriteTime(); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index fdf71cc..ee2817f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -1958,8 +1959,8 @@ public class HLogSplitter { clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); } key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey - .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(), - clusterIds, walKey.getNonceGroup(), walKey.getNonce()); + 
.getTableName().toByteArray()), new MutableLong(walKey.getLogSequenceNumber()), + walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce()); logEntry.setFirst(key); logEntry.setSecond(val); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index b10c4a9..8ecb4b3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -249,7 +249,7 @@ public class WALEdit implements Writable, HeapSize { sb.append(">]"); return sb.toString(); } - + /** * Create a compacion WALEdit * @param c diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 9e92d5e..395e28c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -1705,7 +1705,7 @@ public class AccessController extends BaseRegionObserver newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setMvccVersion(newKv.getMvccVersionReference()); return rewriteKv; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index a152fcc..f262ebc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -1133,7 +1133,7 @@ public class VisibilityController extends BaseRegionObserver implements 
MasterOb newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()), newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setMvccVersion(newKv.getMvccVersionReference()); return rewriteKv; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 98563d6..2e74281 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -68,7 +68,7 @@ public class TestMultiParallel { private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); - private static final int slaves = 2; // also used for testing HTable pool size + private static final int slaves = 3; // also used for testing HTable pool size @BeforeClass public static void beforeClass() throws Exception { ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); @@ -238,7 +238,7 @@ public class TestMultiParallel { * * @throws Exception */ - @Test (timeout=300000) + @Test (timeout=360000) public void testFlushCommitsWithAbort() throws Exception { LOG.info("test=testFlushCommitsWithAbort"); doTestFlushCommits(true); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index f2db498..709b9e0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import 
java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -61,6 +62,7 @@ public class TestDefaultMemStore extends TestCase { private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); private MultiVersionConsistencyControl mvcc; + private AtomicLong startSeqNum = new AtomicLong(0); @Override public void setUp() throws Exception { @@ -236,7 +238,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMvccVersion(w.getWriteNumber()); @@ -250,7 +252,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMvccVersion(w.getWriteNumber()); memstore.add(kv2); @@ -280,7 +282,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -296,7 +298,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMvccVersion(w.getWriteNumber()); memstore.add(kv21); @@ -332,7 +334,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns 
val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -348,7 +350,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMvccVersion(w.getWriteNumber()); @@ -377,6 +379,7 @@ public class TestDefaultMemStore extends TestCase { final MultiVersionConsistencyControl mvcc; final MemStore memstore; + final AtomicLong startSeqNum; AtomicReference caughtException; @@ -384,12 +387,14 @@ public class TestDefaultMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, MultiVersionConsistencyControl mvcc, - AtomicReference caughtException) + AtomicReference caughtException, + AtomicLong startSeqNum) { this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); + this.startSeqNum = startSeqNum; } public void run() { @@ -403,7 +408,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsert(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -433,7 +438,7 @@ public class TestDefaultMemStore extends TestCase { AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum); 
threads[i].start(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index b2b4845..650e1e9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -4152,12 +4152,12 @@ public class TestHRegion { durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); // expect skip wal cases - durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false); } @@ -4202,7 +4202,7 @@ public class TestHRegion { } }); } else { - verify(log, never()).sync(anyLong()); + //verify(log, never()).sync(anyLong()); verify(log, never()).sync(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 40fafd9..f59087e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -46,8 +46,10 @@ public class TestMultiVersionConsistencyControl extends TestCase { public boolean failed = false; public void run() { + AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry e = + mvcc.beginMemstoreInsert(startPoint.incrementAndGet()); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500);