diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4a8f55c..b0720af 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1315,6 +1315,10 @@ public final class HConstants { public static final String DEFAULT_TEMPORARY_HDFS_DIRECTORY = "/user/" + System.getProperty("user.name") + "/hbase-staging"; + /** Config key for using mvcc pre-assign feature for put */ + public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign"; + public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index debaec9..bee49ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -592,6 +593,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // that has non-default scope private final NavigableMap replicationScope = new TreeMap( Bytes.BYTES_COMPARATOR); + // flag and lock for MVCC preassign + private final boolean mvccPreAssign; + private final ReentrantLock preAssignMvccLock; /** * HRegion constructor. This constructor should only be used for testing and @@ -751,6 +755,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + + // get mvcc pre-assign flag and lock + this.mvccPreAssign = conf.getBoolean(HConstants.HREGION_MVCC_PRE_ASSIGN, + HConstants.DEFAULT_HREGION_MVCC_PRE_ASSIGN); + if (this.mvccPreAssign) { + this.preAssignMvccLock = new ReentrantLock(); + } else { + this.preAssignMvccLock = null; + } } void setHTableSpecificConf() { @@ -3225,36 +3238,61 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 4. Append the final edit to WAL and sync. Mutation mutation = batchOp.getMutation(firstIndex); WALKey walKey = null; + long txid; if (replay) { // use wal key from the original walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); - } - // Not sure what is going on here when replay is going on... does the below append get - // called for replayed edits? Am afraid to change it without test. - if (!walEdit.isEmpty()) { - if (!replay) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - } - // TODO: Use the doAppend methods below... complicated by the replay stuff above. + if (!walEdit.isEmpty()) { + txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + if (txid != 0) { + sync(txid, durability); + } + } + } else { try { - long txid = this.wal.append(this.getRegionInfo(), walKey, - walEdit, true); - if (txid != 0) sync(txid, durability); - writeEntry = walKey.getWriteEntry(); + if (!walEdit.isEmpty()) { + try { + if (this.mvccPreAssign) { + preAssignMvccLock.lock(); + writeEntry = mvcc.begin(); + } + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, + this.getReplicationScope()); + if (this.mvccPreAssign) { + walKey.setPreAssignedWriteEntry(writeEntry); + } + // TODO: Use the doAppend methods below... complicated by the replay stuff above. + txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + } finally { + if (mvccPreAssign) { + preAssignMvccLock.unlock(); + } + } + if (txid != 0) { + sync(txid, durability); + } + if (writeEntry == null) { + // if MVCC not preassigned, wait here until assigned + writeEntry = walKey.getWriteEntry(); + } + } } catch (IOException ioe) { - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + if (walKey != null && writeEntry == null) { + // the writeEntry is not preassigned and error occurred during append or sync + mvcc.complete(walKey.getWriteEntry()); + } throw ioe; } } if (walKey == null) { - // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid. + // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction + // to get sequence id. writeEntry = mvcc.begin(); } @@ -3270,7 +3308,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 6. Complete mvcc. if (replay) { this.mvcc.advanceTo(batchOp.getReplaySequenceId()); - } else if (writeEntry != null/*Can be null if in replay mode*/) { + } else { + // writeEntry won't be empty if not in replay mode + assert writeEntry != null; mvcc.completeAndWait(writeEntry); writeEntry = null; } @@ -7589,9 +7629,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + - 5 * Bytes.SIZEOF_BOOLEAN); + 6 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: // 1 x HashMap - coprocessorServiceHandlers diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 72474a0..c4546f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -112,11 +112,16 @@ class FSWALEntry extends Entry { } stamped = true; long regionSequenceId = WALKey.NO_SEQUENCE_ID; - MultiVersionConcurrencyControl mvcc = getKey().getMvcc(); - MultiVersionConcurrencyControl.WriteEntry we = null; - - if (mvcc != null) { - we = mvcc.begin(); + WALKey key = getKey(); + MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); + boolean preAssigned = (we != null); + if (!preAssigned) { + MultiVersionConcurrencyControl mvcc = key.getMvcc(); + if (mvcc != null) { + we = mvcc.begin(); + } + } + if (we != null) { regionSequenceId = we.getWriteNumber(); } @@ -125,7 +130,9 @@ class FSWALEntry extends Entry { CellUtil.setSequenceId(c, regionSequenceId); } } - getKey().setWriteEntry(we); + if (!preAssigned) { + key.setWriteEntry(we); + } return regionSequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 86fdfbd..ce6369e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.SequenceId; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; @@ -91,6 +92,10 @@ public class WALKey implements SequenceId, Comparable { */ @InterfaceAudience.Private // For internal use only. public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { + if (this.preAssignedWriteEntry != null) { + // don't wait for seqNumAssignedLatch if writeEntry is preassigned + return this.preAssignedWriteEntry; + } try { this.sequenceIdAssignedLatch.await(); } catch (InterruptedException ie) { @@ -202,6 +207,7 @@ public class WALKey implements SequenceId, Comparable { * Set in a way visible to multiple threads; e.g. synchronized getter/setters. */ private MultiVersionConcurrencyControl.WriteEntry writeEntry; + private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null; public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); // visible for deprecated HLogKey @@ -730,4 +736,24 @@ public class WALKey implements SequenceId, Comparable { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } + + /** + * @return The preassigned writeEntry, if any + */ + @InterfaceAudience.Private // For internal use only. + public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() { + return this.preAssignedWriteEntry; + } + + /** + * Preassign writeEntry + * @param writeEntry the entry to assign + */ + @InterfaceAudience.Private // For internal use only. + public void setPreAssignedWriteEntry(WriteEntry writeEntry) { + if (writeEntry != null) { + this.preAssignedWriteEntry = writeEntry; + this.sequenceId = writeEntry.getWriteNumber(); + } + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2042f52..c2f16bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -6223,8 +6223,11 @@ public class TestHRegion { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKey key = invocation.getArgumentAt(1, WALKey.class); - MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); - key.setWriteEntry(we); + MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); + if (we == null) { + we = key.getMvcc().begin(); + key.setWriteEntry(we); + } return 1L; }