From 7cced1971373c367daf921abef1c42c7fc4eb41f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 23 Jan 2017 13:13:56 +0800 Subject: [PATCH] HBASE-17471 Region Seqid will be out of order in WAL if using mvccPreAssign --- .../apache/hadoop/hbase/regionserver/HRegion.java | 41 +++---------- .../MultiVersionConcurrencyControl.java | 28 ++++++--- .../hbase/regionserver/wal/AbstractFSWAL.java | 32 ++++++++-- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 13 +--- .../hadoop/hbase/regionserver/wal/FSHLog.java | 35 ++--------- .../hadoop/hbase/regionserver/wal/FSWALEntry.java | 35 ++--------- .../java/org/apache/hadoop/hbase/wal/WALKey.java | 69 ++++------------------ .../hadoop/hbase/regionserver/TestHRegion.java | 7 +-- .../regionserver/wal/AbstractTestWALReplay.java | 2 +- .../regionserver/wal/TestWALActionsListener.java | 24 ++++---- 10 files changed, 95 insertions(+), 191 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ef6239d..97fc4e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -644,9 +643,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // that has non-default scope private final NavigableMap replicationScope = new TreeMap( Bytes.BYTES_COMPARATOR); - // flag and lock for MVCC preassign - private final boolean mvccPreAssign; - private final ReentrantLock preAssignMvccLock; /** * HRegion constructor. This constructor should only be used for testing and @@ -806,14 +802,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); - - // get mvcc pre-assign flag and lock - this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN); - if (this.mvccPreAssign) { - this.preAssignMvccLock = new ReentrantLock(); - } else { - this.preAssignMvccLock = null; - } } void setHTableSpecificConf() { @@ -3349,26 +3337,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { try { if (!walEdit.isEmpty()) { - try { - if (this.mvccPreAssign) { - preAssignMvccLock.lock(); - writeEntry = mvcc.begin(); - } - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - if (this.mvccPreAssign) { - walKey.setPreAssignedWriteEntry(writeEntry); - } - // TODO: Use the doAppend methods below... complicated by the replay stuff above. - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - } finally { - if (mvccPreAssign) { - preAssignMvccLock.unlock(); - } - } + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, + this.getReplicationScope()); + // TODO: Use the doAppend methods below... complicated by the replay stuff above. + txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); if (txid != 0) { sync(txid, durability); } @@ -3400,7 +3375,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. // 2) If no WAL, FSWALEntry won't be used // we use durability of the original mutation for the mutation passed by CP. - boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign; + boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; if (updateSeqId) { this.updateSequenceId(familyMaps[i].values(), replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index 7424e4e..ee4fbb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -18,12 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + import java.util.LinkedList; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; - -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -110,20 +110,30 @@ public class MultiVersionConcurrencyControl { } /** + * Call {@link #begin(Runnable)} with an empty {@link Runnable}. + */ + public WriteEntry begin() { + return begin(() -> {}); + } + + /** * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it - * to our queue of ongoing writes. Return this WriteEntry instance. - * To complete the write transaction and wait for it to be visible, call - * {@link #completeAndWait(WriteEntry)}. If the write failed, call - * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write - * transaction. + * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write + * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the + * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of + * the failed write transaction. + *

+ * The {@code action} will be executed under the lock which means it can keep the same order with + * mvcc. * @see #complete(WriteEntry) * @see #completeAndWait(WriteEntry) */ - public WriteEntry begin() { + public WriteEntry begin(Runnable action) { synchronized (writeQueue) { long nextWriteNumber = writePoint.incrementAndGet(); WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); + action.run(); return e; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 7e3bd59..cad8029 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -73,6 +75,7 @@ import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.RingBuffer; /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one @@ -881,11 +884,8 @@ public abstract class AbstractFSWAL implements WAL { atHeadOfRingBufferEventHandlerAppend(); long start = EnvironmentEdgeManager.currentTime(); byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); - long regionSequenceId = WALKey.NO_SEQUENCE_ID; - // We are about to append this edit; update the region-scoped sequence number. Do it - // here inside this single appending/writing thread. Events are ordered on the ringbuffer - // so region sequenceids will also be in order. - regionSequenceId = entry.stampRegionSequenceId(); + long regionSequenceId = entry.getKey().getSequenceId(); + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. @@ -944,6 +944,28 @@ public abstract class AbstractFSWAL implements WAL { } } + protected long stampSequenceIdAndPublishToRingBuffer(HRegionInfo hri, WALKey key, WALEdit edits, + boolean inMemstore, RingBuffer ringBuffer) + throws IOException { + if (this.closed) { + throw new IOException("Cannot append; log is closed"); + } + TraceScope scope = Trace.startSpan(getClass().getSimpleName() + ".append"); + MutableLong txidHolder = new MutableLong(); + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { + txidHolder.setValue(ringBuffer.next()); + }); + long txid = txidHolder.longValue(); + try { + FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); + entry.stampRegionSequenceId(we); + ringBuffer.get(txid).load(entry, scope.detach()); + } finally { + ringBuffer.publish(txid); + } + return txid; + } + @Override public String toString() { return getClass().getSimpleName() + " " + walFilePrefix + ":" + walFileSuffix + "(num " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 20c43aa..83d93fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -544,17 +544,8 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - if (closed) { - throw new IOException("Cannot append; log is closed"); - } - TraceScope scope = Trace.startSpan("AsyncFSWAL.append"); - long txid = waitingConsumePayloads.next(); - try { - RingBufferTruck truck = waitingConsumePayloads.get(txid); - truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore), scope.detach()); - } finally { - waitingConsumePayloads.publish(txid); - } + long txid = + stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 34103dd..73720cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -435,31 +435,10 @@ public class FSHLog extends AbstractFSWAL { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION", justification = "Will never be null") @Override - public long append(final HRegionInfo hri, - final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException { - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - // Make a trace scope for the append. It is closed on other side of the ring buffer by the - // single consuming thread. Don't have to worry about it. - TraceScope scope = Trace.startSpan("FSHLog.append"); - - // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need - // all this to make a key and then below to append the edit, we need to carry htd, info, - // etc. all over the ring buffer. - FSWALEntry entry = null; - long sequence = this.disruptor.getRingBuffer().next(); - try { - RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the - // edit with its edit/sequence id. - // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, hri, inMemstore); - truck.load(entry, scope.detach()); - } finally { - this.disruptor.getRingBuffer().publish(sequence); - } - return sequence; + public long append(final HRegionInfo hri, final WALKey key, final WALEdit edits, + final boolean inMemstore) throws IOException { + return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, + disruptor.getRingBuffer()); } /** @@ -1009,12 +988,6 @@ public class FSHLog extends AbstractFSWAL { try { if (this.exception != null) { - // We got an exception on an earlier attempt at append. Do not let this append - // go through. Fail it but stamp the sequenceid into this append though failed. - // We need to do this to close the latch held down deep in WALKey...that is waiting - // on sequenceid assignment otherwise it will just hang out (The #append method - // called below does this also internally). - entry.stampRegionSequenceId(); // Return to keep processing events coming off the ringbuffer return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index f599b46..7ac276d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -52,9 +52,6 @@ class FSWALEntry extends Entry { private final transient boolean inMemstore; private final transient HRegionInfo hri; private final transient Set familyNames; - // In the new WAL logic, we will rewrite failed WAL entries to new WAL file, so we need to avoid - // calling stampRegionSequenceId again. - private transient boolean stamped = false; // The tracing span for this entry when writing WAL. private transient Span span; @@ -105,38 +102,18 @@ class FSWALEntry extends Entry { } /** - * Here is where a WAL edit gets its sequenceid. - * SIDE-EFFECT is our stamping the sequenceid into every Cell AND setting the sequenceid into the - * MVCC WriteEntry!!!! + * Here is where a WAL edit gets its sequenceid. SIDE-EFFECT is our stamping the sequenceid into + * every Cell AND setting the sequenceid into the MVCC WriteEntry!!!! * @return The sequenceid we stamped on this edit. */ - long stampRegionSequenceId() throws IOException { - if (stamped) { - return getKey().getSequenceId(); - } - stamped = true; - long regionSequenceId = WALKey.NO_SEQUENCE_ID; - WALKey key = getKey(); - MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); - boolean preAssigned = (we != null); - if (!preAssigned) { - MultiVersionConcurrencyControl mvcc = key.getMvcc(); - if (mvcc != null) { - we = mvcc.begin(); - } - } - if (we != null) { - regionSequenceId = we.getWriteNumber(); - } - + long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException { + long regionSequenceId = we.getWriteNumber(); if (!this.getEdit().isReplay() && inMemstore) { - for (Cell c:getEdit().getCells()) { + for (Cell c : getEdit().getCells()) { CellUtil.setSequenceId(c, regionSequenceId); } } - if (!preAssigned) { - key.setWriteEntry(we); - } + getKey().setWriteEntry(we); return regionSequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 63e6649..276ab36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.wal; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -29,30 +31,24 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.SequenceId; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * A Key for an entry in the WAL. @@ -70,8 +66,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; // purposes. They need to be merged into WALEntry. @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class WALKey implements SequenceId, Comparable { - private static final Log LOG = LogFactory.getLog(WALKey.class); - private final CountDownLatch sequenceIdAssignedLatch = new CountDownLatch(1); + /** * Used to represent when a particular wal key doesn't know/care about the sequence ordering. */ @@ -93,35 +88,16 @@ public class WALKey implements SequenceId, Comparable { */ @InterfaceAudience.Private // For internal use only. public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { - if (this.preAssignedWriteEntry != null) { - // don't wait for seqNumAssignedLatch if writeEntry is preassigned - return this.preAssignedWriteEntry; - } - try { - this.sequenceIdAssignedLatch.await(); - } catch (InterruptedException ie) { - MultiVersionConcurrencyControl mvcc = getMvcc(); - if (LOG.isDebugEnabled()) { - LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry); - } - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } + assert this.writeEntry != null; return this.writeEntry; } @InterfaceAudience.Private // For internal use only. public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { - if (this.writeEntry != null) { - throw new RuntimeException("Non-null!!!"); - } + assert this.writeEntry == null; this.writeEntry = writeEntry; // Set our sequenceid now using WriteEntry. - if (this.writeEntry != null) { - this.sequenceId = this.writeEntry.getWriteNumber(); - } - this.sequenceIdAssignedLatch.countDown(); + this.sequenceId = writeEntry.getWriteNumber(); } // REMOVE!!!! No more Writables!!!! @@ -208,7 +184,6 @@ public class WALKey implements SequenceId, Comparable { * Set in a way visible to multiple threads; e.g. synchronized getter/setters. */ private MultiVersionConcurrencyControl.WriteEntry writeEntry; - private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null; public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); // visible for deprecated HLogKey @@ -722,24 +697,4 @@ public class WALKey implements SequenceId, Comparable { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } - - /** - * @return The preassigned writeEntry, if any - */ - @InterfaceAudience.Private // For internal use only. - public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() { - return this.preAssignedWriteEntry; - } - - /** - * Preassign writeEntry - * @param writeEntry the entry to assign - */ - @InterfaceAudience.Private // For internal use only. - public void setPreAssignedWriteEntry(WriteEntry writeEntry) { - if (writeEntry != null) { - this.preAssignedWriteEntry = writeEntry; - this.sequenceId = writeEntry.getWriteNumber(); - } - } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ba27622..3bdce5d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5895,11 +5895,8 @@ public class TestHRegion { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKey key = invocation.getArgumentAt(1, WALKey.class); - MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); - if (we == null) { - we = key.getMvcc().begin(); - key.setWriteEntry(we); - } + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); + key.setWriteEntry(we); return 1L; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 84bdc69..f7a88f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1190,7 +1190,7 @@ public abstract class AbstractTestWALReplay { FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit( rowName, family, ee, index), hri, true); - entry.stampRegionSequenceId(); + entry.stampRegionSequenceId(mvcc.begin()); return entry; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 52f8fe7..4d2ed19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -18,23 +18,30 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.junit.After; import org.junit.Before; @@ -42,14 +49,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; - /** * Test that the actions are called while playing with an WAL */ @Category({RegionServerTests.class, SmallTests.class}) public class TestWALActionsListener { - private static final Log LOG = LogFactory.getLog(TestWALActionsListener.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -92,9 +96,9 @@ public class TestWALActionsListener { HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES), SOME_BYTES, SOME_BYTES, false); final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); - + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); for (int i = 0; i < 20; i++) { - byte[] b = Bytes.toBytes(i+""); + byte[] b = Bytes.toBytes(i + ""); KeyValue kv = new KeyValue(b,b,b); WALEdit edit = new WALEdit(); edit.add(kv); @@ -106,7 +110,7 @@ public class TestWALActionsListener { scopes.put(fam, 0); } final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.valueOf(b), 0, scopes), edit, true); + TableName.valueOf(b), 0, mvcc, scopes), edit, true); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); -- 2.7.4