From ca5a0c4b3e5267bccd3d13c90083ffcc94d946b3 Mon Sep 17 00:00:00 2001
From: Walter Koetke
Date: Tue, 29 Mar 2016 07:05:16 -0700
Subject: [PATCH] HBASE-15556: add extensible ConsistencyControl interface

Summary: Preserve the existing functionality of the
MultiVersionConsistencyControl class, but abstract it behind a pluggable
ConsistencyControl interface so that alternative implementations can be
selected through the new hbase.mvcc.impl configuration key.

Tests: existing unit tests
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   4 +-
 .../hbase/regionserver/ConsistencyControl.java     |  47 ++++++++
 .../regionserver/ConsistencyControlUtils.java      |  51 +++++++++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  62 ++++------
 .../MultiVersionConsistencyControl.java            | 110 +++++++++--------
 .../hadoop/hbase/regionserver/WriteEntry.java      |  26 +++++
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  21 ++--
 .../wal/HBaseWALBlockingWaitStrategy.java          |  50 ++++++++
 .../hbase/regionserver/TestDefaultMemStore.java    |   8 +-
 .../TestMultiVersionConsistencyControl.java        |   2 +-
 10 files changed, 274 insertions(+), 107 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HBaseWALBlockingWaitStrategy.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f1f3e1a..6ec624a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -578,7 +578,9 @@ public final class HConstants {
 
   public static final String REGION_IMPL = "hbase.hregion.impl";
 
-  /** modifyTable op for replacing the table descriptor */
+  public static final String MVCC_IMPL = "hbase.mvcc.impl";
+
+  /** modifyTable op for replacing the table descriptor */
   @InterfaceAudience.Private
   public static enum Modify {
     CLOSE_REGION,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java
new file mode 100644
index 0000000..8e44ac3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControl.java
@@ -0,0 +1,47 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Scan;
+import java.io.IOException;
+
+public interface ConsistencyControl {
+
+  void initialize(long startPoint);
+
+  void advanceMemstoreReadPointIfNeeded(long seqNumber);
+
+  WriteEntry beginMemstoreInsert();
+
+  WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum);
+
+  long getReadpoint(IsolationLevel isolationLevel);
+
+  long addScan(RegionScanner regionScanner, Scan scan);
+
+  void removeScan(RegionScanner regionScanner);
+
+  long getSmallestReadPoint();
+
+  void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
+      throws IOException;
+
+  void completeMemstoreInsert(WriteEntry e);
+
+  boolean advanceMemstore(WriteEntry e);
+
+  /**
+   * Wait for all previous MVCC transactions to complete, if required
+   */
+  void waitForPreviousTransactionsComplete();
+
+  /**
+   * Wait for all transactions up to the given entry to complete, if required
+   */
+  void waitForPreviousTransactionsComplete(WriteEntry waitedEntry);
+
+  /**
+   * Retrieve a readpoint for scans
+   */
+  long memstoreReadPoint();
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java
new file mode 100644
index 0000000..b39afa5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConsistencyControlUtils.java
@@ -0,0 +1,51 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.wal.WAL;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ConsistencyControlUtils {
+  /**
+   * Create the ConsistencyControl implementation named by the configuration
+   * @param config configuration; hbase.mvcc.impl selects the implementation class
+   * @return a new ConsistencyControl instance
+   * @throws IllegalArgumentException if the configured class cannot be instantiated
+   */
+  public static ConsistencyControl getConsistencyControl(Configuration config) throws IllegalArgumentException {
+    try {
+      @SuppressWarnings("unchecked")
+      Class clazz = config.getClass(HConstants.MVCC_IMPL, MultiVersionConsistencyControl.class);
+      return (ConsistencyControl) clazz.newInstance();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Consistency Control could not be created", e);
+    }
+  }
+
+  /**
+   * Get an mvcc write number before the actual one (its log sequence id) is assigned
+   * @param sequenceId the region's log sequence id counter
+   * @return long a faked write number which is big enough not to be seen by others before a real
+   * one is assigned
+   */
+  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
+    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
+    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
+    // because each handler could increment sequence num twice and max concurrent in-flight
+    // transactions is the number of RPC handlers.
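+    // (With the default of 30 RPC handlers, for example, that bound would only be
+    // 2 * 30 = 60, so the 1 billion bump leaves a very wide safety margin.)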
+    // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
+    // changes touch the same row key
+    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
+    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
+    return sequenceId.incrementAndGet() + 1000000000;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8aed3a6..fd5a75d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -132,7 +132,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -343,8 +342,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
 
-  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
-
   /**
    * The sequence ID that was encountered when this region was opened.
    */
@@ -383,21 +380,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * read operation.
    */
   public long getSmallestReadPoint() {
-    long minimumReadPoint;
-    // We need to ensure that while we are calculating the smallestReadPoint
-    // no new RegionScanners can grab a readPoint that we are unaware of.
-    // We achieve this by synchronizing on the scannerReadPoints object.
-    synchronized(scannerReadPoints) {
-      minimumReadPoint = mvcc.memstoreReadPoint();
-
-      for (Long readPoint: this.scannerReadPoints.values()) {
-        if (readPoint < minimumReadPoint) {
-          minimumReadPoint = readPoint;
-        }
-      }
-    }
-    return minimumReadPoint;
+    return mvcc.getSmallestReadPoint();
   }
+
   /*
    * Data structure of write state flags used coordinating flushes,
    * compactions and closes.
@@ -541,8 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   private boolean splitRequest;
   private byte[] explicitSplitPoint = null;
 
-  private final MultiVersionConsistencyControl mvcc =
-      new MultiVersionConsistencyControl();
+  private final ConsistencyControl mvcc;
 
   // Coprocessor host
   private RegionCoprocessorHost coprocessorHost;
@@ -608,7 +592,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     if (confParam instanceof CompoundConfiguration) {
       throw new IllegalArgumentException("Need original base configuration");
     }
-
+    mvcc = ConsistencyControlUtils.getConsistencyControl(confParam);
     this.comparator = fs.getRegionInfo().getComparator();
     this.wal = wal;
     this.fs = fs;
@@ -635,7 +619,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.rsServices = rsServices;
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     setHTableSpecificConf();
-    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
 
     this.busyWaitDuration = conf.getLong(
       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
@@ -1122,7 +1105,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
-  public MultiVersionConsistencyControl getMVCC() {
+  public ConsistencyControl getMVCC() {
     return mvcc;
   }
 
@@ -1130,11 +1113,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * Returns readpoint considering given IsolationLevel
    */
   public long getReadpoint(IsolationLevel isolationLevel) {
-    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
-      // This scan can read even uncommitted transactions
-      return Long.MAX_VALUE;
-    }
-    return mvcc.memstoreReadPoint();
+    return mvcc.getReadpoint(isolationLevel);
   }
 
   public boolean isLoadingCfsOnDemandDefault() {
@@ -1820,7 +1799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
       // to be at the border of the empty memstore.
-      MultiVersionConsistencyControl.WriteEntry w = null;
+      WriteEntry w = null;
       this.updatesLock.writeLock().lock();
       try {
         if (this.memstoreSize.get() <= 0) {
@@ -1861,7 +1840,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
-    MultiVersionConsistencyControl.WriteEntry w = null;
+    WriteEntry w = null;
 
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
@@ -2583,7 +2562,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
     WALEdit walEdit = new WALEdit(isInReplay);
-    MultiVersionConsistencyControl.WriteEntry w = null;
+    WriteEntry w = null;
     long txid = 0;
     boolean doRollBackMemstore = false;
     boolean locked = false;
@@ -2729,7 +2708,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       if(isInReplay) {
         mvccNum = batchOp.getReplaySequenceId();
       } else {
-        mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+        mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId);
       }
       //
       // ------------------------------------
@@ -4227,13 +4206,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // it is [startRow,endRow) and if startRow=endRow we get nothing.
       this.isScan = scan.isGetScan() ? -1 : 0;
 
-      // synchronize on scannerReadPoints so that nobody calculates
-      // getSmallestReadPoint, before scannerReadPoints is updated.
-      IsolationLevel isolationLevel = scan.getIsolationLevel();
-      synchronized(scannerReadPoints) {
-        this.readPt = getReadpoint(isolationLevel);
-        scannerReadPoints.put(this, this.readPt);
-      }
+      this.readPt = mvcc.addScan(this, scan);
 
       // Here we separate all scanners into two lists - scanner that provide data required
       // by the filter to operate (scanners list) and all others (joinedScanners list).
@@ -4574,8 +4547,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         joinedHeap.close();
         joinedHeap = null;
       }
-      // no need to synchronize here.
-      scannerReadPoints.remove(this);
+      mvcc.removeScan(this);
       this.filterClosed = true;
     }
 
@@ -5385,7 +5357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       return;
     }
 
-    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
+    WriteEntry writeEntry = null;
     boolean locked;
     boolean walSyncSuccessful = false;
     List<RowLock> acquiredRowLocks;
@@ -5406,7 +5378,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
     locked = true;
     // Get a mvcc write number
-    mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+    mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId);
 
     long now = EnvironmentEdgeManager.currentTime();
     try {
@@ -5612,7 +5584,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         }
       }
       // now start my own transaction
-      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+      mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId);
       w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
       long now = EnvironmentEdgeManager.currentTime();
       // Process each family
@@ -5856,7 +5828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         if (r != null) return r;
       }
       // Now start my own transaction
-      long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+      long mvccNum = ConsistencyControlUtils.getPreAssignedWriteNumber(this.sequenceId);
       writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
 
       // Process increments a Store/family at a time.
@@ -6092,7 +6064,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (12 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index 8b9f41b..10b22fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.LinkedHashSet;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -33,14 +35,16 @@ import org.apache.hadoop.hbase.util.ClassSize;
  * the new writes for readers to read (thus forming atomic transactions).
  */
 @InterfaceAudience.Private
-public class MultiVersionConsistencyControl {
+public class MultiVersionConsistencyControl implements ConsistencyControl {
   static final long NO_WRITE_NUMBER = 0;
   private volatile long memstoreRead = 0;
   private final Object readWaiters = new Object();
+  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints =
+      new ConcurrentHashMap<RegionScanner, Long>();
 
   // This is the pending queue of writes.
   private final LinkedHashSet<WriteEntry> writeQueue =
-      new LinkedHashSet<WriteEntry>();
+      new LinkedHashSet<WriteEntry>();
 
   /**
    * Default constructor. Initializes the memstoreRead/Write points to 0.
@@ -52,6 +56,7 @@ public class MultiVersionConsistencyControl {
    * Initializes the memstoreRead/Write points appropriately.
    * @param startPoint
    */
+  @Override
   public void initialize(long startPoint) {
     synchronized (writeQueue) {
       writeQueue.clear();
@@ -59,32 +64,11 @@ public class MultiVersionConsistencyControl {
     }
   }
 
-  /**
-   *
-   * @param initVal The value we used initially and expected it'll be reset later
-   * @return WriteEntry instance.
-   */
-  WriteEntry beginMemstoreInsert() {
+  @Override
+  public WriteEntry beginMemstoreInsert() {
     return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
   }
 
-  /**
-   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
-   * @param sequenceId
-   * @return long a faked write number which is bigger enough not to be seen by others before a real
-   * one is assigned
-   */
-  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
-    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
-    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
-    // because each handler could increment sequence num twice and max concurrent in-flight
-    // transactions is the number of RPC handlers.
-    // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
-    // changes touch same row key
-    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
-    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
-    return sequenceId.incrementAndGet() + 1000000000;
-  }
 
   /**
    * This function starts a MVCC transaction with current region's log change sequence number. Since
@@ -99,6 +83,7 @@ public class MultiVersionConsistencyControl {
    * @param curSeqNum
    * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
    */
+  @Override
   public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
     return beginMemstoreInsertWithSeqNum(curSeqNum, false);
   }
@@ -121,6 +106,7 @@ public class MultiVersionConsistencyControl {
    * visible to MVCC readers.
    * @throws IOException
    */
+  @Override
   public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
       throws IOException {
     if(e == null) return;
@@ -139,6 +125,7 @@ public class MultiVersionConsistencyControl {
    * end of this call, the global read point is at least as large as the write point of the passed
    * in WriteEntry. Thus, the write is visible to MVCC readers.
    */
+  @Override
   public void completeMemstoreInsert(WriteEntry e) {
     waitForPreviousTransactionsComplete(e);
   }
@@ -154,7 +141,8 @@ public class MultiVersionConsistencyControl {
    * @param e
    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
    */
-  boolean advanceMemstore(WriteEntry e) {
+  @Override
+  public boolean advanceMemstore(WriteEntry e) {
     long nextReadValue = -1;
     synchronized (writeQueue) {
       e.markCompleted();
@@ -194,7 +182,7 @@ public class MultiVersionConsistencyControl {
    * Advances the current read point to be given seqNum if it is smaller than
    * that.
    */
-  void advanceMemstoreReadPointIfNeeded(long seqNum) {
+  @Override
+  public void advanceMemstoreReadPointIfNeeded(long seqNum) {
     synchronized (writeQueue) {
       if (this.memstoreRead < seqNum) {
         memstoreRead = seqNum;
@@ -210,6 +199,50 @@ public class MultiVersionConsistencyControl {
     waitForPreviousTransactionsComplete(w);
   }
 
+  public long addScan(RegionScanner regionScanner, Scan scan) {
+    IsolationLevel isolationLevel = scan.getIsolationLevel();
+    synchronized(scannerReadPoints) {
+      long readPt = getReadpoint(isolationLevel);
+      scannerReadPoints.put(regionScanner, readPt);
+      return readPt;
+    }
+  }
+
+  public void removeScan(RegionScanner regionScanner) {
+    scannerReadPoints.remove(regionScanner);
+  }
+
+  public long getReadpoint(IsolationLevel isolationLevel) {
+    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
+      // This scan can read even uncommitted transactions
+      return Long.MAX_VALUE;
+    }
+    return memstoreReadPoint();
+  }
+
+
+  /**
+   * @return The smallest mvcc readPoint across all the scanners in this
+   * region. Writes older than this readPoint, are included in every
+   * read operation.
+   */
+  public long getSmallestReadPoint() {
+    long minimumReadPoint;
+    // We need to ensure that while we are calculating the smallestReadPoint
+    // no new RegionScanners can grab a readPoint that we are unaware of.
+    // We achieve this by synchronizing on the scannerReadPoints object.
+    synchronized(scannerReadPoints) {
+      minimumReadPoint = memstoreReadPoint();
+
+      for (Long readPoint: this.scannerReadPoints.values()) {
+        if (readPoint < minimumReadPoint) {
+          minimumReadPoint = readPoint;
+        }
+      }
+    }
+    return minimumReadPoint;
+  }
+
   public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
     boolean interrupted = false;
     WriteEntry w = waitedEntry;
@@ -253,27 +286,6 @@ public class MultiVersionConsistencyControl {
     return memstoreRead;
   }
 
-  public static class WriteEntry {
-    private long writeNumber;
-    private volatile boolean completed = false;
-
-    WriteEntry(long writeNumber) {
-      this.writeNumber = writeNumber;
-    }
-    void markCompleted() {
-      this.completed = true;
-    }
-    boolean isCompleted() {
-      return this.completed;
-    }
-    long getWriteNumber() {
-      return this.writeNumber;
-    }
-    void setWriteNumber(long val){
-      this.writeNumber = val;
-    }
-  }
-
   public static final long FIXED_SIZE = ClassSize.align(
       ClassSize.OBJECT +
       2 * Bytes.SIZEOF_LONG +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java
new file mode 100644
index 0000000..e7953dc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WriteEntry.java
@@ -0,0 +1,26 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+public class WriteEntry {
+  private long writeNumber;
+  private volatile boolean completed = false;
+
+  WriteEntry(long writeNumber) {
+    this.writeNumber = writeNumber;
+  }
+  void markCompleted() {
+    this.completed = true;
+  }
+  boolean isCompleted() {
+    return this.completed;
+  }
+  long getWriteNumber() {
+    return this.writeNumber;
+  }
+  void setWriteNumber(long val) {
+    this.writeNumber = val;
+  }
+
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 85de419..d737232 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.lmax.disruptor.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -569,7 +570,8 @@ public class FSHLog implements WAL {
     // spinning as other strategies do.
     this.disruptor =
       new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
-        this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
+        this.appendExecutor, ProducerType.MULTI,
+        new HBaseWALBlockingWaitStrategy(conf.getLong("hbase.wal.disruptor.wait.time", HBaseWALBlockingWaitStrategy.DEFAULT_NANO_SECOND_DELAY)));
     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
     // because SyncFuture.NOT_DONE = 0.
     this.disruptor.getRingBuffer().next();
@@ -1457,7 +1459,11 @@ public class FSHLog implements WAL {
   private void postSync(final long timeInNanos, final int handlerSyncs) {
     if (timeInNanos > this.slowSyncNs) {
       String msg =
-        new StringBuilder().append("Slow sync cost: ")
+        new StringBuilder().append("Slow sync cost (")
+          .append(handlerSyncs)
+          .append(",")
+          .append(this.getCurrentFileName())
+          .append("): ")
           .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
           .append(Arrays.toString(getPipeLine())).toString();
       Trace.addTimelineAnnotation(msg);
@@ -1831,14 +1837,15 @@ public class FSHLog implements WAL {
       // Appends and syncs are coming in order off the ringbuffer.  We depend on this fact.  We'll
       // add appends to dfsclient as they come in.  Batching appends doesn't give any significant
      // benefit on measurement.  Handler sync calls we will batch up.
-
       try {
         if (truck.hasSyncFuturePayload()) {
-         this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
+          this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
           // Force flush of syncs if we are carrying a full complement of syncFutures.
-          if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
+          if (this.syncFuturesCount == this.syncFutures.length) {
+            endOfBatch = true;
+          }
         } else if (truck.hasFSWALEntryPayload()) {
-         TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
+          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
           try {
             append(truck.unloadFSWALEntryPayload());
           } catch (Exception e) {
@@ -1867,7 +1874,7 @@ public class FSHLog implements WAL {
 
       // Now we have a batch.
 
-     if (LOG.isTraceEnabled()) {
+      if (LOG.isTraceEnabled()) {
         LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
       }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HBaseWALBlockingWaitStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HBaseWALBlockingWaitStrategy.java
new file mode 100644
index 0000000..fc437d3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HBaseWALBlockingWaitStrategy.java
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import com.lmax.disruptor.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A blocking wait strategy that batches up writes to limit the HBase impact on HDFS.
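+ * Before delegating to the stock BlockingWaitStrategy, the consumer thread parks for up to
+ * hbase.wal.disruptor.wait.time nanoseconds (2 ms by default), giving handler threads time to
+ * enqueue more sync futures so that each HDFS sync covers a larger batch of WAL edits.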
+ */
+public class HBaseWALBlockingWaitStrategy implements WaitStrategy {
+  static final Log LOG = LogFactory.getLog(HBaseWALBlockingWaitStrategy.class);
+  // 2 Millisecond Delay by default
+  public static final long DEFAULT_NANO_SECOND_DELAY = 1000 * 1000 * 2;
+  private final long nanoSecondDelay;
+  private WaitStrategy blockingWaitStrategy;
+  private long lastTimestamp;
+
+  public HBaseWALBlockingWaitStrategy() {
+    this(DEFAULT_NANO_SECOND_DELAY);
+  }
+
+  public HBaseWALBlockingWaitStrategy(long nanoSecondDelay) {
+    blockingWaitStrategy = new BlockingWaitStrategy();
+    this.nanoSecondDelay = nanoSecondDelay;
+  }
+
+  @Override
+  public long waitFor(final long sequence, Sequence cursorSequence, final Sequence dependentSequence, final SequenceBarrier barrier)
+      throws AlertException, InterruptedException, TimeoutException {
+    if (lastTimestamp == 0)
+      LockSupport.parkNanos(nanoSecondDelay);
+    else {
+      long nextDelay = nanoSecondDelay - (System.currentTimeMillis() - lastTimestamp) * 1000 * 1000;
+      if (nextDelay > 0)
+        LockSupport.parkNanos(nextDelay);
+    }
+    long availableSequence = blockingWaitStrategy.waitFor(sequence, cursorSequence, dependentSequence, barrier);
+    lastTimestamp = System.currentTimeMillis();
+    return availableSequence;
+  }
+
+  @Override
+  public void signalAllWhenBlocking() {
+    blockingWaitStrategy.signalAllWhenBlocking();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 1b0af3e..505af70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -239,7 +239,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] q2 = Bytes.toBytes("q2");
     final byte[] v = Bytes.toBytes("value");
 
-    MultiVersionConsistencyControl.WriteEntry w =
+    WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv1 = new KeyValue(row, f, q1, v);
@@ -283,7 +283,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] v2 = Bytes.toBytes("value2");
 
     // INSERT 1: Write both columns val1
-    MultiVersionConsistencyControl.WriteEntry w =
+    WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -335,7 +335,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] q2 = Bytes.toBytes("q2");
     final byte[] v1 = Bytes.toBytes("value1");
     // INSERT 1: Write both columns val1
-    MultiVersionConsistencyControl.WriteEntry w =
+    WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -409,7 +409,7 @@ public class TestDefaultMemStore extends TestCase {
 
     private void internalRun() throws IOException {
       for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
-        MultiVersionConsistencyControl.WriteEntry w =
+        WriteEntry w =
             mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
         // Insert the sequence value (i)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
index e876a94..81ad627 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
@@ -48,7 +48,7 @@ public class TestMultiVersionConsistencyControl extends TestCase {
       public void run() {
         AtomicLong startPoint = new AtomicLong();
         while (!finished.get()) {
-          MultiVersionConsistencyControl.WriteEntry e =
+          WriteEntry e =
               mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
           // System.out.println("Begin write: " + e.getWriteNumber());
           // 10 usec - 500usec (including 0)
-- 
2.3.2 (Apple Git-55)
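
Note for reviewers: a minimal usage sketch (not part of the patch) of how a
deployment selects a ConsistencyControl implementation through the new
hbase.mvcc.impl key. The MetricsConsistencyControl subclass below is
hypothetical; any ConsistencyControl implementation with a no-argument
constructor works, since ConsistencyControlUtils instantiates the configured
class reflectively.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.regionserver.ConsistencyControl;
  import org.apache.hadoop.hbase.regionserver.ConsistencyControlUtils;
  import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;

  public class MvccPluginExample {
    // Hypothetical extension point: inherits all MVCC behavior from the
    // default implementation; a real subclass could add metrics or logging.
    public static class MetricsConsistencyControl extends MultiVersionConsistencyControl {
    }

    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // When hbase.mvcc.impl is unset, getConsistencyControl() falls back to
      // MultiVersionConsistencyControl.
      conf.setClass(HConstants.MVCC_IMPL, MetricsConsistencyControl.class,
          ConsistencyControl.class);
      ConsistencyControl mvcc = ConsistencyControlUtils.getConsistencyControl(conf);
      System.out.println("Using " + mvcc.getClass().getName());
    }
  }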