diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f35d788..0badf12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -645,8 +645,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
       Bytes.BYTES_COMPARATOR);
   // flag and lock for MVCC preassign
-  private final boolean mvccPreAssign;
-  private final ReentrantLock preAssignMvccLock;
+  //private final boolean mvccPreAssign;
+  //private final ReentrantLock preAssignMvccLock;
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -808,12 +808,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
 
     // get mvcc pre-assign flag and lock
-    this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
+    /*this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
     if (this.mvccPreAssign) {
       this.preAssignMvccLock = new ReentrantLock();
     } else {
       this.preAssignMvccLock = null;
-    }
+    }*/
   }
 
   void setHTableSpecificConf() {
@@ -3351,26 +3351,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } else {
       try {
         if (!walEdit.isEmpty()) {
-          try {
-            if (this.mvccPreAssign) {
-              preAssignMvccLock.lock();
-              writeEntry = mvcc.begin();
-            }
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-                mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
-                this.getReplicationScope());
-            if (this.mvccPreAssign) {
-              walKey.setPreAssignedWriteEntry(writeEntry);
-            }
-            // TODO: Use the doAppend methods below... complicated by the replay stuff above.
-            txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-          } finally {
-            if (mvccPreAssign) {
-              preAssignMvccLock.unlock();
-            }
-          }
+          // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+          walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID,
+              now, mutation.getClusterIds(), currentNonceGroup, currentNonce,
+              mvcc, this.getReplicationScope());
+          // TODO: Use the doAppend methods below... complicated by the replay stuff above.
+          txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
           if (txid != 0) {
             sync(txid, durability);
           }
@@ -3398,15 +3385,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
         continue;
       }
-      // We need to update the sequence id for following reasons.
-      // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
-      // 2) If no WAL, FSWALEntry won't be used
-      // we use durability of the original mutation for the mutation passed by CP.
-      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
-      if (updateSeqId) {
-        this.updateSequenceId(familyMaps[i].values(),
-            replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
-      }
+      // HBASE-17471: seqid stamping was moved out of the ring buffer consumer thread to
+      // increase throughput, so we need to stamp the seqid here unconditionally.
+      this.updateSequenceId(familyMaps[i].values(), replay ?
+          batchOp.getReplaySequenceId() :
+          writeEntry.getWriteNumber());
+
       applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
     }
 
@@ -7100,11 +7084,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       rewriteCellTags(m.getFamilyCellMap(), m);
       for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
         Cell cell = cellScanner.current();
-        if (walEdit.isEmpty()) {
-          // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
-          // If no WAL, need to stamp it here.
+        // HBASE-17471: seqid stamping was moved out of the ring buffer consumer thread to
+        // increase throughput, so we need to stamp the seqid here unconditionally.
           CellUtil.setSequenceId(cell, sequenceId);
-        }
         applyToMemstore(getHStore(cell), cell, memstoreSize);
       }
     }
@@ -7293,8 +7275,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // transaction.
       recordMutationWithoutWal(mutation.getFamilyCellMap());
       writeEntry = mvcc.begin();
-      updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
     }
+    // HBASE-17471: seqid stamping was moved out of the ring buffer consumer thread to
+    // increase throughput, so we need to stamp the seqid here unconditionally.
+    updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
     // Now write to MemStore. Do it a column family at a time.
     for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
       applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 7424e4e..df18eb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -129,6 +129,28 @@ public class MultiVersionConcurrencyControl {
   }
 
   /**
+   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
+   * to our queue of ongoing writes. Return this WriteEntry instance.
+   * To complete the write transaction and wait for it to be visible, call
+   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
+   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
+   * transaction.
+   * @param runnable a Runnable invoked while the writeQueue lock is held; everything inside
+   *          the Runnable is therefore serialized with other writeQueue operations
+   * @see #complete(WriteEntry)
+   * @see #completeAndWait(WriteEntry)
+   */
+  public WriteEntry begin(Runnable runnable) {
+    synchronized (writeQueue) {
+      long nextWriteNumber = writePoint.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      runnable.run();
+      return e;
+    }
+  }
+
+  /**
    * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
    * to complete.
    */
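For illustration only (not part of the patch): a minimal sketch of how a caller is expected to drive the new begin(Runnable) overload. The Runnable executes while the writeQueue lock is held, so the mvcc write number and whatever is allocated inside the Runnable (here, a stand-in counter for the disruptor ring buffer slot) are always paired atomically. The class and variable names below are hypothetical.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;

public class MvccBeginSketch {
  public static void main(String[] args) {
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    // Stand-in for this.disruptor.getRingBuffer().next() in FSHLog.append().
    AtomicLong ringBufferSlot = new AtomicLong();
    // Both numbers are allocated under the writeQueue lock, so no other thread
    // can interleave between taking the write number and taking the slot.
    WriteEntry writeEntry = mvcc.begin(() -> ringBufferSlot.incrementAndGet());
    try {
      System.out.println("write number = " + writeEntry.getWriteNumber()
          + ", ring buffer slot = " + ringBufferSlot.get());
    } finally {
      // Make the write visible to readers (call complete(writeEntry) instead on failure).
      mvcc.completeAndWait(writeEntry);
    }
  }
}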
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 316e2f6..294c233 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -876,7 +876,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       // We are about to append this edit; update the region-scoped sequence number. Do it
       // here inside this single appending/writing thread. Events are ordered on the ringbuffer
       // so region sequenceids will also be in order.
-      regionSequenceId = entry.stampRegionSequenceId();
+      // HBASE-17471: the seqid has already been stamped in wal.append()
+      //regionSequenceId = entry.stampRegionSequenceId();
       // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
       // region sequence id only, a region edit/sequence id that is not associated with an actual
       // edit. It has to go through all the rigmarole to be sure we have the right ordering.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 34103dd..17548d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -448,18 +450,32 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     // all this to make a key and then below to append the edit, we need to carry htd, info,
     // etc. all over the ring buffer.
     FSWALEntry entry = null;
-    long sequence = this.disruptor.getRingBuffer().next();
+    AtomicLong sequence = new AtomicLong(MultiVersionConcurrencyControl.NONE);
+    // for special cases using Mockito any(), key will be null; it's OK to return here
+    if (key == null) {
+      return sequence.get();
+    }
+    MultiVersionConcurrencyControl mvcc = key.getMvcc();
+    if (mvcc == null) {
+      sequence.set(this.disruptor.getRingBuffer().next());
+      // since the seqid is generated from mvcc, this case should only happen in unit tests
+    } else {
+      MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin(() -> {
+        sequence.set(this.disruptor.getRingBuffer().next());
+      });
+      key.setPreAssignedWriteEntry(writeEntry);
+    }
     try {
-      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
+      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence.get());
       // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
       // edit with its edit/sequence id.
       // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
+      entry = new FSWALEntry(sequence.get(), key, edits, hri, inMemstore);
       truck.load(entry, scope.detach());
     } finally {
-      this.disruptor.getRingBuffer().publish(sequence);
+      this.disruptor.getRingBuffer().publish(sequence.get());
     }
-    return sequence;
+    return sequence.get();
   }
 
   /**
@@ -1014,7 +1030,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         // We need to do this to close the latch held down deep in WALKey...that is waiting
         // on sequenceid assignment otherwise it will just hang out (The #append method
         // called below does this also internally).
-        entry.stampRegionSequenceId();
+        // HBASE-17471: the seqid has already been stamped in wal.append()
+        //entry.stampRegionSequenceId();
         // Return to keep processing events coming off the ringbuffer
         return;
       }
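For illustration only (not part of the patch): why FSHLog.append() now takes the ring buffer slot inside mvcc.begin(Runnable). When the write number and the slot are allocated independently, a thread can be preempted between the two, pairing a larger write number with a smaller slot; since the consumer drains the ring buffer in slot order, seqids would then appear out of order in the WAL. All names below are hypothetical stand-ins.

import java.util.concurrent.atomic.AtomicLong;

public class SeqIdPairingSketch {
  private static final AtomicLong writePoint = new AtomicLong(); // stands in for mvcc's writePoint
  private static final AtomicLong ringCursor = new AtomicLong(); // stands in for the ring buffer cursor
  private static final Object lock = new Object();

  // Old shape: the two allocations can interleave across threads, so thread A may
  // get (writeNumber=2, slot=1) while thread B gets (writeNumber=1, slot=2).
  static long[] allocateRacy() {
    long writeNumber = writePoint.incrementAndGet();
    // <-- another thread can allocate both numbers here
    long slot = ringCursor.incrementAndGet();
    return new long[] { writeNumber, slot };
  }

  // New shape, mirroring mvcc.begin(Runnable): both numbers are taken under one
  // lock, so slot order always matches write-number order.
  static long[] allocateAtomically() {
    synchronized (lock) {
      return new long[] { writePoint.incrementAndGet(), ringCursor.incrementAndGet() };
    }
  }
}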
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; + +public class TestWALMonotonicallyIncreasingSeqId { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId"); + private WALFactory wals; + private FileSystem fileSystem; + private Configuration walConf; + + public static final String KEY_SEED = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + private static final int KEY_SEED_LEN = KEY_SEED.length(); + + private static final char[] KEY_SEED_CHARS = KEY_SEED.toCharArray(); + + private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) { + HTableDescriptor htd = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family); + // Set default to be three versions. + hcd.setMaxVersions(Integer.MAX_VALUE); + htd.addFamily(hcd); + } + return htd; + } + + private Region initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) + throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("hbase.hregion.mvcc.preassign", false); + Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName()); + + HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId); + fileSystem = tableDir.getFileSystem(conf); + HRegionFileSystem fs = new HRegionFileSystem(conf, fileSystem, tableDir, info); + final Configuration walConf = new Configuration(conf); + FSUtils.setRootDir(walConf, tableDir); + this.walConf = walConf; + wals = new WALFactory(walConf, null, "log_" + replicaId); + HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), + info.getTable().getNamespace()), conf, htd, null); + region.initialize(); + return region; + } + + CountDownLatch latch = new CountDownLatch(1); + public class PutThread extends Thread { + HRegion region; + public PutThread(HRegion region) { + this.region = region; + } + + @Override + public void run() { + try { + for(int i = 0; i < 100; i++) { + byte[] row = Bytes.toBytes("putRow" + i); + Put put = new Put(row); + put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes("")); + //put.setDurability(Durability.ASYNC_WAL); + latch.await(); + region.batchMutate(new Mutation[]{put}); + Thread.sleep(10); + } + + + } catch (Throwable t) { + LOG.warn("Error happend when Increment: ", t); + } + + } + } + + public class IncThread extends Thread { + HRegion region; + public IncThread(HRegion region) { + this.region = region; + } + @Override + public void run() { + try { + for(int i = 0; i < 100; i++) { + byte[] row = 
Bytes.toBytes("incrementRow" + i); + Increment inc = new Increment(row); + inc.addColumn("cf".getBytes(), Bytes.toBytes(0), 1); + //inc.setDurability(Durability.ASYNC_WAL); + region.increment(inc); + latch.countDown(); + Thread.sleep(10); + } + + + } catch (Throwable t) { + LOG.warn("Error happend when Put: ", t); + } + + } + } + + @Test + public void TestWALMonotonicallyIncreasingSeqId() throws Exception { + byte[][] families = new byte[][] {Bytes.toBytes("cf")}; + byte[] qf = Bytes.toBytes("cq"); + HTableDescriptor htd = getTableDesc(TableName.valueOf("TestWALMonotonicallyIncreasingSeqId"), families); + HRegion region = (HRegion)initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0); + List putThreads = new ArrayList<>(); + for(int i = 0; i < 1; i++) { + putThreads.add(new PutThread(region)); + } + IncThread incThread = new IncThread(region); + for(int i = 0; i < 1; i++) { + putThreads.get(i).start(); + } + incThread.start(); + incThread.join(); + + Path logPath = ((FSHLog) region.getWAL()).getCurrentFileName(); + region.getWAL().rollWriter(); + Thread.sleep(10); + Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR)); + Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); + WAL.Reader reader = null; + try { + reader = wals.createReader(fileSystem, logPath); + } catch (Throwable t) { + reader = wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName())); + + } + WAL.Entry e; + try { + long currentMaxSeqid = 0; + while ((e = reader.next()) != null) { + if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { + long currentSeqid = e.getKey().getSequenceId(); + if(currentSeqid > currentMaxSeqid) { + currentMaxSeqid = currentSeqid; + } else { + Assert.fail("Current max Seqid is " + currentMaxSeqid + + ", but the next seqid in wal is smaller:" + currentSeqid); + } + } + } + } finally { + if(reader != null) { + reader.close(); + } + } + } + + +}