diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b616176..e2ae36c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -666,9 +666,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final Durability durability; private final boolean regionStatsEnabled; - // flag and lock for MVCC preassign - private final boolean mvccPreAssign; - private final ReentrantLock preAssignMvccLock; /** * HRegion constructor. This constructor should only be used for testing and @@ -819,14 +816,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); - - // get mvcc pre-assign flag and lock - this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN); - if (this.mvccPreAssign) { - this.preAssignMvccLock = new ReentrantLock(); - } else { - this.preAssignMvccLock = null; - } } void setHTableSpecificConf() { @@ -2669,9 +2658,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // a timeout. May happen in tests after we tightened the semantic via HBASE-14317. // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches // so if an abort or stop, there is no way to call them in. - WALKey key = this.appendEmptyEdit(wal, null); + WALKey key = this.appendEmptyEdit(wal); mvcc.complete(key.getWriteEntry()); - return key.getSequenceId(this.maxWaitForSeqId); + return key.getSequenceId(); } ////////////////////////////////////////////////////////////////////////////// @@ -3413,29 +3402,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); } } else { - try { - if (mvccPreAssign) { - preAssignMvccLock.lock(); - writeEntry = mvcc.begin(); - } - if (walEdit.size() > 0) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - if (mvccPreAssign) { - walKey.setPreAssignedWriteEntry(writeEntry); - } - txid = - this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); - } else { - // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal, writeEntry); - } - } finally { - if (mvccPreAssign) { - preAssignMvccLock.unlock(); - } + if (walEdit.size() > 0) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + txid = this.wal + .append(this.htableDescriptor, this.getRegionInfo(), walKey, + walEdit, true); + } else { + walKey = appendEmptyEdit(wal); } } // ------------------------------------ @@ -3473,7 +3449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // before apply to memstore to avoid scan return incorrect value. // we use durability of the original mutation for the mutation passed by CP. boolean updateSeqId = isInReplay - || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign; + || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; if (updateSeqId) { updateSequenceId(familyMaps[i].values(), mvccNum); } @@ -7412,7 +7388,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if(walKey == null){ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal, null); + walKey = this.appendEmptyEdit(this.wal); } // 7. Start mvcc transaction @@ -7737,7 +7713,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean updateSeqId = false; if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal, null); + walKey = this.appendEmptyEdit(this.wal); // If no WAL, FSWALEntry won't be used and no update for sequence id updateSeqId = true; } @@ -7974,7 +7950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); } else { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal, null); + walKey = this.appendEmptyEdit(this.wal); // If no WAL, FSWALEntry won't be used and no update for sequence id updateSeqId = true; } @@ -8204,9 +8180,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + - 6 * Bytes.SIZEOF_BOOLEAN); + 5 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: // 1 x HashMap - coprocessorServiceHandlers @@ -8792,16 +8768,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Return the key used appending with no sync and no append. * @throws IOException */ - private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException { + private WALKey appendEmptyEdit(final WAL wal) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. @SuppressWarnings("deprecation") WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); - if (writeEntry != null) { - key.setPreAssignedWriteEntry(writeEntry); - } - // Call append but with an empty WALEdit. The returned sequence id will not be associated // with any edit and we can be sure it went in after all outstanding appends. try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index d65af00..57d6356 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -18,12 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + import java.util.LinkedList; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; - -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -111,20 +111,34 @@ public class MultiVersionConcurrencyControl { } /** + * Call {@link #begin(Runnable)} with an empty {@link Runnable}. + */ + public WriteEntry begin() { + return begin(new Runnable() { + @Override public void run() { + + } + }); + } + + /** * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it - * to our queue of ongoing writes. Return this WriteEntry instance. - * To complete the write transaction and wait for it to be visible, call - * {@link #completeAndWait(WriteEntry)}. If the write failed, call - * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write - * transaction. + * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write + * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the + * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of + * the failed write transaction. + *

+ * The {@code action} will be executed under the lock which means it can keep the same order with + * mvcc. * @see #complete(WriteEntry) * @see #completeAndWait(WriteEntry) */ - public WriteEntry begin() { + public WriteEntry begin(Runnable action) { synchronized (writeQueue) { long nextWriteNumber = writePoint.incrementAndGet(); WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); + action.run(); return e; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index d3f302a..8d97b64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.lmax.disruptor.*; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -89,11 +92,6 @@ import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -1112,21 +1110,22 @@ public class FSHLog implements WAL { // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); - - // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need - // all this to make a key and then below to append the edit, we need to carry htd, info, - // etc. all over the ring buffer. - FSWALEntry entry = null; - long sequence = this.disruptor.getRingBuffer().next(); + final MutableLong txidHolder = new MutableLong(); + final RingBuffer ringBuffer = disruptor.getRingBuffer(); + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(new Runnable() { + @Override public void run() { + txidHolder.setValue(ringBuffer.next()); + } + }); + long txid = txidHolder.longValue(); try { - RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); - truck.loadPayload(entry, scope.detach()); + FSWALEntry entry = new FSWALEntry(txid, key, edits, htd, hri, inMemstore); + entry.stampRegionSequenceId(we); + ringBuffer.get(txid).loadPayload(entry, scope.detach()); } finally { - this.disruptor.getRingBuffer().publish(sequence); + ringBuffer.publish(txid); } - return sequence; + return txid; } /** @@ -1814,12 +1813,6 @@ public class FSHLog implements WAL { try { FSWALEntry entry = truck.unloadFSWALEntryPayload(); if (this.exception != null) { - // We got an exception on an earlier attempt at append. Do not let this append - // go through. Fail it but stamp the sequenceid into this append though failed. - // We need to do this to close the latch held down deep in WALKey...that is waiting - // on sequenceid assignment otherwise it will just hang out (The #append method - // called below does this also internally). - entry.stampRegionSequenceId(); // Return to keep processing events coming off the ringbuffer return; } @@ -1940,10 +1933,8 @@ public class FSHLog implements WAL { byte [] encodedRegionName = entry.getKey().getEncodedRegionName(); long regionSequenceId = WALKey.NO_SEQUENCE_ID; try { - // We are about to append this edit; update the region-scoped sequence number. Do it - // here inside this single appending/writing thread. Events are ordered on the ringbuffer - // so region sequenceids will also be in order. - regionSequenceId = entry.stampRegionSequenceId(); + + regionSequenceId = entry.getKey().getSequenceId(); // Edits are empty, there is nothing to append. Maybe empty when we are looking for a // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index f55e185..69d1c59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -106,34 +106,19 @@ class FSWALEntry extends Entry { /** * Here is where a WAL edit gets its sequenceid. + * @param we after HBASE-17471 we already get the mvcc number + * in WriteEntry, just stamp the writenumber to cells and walkey * @return The sequenceid we stamped on this edit. * @throws IOException */ - long stampRegionSequenceId() throws IOException { - long regionSequenceId = WALKey.NO_SEQUENCE_ID; - WALKey key = getKey(); - MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); - boolean preAssigned = (we != null); - if (!preAssigned) { - MultiVersionConcurrencyControl mvcc = key.getMvcc(); - if (mvcc != null) { - we = mvcc.begin(); - } - } - if (we != null) { - regionSequenceId = we.getWriteNumber(); - } - + long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException { + long regionSequenceId = we.getWriteNumber(); if (!this.getEdit().isReplay() && inMemstore) { - for (Cell c:getEdit().getCells()) { + for (Cell c : getEdit().getCells()) { CellUtil.setSequenceId(c, regionSequenceId); } } - - // This has to stay in this order - if (!preAssigned) { - key.setWriteEntry(we); - } + getKey().setWriteEntry(we); return regionSequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 585c8f4..01420d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -95,37 +95,16 @@ public class WALKey implements SequenceId, Comparable { */ @InterfaceAudience.Private // For internal use only. public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { - if (this.preAssignedWriteEntry != null) { - // don't wait for seqNumAssignedLatch if writeEntry is preassigned - return this.preAssignedWriteEntry; - } - try { - this.seqNumAssignedLatch.await(); - } catch (InterruptedException ie) { - // If interrupted... clear out our entry else we can block up mvcc. - MultiVersionConcurrencyControl mvcc = getMvcc(); - LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry); - if (mvcc != null) { - if (this.writeEntry != null) { - mvcc.complete(this.writeEntry); - } - } - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } + assert this.writeEntry != null; return this.writeEntry; } @InterfaceAudience.Private // For internal use only. public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { - assert this.writeEntry == null : "Non-null writeEntry when trying to set one"; + assert this.writeEntry == null; this.writeEntry = writeEntry; // Set our sequenceid now using WriteEntry. - if (this.writeEntry != null) { - this.logSeqNum = this.writeEntry.getWriteNumber(); - } - this.seqNumAssignedLatch.countDown(); + this.logSeqNum = writeEntry.getWriteNumber(); } // should be < 0 (@see HLogKey#readFields(DataInput)) @@ -189,7 +168,6 @@ public class WALKey implements SequenceId, Comparable { @InterfaceAudience.Private protected long logSeqNum; private long origLogSeqNum = 0; - private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1); // Time at which this edit was written. // visible for deprecated HLogKey @InterfaceAudience.Private @@ -206,7 +184,6 @@ public class WALKey implements SequenceId, Comparable { private long nonce = HConstants.NO_NONCE; private MultiVersionConcurrencyControl mvcc; private MultiVersionConcurrencyControl.WriteEntry writeEntry; - private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null; public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); // visible for deprecated HLogKey @@ -393,36 +370,6 @@ public class WALKey implements SequenceId, Comparable { */ @Override public long getSequenceId() throws IOException { - return getSequenceId(-1); - } - - /** - * Wait for sequence number to be assigned & return the assigned value. - * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid - * @return long the new assigned sequence number - * @throws IOException - */ - public long getSequenceId(final long maxWaitForSeqId) throws IOException { - // TODO: This implementation waiting on a latch is problematic because if a higher level - // determines we should stop or abort, there is no global list of all these blocked WALKeys - // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId. - // - // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid, - // even those that have failed (previously we were not... so they would just hang out...). - // St.Ack 20150910 - try { - if (maxWaitForSeqId < 0) { - this.seqNumAssignedLatch.await(); - } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { - throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId + - "ms; WAL system stuck or has gone away?"); - } - } catch (InterruptedException ie) { - LOG.warn("Thread interrupted waiting for next log sequence number"); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } return this.logSeqNum; } @@ -667,23 +614,4 @@ public class WALKey implements SequenceId, Comparable { } } - /** - * @return The preassigned writeEntry, if any - */ - @InterfaceAudience.Private // For internal use only. - public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() { - return this.preAssignedWriteEntry; - } - - /** - * Preassign writeEntry - * @param writeEntry the entry to assign - */ - @InterfaceAudience.Private // For internal use only. - public void setPreAssignedWriteEntry(WriteEntry writeEntry) { - if (writeEntry != null) { - this.preAssignedWriteEntry = writeEntry; - this.logSeqNum = writeEntry.getWriteNumber(); - } - } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index aca2978..2acafb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -242,7 +242,9 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), + long txid = log.append(htd, hri, + new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, + new MultiVersionConcurrencyControl()), edit, true); log.sync(txid); @@ -326,7 +328,7 @@ public class TestWALObserver { LOG.debug("write a log edit that supports legacy cps."); final long now = EnvironmentEdgeManager.currentTime(); - final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now); + final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc); final WALEdit edit = new WALEdit(); final byte[] nonce = Bytes.toBytes("1772"); edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e1af83a..20e56af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -6272,11 +6272,8 @@ public class TestHRegion { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKey key = invocation.getArgumentAt(2, WALKey.class); - MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); - if (we == null) { - we = key.getMvcc().begin(); - key.setWriteEntry(we); - } + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); + key.setWriteEntry(we); return 1L; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index d82d1df..0a0393a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -216,13 +216,15 @@ public class TestWALLockup { HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); final HRegion region = initHRegion(tableName, null, null, dodgyWAL); byte [] bytes = Bytes.toBytes(getName()); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); try { // First get something into memstore. Make a Put and then pull the Cell out of it. Will // manage append and sync carefully in below to manufacture hang. We keep adding same // edit. WAL subsystem doesn't care. Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName()); + WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -388,12 +390,12 @@ public class TestWALLockup { HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); final HRegion region = initHRegion(tableName, null, null, dodgyWAL1); byte[] bytes = Bytes.toBytes(getName()); - + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); try { Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), - htd.getTableName()); + htd.getTableName(), System.currentTimeMillis(), mvcc); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -425,7 +427,7 @@ public class TestWALLockup { // make RingBufferEventHandler sleep 1s, so the following sync // endOfBatch=false key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), - TableName.valueOf("sleep")); + TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc); dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true); Thread t = new Thread("Sync") { @@ -449,7 +451,7 @@ public class TestWALLockup { } // make append throw DamagedWALException key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), - TableName.valueOf("DamagedWALException")); + TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc); dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true); while (latch.getCount() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 1fcb241..3a23a05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -416,6 +416,7 @@ public class TestFSHLog { final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC()); wal.append(htd, info, logkey, edits, true); + region.getMVCC().completeAndWait(logkey.getWriteEntry()); } region.flush(true); // FlushResult.flushSequenceId is not visible here so go get the current sequence id. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index bac1b6f..b1eccdf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -101,7 +102,7 @@ public class TestWALActionsListener { HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES), SOME_BYTES, SOME_BYTES, false); final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); - + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < 20; i++) { byte[] b = Bytes.toBytes(i+""); KeyValue kv = new KeyValue(b,b,b); @@ -111,7 +112,7 @@ public class TestWALActionsListener { htd.addFamily(new HColumnDescriptor(b)); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.valueOf(b), 0), edit, true); + TableName.valueOf(b), 0, mvcc), edit, true); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 2622f6d..bf28bca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -1189,7 +1189,7 @@ public class TestWALReplay { FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit( rowName, family, ee, index), htd, hri, true); - entry.stampRegionSequenceId(); + entry.stampRegionSequenceId(mvcc.begin()); return entry; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index d9454bb..b405698 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; @@ -95,11 +96,12 @@ public class TestSecureWAL { final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace()); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, true); + System.currentTimeMillis(), mvcc), kvs, true); } wal.sync(); final Path walPath = DefaultWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 9b4a968..621d092 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -370,12 +370,12 @@ public class TestWALFactory { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor(tableName.getName())); - + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, true); + System.currentTimeMillis(), mvcc), kvs, true); } // Now call sync to send the data to HDFS datanodes wal.sync();