diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ef6239d..97fc4e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -104,7 +104,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -644,9 +643,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // that has non-default scope
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
       Bytes.BYTES_COMPARATOR);
-  // flag and lock for MVCC preassign
-  private final boolean mvccPreAssign;
-  private final ReentrantLock preAssignMvccLock;
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -806,14 +802,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         false :
         conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
             HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
-
-    // get mvcc pre-assign flag and lock
-    this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
-    if (this.mvccPreAssign) {
-      this.preAssignMvccLock = new ReentrantLock();
-    } else {
-      this.preAssignMvccLock = null;
-    }
   }
 
   void setHTableSpecificConf() {
@@ -3349,26 +3337,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       } else {
         try {
           if (!walEdit.isEmpty()) {
-            try {
-              if (this.mvccPreAssign) {
-                preAssignMvccLock.lock();
-                writeEntry = mvcc.begin();
-              }
-              // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-              walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                  this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-                  mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
-                  this.getReplicationScope());
-              if (this.mvccPreAssign) {
-                walKey.setPreAssignedWriteEntry(writeEntry);
-              }
-              // TODO: Use the doAppend methods below... complicated by the replay stuff above.
-              txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-            } finally {
-              if (mvccPreAssign) {
-                preAssignMvccLock.unlock();
-              }
-            }
+            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+            walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+                mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
+                this.getReplicationScope());
+            // TODO: Use the doAppend methods below... complicated by the replay stuff above.
+            txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
             if (txid != 0) {
               sync(txid, durability);
             }
@@ -3400,7 +3375,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
       // 2) If no WAL, FSWALEntry won't be used
       // we use durability of the original mutation for the mutation passed by CP.
-      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
+      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
       if (updateSeqId) {
         this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 7424e4e..ee4fbb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -110,20 +110,30 @@ public class MultiVersionConcurrencyControl {
   }
 
   /**
+   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
+   */
+  public WriteEntry begin() {
+    return begin(() -> {});
+  }
+
+  /**
    * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
-   * to our queue of ongoing writes. Return this WriteEntry instance.
-   * To complete the write transaction and wait for it to be visible, call
-   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
-   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
-   * transaction.
+   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
+   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
+   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
+   * the failed write transaction.
+   * <p>
+   * The {@code action} will be executed under the lock, so it is guaranteed to run in the same
+   * order in which the mvcc write numbers are assigned.
    * @see #complete(WriteEntry)
    * @see #completeAndWait(WriteEntry)
    */
-  public WriteEntry begin() {
+  public WriteEntry begin(Runnable action) {
     synchronized (writeQueue) {
       long nextWriteNumber = writePoint.incrementAndGet();
       WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
+      action.run();
       return e;
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 7e3bd59..211f307 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -73,6 +75,7 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.RingBuffer;
 
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@@ -881,11 +884,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     atHeadOfRingBufferEventHandlerAppend();
     long start = EnvironmentEdgeManager.currentTime();
     byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
-    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    // We are about to append this edit; update the region-scoped sequence number. Do it
-    // here inside this single appending/writing thread. Events are ordered on the ringbuffer
-    // so region sequenceids will also be in order.
-    regionSequenceId = entry.stampRegionSequenceId();
+    long regionSequenceId = entry.getKey().getSequenceId();
+
     // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
     // region sequence id only, a region edit/sequence id that is not associated with an actual
     // edit. It has to go through all the rigmarole to be sure we have the right ordering.
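
The begin(Runnable) hook above and the stampSequenceIdAndPublishToRingBuffer method in the next hunk together enforce one invariant: the mvcc write number and the ring buffer slot (the WAL txid) are claimed atomically under the same lock, so WAL append order can never disagree with mvcc order. A minimal stand-alone sketch of this locking pattern, with illustrative names (OrderedAssigner and claimSlot are not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    /** Sketch: two counters kept in lock-step by running a hook under the assigning lock. */
    public class OrderedAssigner {
      private final AtomicLong writePoint = new AtomicLong(); // stands in for the mvcc writePoint
      private final AtomicLong ringSlot = new AtomicLong();   // stands in for RingBuffer.next()
      private final Object lock = new Object();               // stands in for the writeQueue monitor

      /** Mirrors MultiVersionConcurrencyControl.begin(Runnable): the hook runs while the lock is held. */
      public long begin(Runnable action) {
        synchronized (lock) {
          long writeNumber = writePoint.incrementAndGet();
          action.run(); // e.g. () -> slot[0] = claimSlot()
          return writeNumber;
        }
      }

      public long claimSlot() {
        return ringSlot.incrementAndGet();
      }
    }

No interleaving of two begin() calls can produce write numbers and ring slots in different relative orders, which is the property the removed preAssignMvccLock could not provide once other append paths assigned write numbers outside that lock.
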
@@ -944,6 +944,42 @@
     }
   }
 
+  protected long stampSequenceIdAndPublishToRingBuffer(HRegionInfo hri, WALKey key, WALEdit edits,
+      boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
+      throws IOException {
+    if (this.closed) {
+      throw new IOException("Cannot append; log is closed");
+    }
+    TraceScope scope = Trace.startSpan(getClass().getSimpleName() + ".append");
+    MutableLong txidHolder = new MutableLong();
+    MultiVersionConcurrencyControl mvcc = key.getMvcc();
+    MultiVersionConcurrencyControl.WriteEntry we = null;
+    if (mvcc == null) {
+      // special case for unit tests which pass a WALKey without an mvcc
+      txidHolder.setValue(ringBuffer.next());
+    } else {
+      we = mvcc.begin(() -> {
+        txidHolder.setValue(ringBuffer.next());
+      });
+    }
+    long txid = txidHolder.longValue();
+    try {
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      if (we != null) {
+        entry.stampRegionSequenceId(we);
+      }
+      ringBuffer.get(txid).load(entry, scope.detach());
+    } finally {
+      ringBuffer.publish(txid);
+    }
+    // For edits whose cells will not be inserted into the memstore (e.g. a flush marker or a
+    // unit test), the mvcc WriteEntry is only needed for its sequence id, so complete it here.
+    if (we != null && mvcc != null && !inMemstore) {
+      mvcc.completeAndWait(we);
+    }
+    return txid;
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + " " + walFilePrefix + ":" + walFileSuffix + "(num "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 20c43aa..83d93fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -544,17 +544,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
       throws IOException {
-    if (closed) {
-      throw new IOException("Cannot append; log is closed");
-    }
-    TraceScope scope = Trace.startSpan("AsyncFSWAL.append");
-    long txid = waitingConsumePayloads.next();
-    try {
-      RingBufferTruck truck = waitingConsumePayloads.get(txid);
-      truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore), scope.detach());
-    } finally {
-      waitingConsumePayloads.publish(txid);
-    }
+    long txid =
+        stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
     if (shouldScheduleConsumer()) {
       eventLoop.execute(consumer);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 34103dd..73720cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -435,31 +435,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification = "Will never be null")
   @Override
-  public long append(final HRegionInfo hri,
-      final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
-    if (this.closed) {
-      throw new IOException("Cannot append; log is closed");
-    }
-    // Make a trace scope for the append. It is closed on other side of the ring buffer by the
-    // single consuming thread. Don't have to worry about it.
-    TraceScope scope = Trace.startSpan("FSHLog.append");
-
-    // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
-    // all this to make a key and then below to append the edit, we need to carry htd, info,
-    // etc. all over the ring buffer.
-    FSWALEntry entry = null;
-    long sequence = this.disruptor.getRingBuffer().next();
-    try {
-      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
-      // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
-      // edit with its edit/sequence id.
-      // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
-      truck.load(entry, scope.detach());
-    } finally {
-      this.disruptor.getRingBuffer().publish(sequence);
-    }
-    return sequence;
+  public long append(final HRegionInfo hri, final WALKey key, final WALEdit edits,
+      final boolean inMemstore) throws IOException {
+    return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
+        disruptor.getRingBuffer());
   }
 
   /**
@@ -1009,12 +988,6 @@
 
       try {
         if (this.exception != null) {
-          // We got an exception on an earlier attempt at append. Do not let this append
-          // go through. Fail it but stamp the sequenceid into this append though failed.
-          // We need to do this to close the latch held down deep in WALKey...that is waiting
-          // on sequenceid assignment otherwise it will just hang out (The #append method
-          // called below does this also internally).
-          entry.stampRegionSequenceId();
           // Return to keep processing events coming off the ringbuffer
           return;
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index f599b46..7ac276d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -52,9 +52,6 @@ class FSWALEntry extends Entry {
   private final transient boolean inMemstore;
   private final transient HRegionInfo hri;
   private final transient Set<byte[]> familyNames;
-  // In the new WAL logic, we will rewrite failed WAL entries to new WAL file, so we need to avoid
-  // calling stampRegionSequenceId again.
-  private transient boolean stamped = false;
   // The tracing span for this entry when writing WAL.
   private transient Span span;
 
@@ -105,38 +102,18 @@
   }
 
   /**
-   * Here is where a WAL edit gets its sequenceid.
-   * SIDE-EFFECT is our stamping the sequenceid into every Cell AND setting the sequenceid into the
-   * MVCC WriteEntry!!!!
+   * Here is where a WAL edit gets its sequenceid. SIDE-EFFECT is our stamping the sequenceid into
+   * every Cell AND setting the sequenceid into the MVCC WriteEntry!!!!
    * @return The sequenceid we stamped on this edit.
    */
-  long stampRegionSequenceId() throws IOException {
-    if (stamped) {
-      return getKey().getSequenceId();
-    }
-    stamped = true;
-    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    WALKey key = getKey();
-    MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-    boolean preAssigned = (we != null);
-    if (!preAssigned) {
-      MultiVersionConcurrencyControl mvcc = key.getMvcc();
-      if (mvcc != null) {
-        we = mvcc.begin();
-      }
-    }
-    if (we != null) {
-      regionSequenceId = we.getWriteNumber();
-    }
-
+  long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
+    long regionSequenceId = we.getWriteNumber();
     if (!this.getEdit().isReplay() && inMemstore) {
-      for (Cell c:getEdit().getCells()) {
+      for (Cell c : getEdit().getCells()) {
         CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
-    if (!preAssigned) {
-      key.setWriteEntry(we);
-    }
+    getKey().setWriteEntry(we);
     return regionSequenceId;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 85696b6..26781e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -147,16 +147,9 @@ public class WALUtil {
     WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
         System.currentTimeMillis(), mvcc, replicationScope);
     long trx = MultiVersionConcurrencyControl.NONE;
-    try {
-      trx = wal.append(hri, walKey, edit, false);
-      if (sync) {
-        wal.sync(trx);
-      }
-      // Call complete only here because these are markers only. They are not for clients to read.
-      mvcc.complete(walKey.getWriteEntry());
-    } catch (IOException ioe) {
-      mvcc.complete(walKey.getWriteEntry());
-      throw ioe;
+    trx = wal.append(hri, walKey, edit, false);
+    if (sync) {
+      wal.sync(trx);
     }
     return walKey;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 63e6649..276ab36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -29,30 +31,24 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.SequenceId;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * A Key for an entry in the WAL.
@@ -70,8 +66,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 // purposes. They need to be merged into WALEntry.
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALKey implements SequenceId, Comparable<WALKey> {
-  private static final Log LOG = LogFactory.getLog(WALKey.class);
-  private final CountDownLatch sequenceIdAssignedLatch = new CountDownLatch(1);
+
   /**
    * Used to represent when a particular wal key doesn't know/care about the sequence ordering.
    */
@@ -93,35 +88,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
-    if (this.preAssignedWriteEntry != null) {
-      // don't wait for seqNumAssignedLatch if writeEntry is preassigned
-      return this.preAssignedWriteEntry;
-    }
-    try {
-      this.sequenceIdAssignedLatch.await();
-    } catch (InterruptedException ie) {
-      MultiVersionConcurrencyControl mvcc = getMvcc();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
-      }
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
+    assert this.writeEntry != null;
     return this.writeEntry;
   }
 
   @InterfaceAudience.Private // For internal use only.
   public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
-    if (this.writeEntry != null) {
-      throw new RuntimeException("Non-null!!!");
-    }
+    assert this.writeEntry == null;
     this.writeEntry = writeEntry;
     // Set our sequenceid now using WriteEntry.
-    if (this.writeEntry != null) {
-      this.sequenceId = this.writeEntry.getWriteNumber();
-    }
-    this.sequenceIdAssignedLatch.countDown();
+    this.sequenceId = writeEntry.getWriteNumber();
   }
 
   // REMOVE!!!! No more Writables!!!!
@@ -208,7 +184,6 @@
    * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
    */
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
-  private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
   public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
 
   // visible for deprecated HLogKey
@@ -722,24 +697,4 @@
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
-
-  /**
-   * @return The preassigned writeEntry, if any
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
-    return this.preAssignedWriteEntry;
-  }
-
-  /**
-   * Preassign writeEntry
-   * @param writeEntry the entry to assign
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
-    if (writeEntry != null) {
-      this.preAssignedWriteEntry = writeEntry;
-      this.sequenceId = writeEntry.getWriteNumber();
-    }
-  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ba27622..3bdce5d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5895,11 +5895,8 @@ public class TestHRegion {
       @Override
       public Long answer(InvocationOnMock invocation) throws Throwable {
         WALKey key = invocation.getArgumentAt(1, WALKey.class);
-        MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-        if (we == null) {
-          we = key.getMvcc().begin();
-          key.setWriteEntry(we);
-        }
+        MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
+        key.setWriteEntry(we);
         return 1L;
       }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
new file mode 100644
index 0000000..edffa24
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Test for HBASE-17471.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestWALMonotonicallyIncreasingSeqId {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
+  private WALFactory wals;
+  private FileSystem fileSystem;
+  private Configuration walConf;
+
+  public static final String KEY_SEED = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  private static final int KEY_SEED_LEN = KEY_SEED.length();
+
+  private static final char[] KEY_SEED_CHARS = KEY_SEED.toCharArray();
+
+  private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      // Keep all cell versions.
+      hcd.setMaxVersions(Integer.MAX_VALUE);
+      htd.addFamily(hcd);
+    }
+    return htd;
+  }
+
+  private Region initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
+      throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.hregion.mvcc.preassign", false);
+    Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
+
+    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
+    fileSystem = tableDir.getFileSystem(conf);
+    HRegionFileSystem fs = new HRegionFileSystem(conf, fileSystem, tableDir, info);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, tableDir);
+    this.walConf = walConf;
+    wals = new WALFactory(walConf, null, "log_" + replicaId);
+    HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(),
+        info.getTable().getNamespace()), conf, htd, null);
+    region.initialize();
+    return region;
+  }
+
+  CountDownLatch latch = new CountDownLatch(1);
+  public class PutThread extends Thread {
+    HRegion region;
+    public PutThread(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 100; i++) {
+          byte[] row = Bytes.toBytes("putRow" + i);
+          Put put = new Put(row);
+          put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes(""));
+          //put.setDurability(Durability.ASYNC_WAL);
+          latch.await();
+          region.batchMutate(new Mutation[] { put });
+          Thread.sleep(10);
+        }
+
+
+      } catch (Throwable t) {
+        LOG.warn("Error happened when putting: ", t);
+      }
+
+    }
+  }
+
+  public class IncThread extends Thread {
+    HRegion region;
+    public IncThread(HRegion region) {
+      this.region = region;
+    }
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 100; i++) {
+          byte[] row = Bytes.toBytes("incrementRow" + i);
+          Increment inc = new Increment(row);
+          inc.addColumn("cf".getBytes(), Bytes.toBytes(0), 1);
+          //inc.setDurability(Durability.ASYNC_WAL);
+          region.increment(inc);
+          latch.countDown();
+          Thread.sleep(10);
+        }
+
+
+      } catch (Throwable t) {
+        LOG.warn("Error happened when incrementing: ", t);
+      }
+
+    }
+  }
+
+  @Test
+  public void testWALMonotonicallyIncreasingSeqId() throws Exception {
+    byte[][] families = new byte[][] { Bytes.toBytes("cf") };
+    byte[] qf = Bytes.toBytes("cq");
+    HTableDescriptor htd = getTableDesc(TableName.valueOf("TestWALMonotonicallyIncreasingSeqId"), families);
+    HRegion region = (HRegion) initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
+    List<PutThread> putThreads = new ArrayList<>();
+    for (int i = 0; i < 1; i++) {
+      putThreads.add(new PutThread(region));
+    }
+    IncThread incThread = new IncThread(region);
+    for (int i = 0; i < 1; i++) {
+      putThreads.get(i).start();
+    }
+    incThread.start();
+    incThread.join();
+
+    Path logPath = ((FSHLog) region.getWAL()).getCurrentFileName();
+    region.getWAL().rollWriter();
+    Thread.sleep(10);
+    Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    WAL.Reader reader = null;
+    try {
+      reader = wals.createReader(fileSystem, logPath);
+    } catch (Throwable t) {
+      reader = wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+
+    }
+    WAL.Entry e;
+    try {
+      long currentMaxSeqid = 0;
+      while ((e = reader.next()) != null) {
+        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
+          long currentSeqid = e.getKey().getSequenceId();
+          if (currentSeqid > currentMaxSeqid) {
+            currentMaxSeqid = currentSeqid;
+          } else {
+            Assert.fail("Current max Seqid is " + currentMaxSeqid
+                + ", but the next seqid in wal is smaller: " + currentSeqid);
+          }
+        }
+      }
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index b5c464e..7019684 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -407,7 +407,8 @@ public abstract class AbstractTestFSWAL {
       final HRegionInfo info = region.getRegionInfo();
       final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
           System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
-      wal.append(info, logkey, edits, true);
+      wal.append(info, logkey, edits, false);
+      //region.getMVCC().completeAndWait(logkey.getWriteEntry());
     }
     region.flush(true);
     // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 84bdc69..f7a88f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1190,7 +1190,7 @@ public abstract class AbstractTestWALReplay {
     FSWALEntry entry =
         new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
           rowName, family, ee, index), hri, true);
-    entry.stampRegionSequenceId();
+    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }
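
Taken together, the append path after this patch reduces to the following simplified flow, paraphrased from AbstractFSWAL.stampSequenceIdAndPublishToRingBuffer above (tracing, the null-mvcc test-only branch, and the closed-log check are omitted):

    MutableLong txidHolder = new MutableLong();
    // Claim the ring buffer slot inside mvcc.begin() so the mvcc write number
    // and the WAL txid are assigned in lock-step.
    MultiVersionConcurrencyControl.WriteEntry we =
        key.getMvcc().begin(() -> txidHolder.setValue(ringBuffer.next()));
    long txid = txidHolder.longValue();
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
      entry.stampRegionSequenceId(we); // stamps the cells and publishes the WriteEntry on the key
      ringBuffer.get(txid).load(entry, scope.detach());
    } finally {
      ringBuffer.publish(txid);
    }
    if (!inMemstore) {
      // Marker edits never reach the memstore; the WriteEntry was only needed for a sequence id.
      mvcc.completeAndWait(we);
    }
    return txid;

Because every entry now carries its sequence id before it is published, WALUtil.writeMarker no longer has to call mvcc.complete() itself, and FSHLog's ring buffer handler no longer needs to stamp failed appends to release a latch.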