.../hbase/regionserver/wal/AbstractFSWAL.java | 4 +- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 245 ++++++++++++++++----- .../hadoop/hbase/regionserver/wal/FSHLog.java | 6 +- .../hadoop/hbase/regionserver/wal/SyncFuture.java | 23 +- .../hbase/regionserver/wal/TestSyncFuture.java | 6 +- 5 files changed, 223 insertions(+), 61 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 0ef0cf7..feb5a59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -828,10 +828,10 @@ public abstract class AbstractFSWAL implements WAL { protected SyncFuture getSyncFuture(final long sequence, Span span) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { - syncFuture = new SyncFuture(sequence, span); + syncFuture = new SyncFuture(span); this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); } else { - syncFuture.reset(sequence, span); + syncFuture.reset(span); } return syncFuture; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 279a6ae..14e7a47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -21,6 +21,13 @@ import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; @@ -50,12 +57,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog.RingBufferExceptionHandler; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.htrace.NullScope; +import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -150,37 +160,69 @@ public class AsyncFSWAL extends AbstractFSWAL { */ private static final class Payload { + public Payload() { + } // a wal entry which need to be appended - public final FSWALEntry entry; + private EntryHolder entryHolder; + + private FSWALEntry fsWALEntry; // indicate that we need to sync our wal writer. - public final SyncFuture sync; + private SyncFuture sync; // incidate that we want to roll the writer. - public final Promise roll; + private Promise roll; + + private void createFSWALEntry(long txid) { + assert entryHolder != null; + this.fsWALEntry = + new FSWALEntry(txid, this.entryHolder.getWALKey(), this.entryHolder.getWALEdits(), + this.entryHolder.getHRegionInfo(), this.entryHolder.isInMemstore()); + this.sync = null; + this.roll = null; + this.entryHolder = null; + } - public Payload(FSWALEntry entry) { - this.entry = entry; + private void loadEntryHolder(EntryHolder entry) { + this.entryHolder = entry; this.sync = null; this.roll = null; + this.fsWALEntry = null; } - public Payload(SyncFuture sync) { - this.entry = null; + private void loadSync(SyncFuture sync) { + this.entryHolder = null; this.sync = sync; this.roll = null; + this.fsWALEntry = null; } - public Payload(Promise roll) { - this.entry = null; + private Payload(Promise roll) { + this.entryHolder = null; this.sync = null; this.roll = roll; + this.fsWALEntry = null; + } + + private void updateSyncFuture(long txid) { + if (this.sync != null) { + this.sync.setTxid(txid); + } } @Override public String toString() { - return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]"; + return "Payload [entry=" + entryHolder + ", sync=" + sync + ", roll=" + roll +"]"; } + + /** + * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. + */ + final static EventFactory EVENT_FACTORY = new EventFactory() { + public Payload newInstance() { + return new Payload(); + } + }; } private final EventLoop eventLoop; @@ -259,9 +301,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } synchronized (waitingConsumePayloads) { for (Payload p : waitingConsumePayloads) { - if (p.entry != null) { + if (p.fsWALEntry != null) { try { - p.entry.stampRegionSequenceId(); + p.fsWALEntry.stampRegionSequenceId(); } catch (IOException e) { throw new AssertionError("should not happen", e); } @@ -281,6 +323,11 @@ public class AsyncFSWAL extends AbstractFSWAL { private LogRollerExitedChecker logRollerExitedChecker; + private final Disruptor disruptor; + private final ExecutorService appendExecutor; + + private final RingBufferEventHandler ringBufferEventHandler; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoop eventLoop) @@ -289,12 +336,28 @@ public class AsyncFSWAL extends AbstractFSWAL { this.eventLoop = eventLoop; int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); waitingConsumePayloads = new ArrayDeque(maxHandlersCount * 3); + // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense + rollWriter(); + // spinning as other strategies do. + String hostingThreadName = Thread.currentThread().getName(); + this.appendExecutor = Executors + .newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); + final int preallocatedEventCount = + this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + // TODO : Unify with FSHLog + this.disruptor = new Disruptor(Payload.EVENT_FACTORY, preallocatedEventCount, + this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); + this.disruptor.getRingBuffer().next(); + this.ringBufferEventHandler = new RingBufferEventHandler(); + this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); + this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); + // Starting up threads in constructor is a no no; Interface should have an init call. + this.disruptor.start(); batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); - rollWriter(); } private void tryFinishRoll() { @@ -438,6 +501,7 @@ public class AsyncFSWAL extends AbstractFSWAL { if (newHighestProcessedTxid > 0) { highestProcessedTxid = newHighestProcessedTxid; } else { + // which means the append did not happen newHighestProcessedTxid = highestProcessedTxid; } if (writer.getLength() - fileLengthAtLastSync >= batchSize) { @@ -483,8 +547,8 @@ public class AsyncFSWAL extends AbstractFSWAL { } } else { for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) { - if (p.entry != null) { - waitingAppendEntries.addLast(p.entry); + if (p.fsWALEntry != null) { + waitingAppendEntries.addLast(p.fsWALEntry); } else if (p.sync != null) { syncFutures.add(p.sync); } else { @@ -528,38 +592,57 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - boolean scheduleTask; - long txid; - synchronized (waitingConsumePayloads) { - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - txid = nextTxid++; - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.add(new Payload(entry)); + long sequence = this.disruptor.getRingBuffer().next(); + EntryHolder entry; + try { + // Use the ring buffer sequence to publish the events. But do not use them as the + // WALEntry's txid. So we use the ring buffer to release the handlers faster + Payload truck = this.disruptor.getRingBuffer().get(sequence); + entry = new EntryHolder(hri, key, edits, inMemstore); + truck.loadEntryHolder(entry); + } finally { + this.disruptor.getRingBuffer().publish(sequence); } - if (scheduleTask) { - eventLoop.execute(consumer); + return sequence; + } + + // Holds the necessary states to form the FSWALEntry + // We do this because in AsyncFSWAL as we don't use the Ring buffer's sequence + private static class EntryHolder { + private WALKey walKey; + private WALEdit edits; + private boolean inMemstore; + private HRegionInfo hri; + + public EntryHolder(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) { + this.walKey = key; + this.edits = edits; + this.inMemstore = inMemstore; + this.hri = hri; + } + + private HRegionInfo getHRegionInfo() { + return this.hri; + } + + private WALKey getWALKey() { + return this.walKey; + } + + private WALEdit getWALEdits() { + return this.edits; + } + + private boolean isInMemstore() { + return this.inMemstore; } - return txid; } @Override public void sync() throws IOException { - TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); + TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - SyncFuture future; - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - future = getSyncFuture(nextTxid - 1, scope.detach()); - waitingConsumePayloads.addLast(new Payload(future)); - } - if (scheduleTask) { - eventLoop.execute(consumer); - } - scope = Trace.continueSpan(blockOnSync(future)); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); @@ -567,25 +650,68 @@ public class AsyncFSWAL extends AbstractFSWAL { } @Override - public void sync(long txid) throws IOException { - if (highestSyncedTxid.get() >= txid) { - return; + public void sync(long txid) throws IOException { + TraceScope scope = Trace.startSpan("FSHLog.sync"); + try { + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); } - TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); + } + + private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { + return blockOnSync(publishSyncOnRingBuffer(span)); + } + + private SyncFuture publishSyncOnRingBuffer(Span span) { + long sequence = this.disruptor.getRingBuffer().next(); + return publishSyncOnRingBuffer(sequence, span); + } + + private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) { + // here we use ring buffer sequence as transaction id + SyncFuture syncFuture = getSyncFuture(sequence, span); try { - SyncFuture future = getSyncFuture(txid, scope.detach()); - boolean scheduleTask; + Payload truck = this.disruptor.getRingBuffer().get(sequence); + truck.loadSync(syncFuture); + } finally { + this.disruptor.getRingBuffer().publish(sequence); + } + return syncFuture; + } + + class RingBufferEventHandler implements EventHandler, LifecycleAware { + + @Override + public void onShutdown() { + } + + @Override + public void onStart() { + boolean scheduleTask = shouldScheduleConsumer(); + if (scheduleTask) { + eventLoop.execute(consumer); + } + } + + @Override + public void onEvent(Payload payload, final long sequence, boolean endOfBatch) throws Exception { + // hand over to the event loop + boolean scheduleTask = shouldScheduleConsumer(); synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.addLast(new Payload(future)); + if (payload.entryHolder != null) { + // increment the txid for append calls + long txid = nextTxid++; + payload.createFSWALEntry(txid); + } else { + payload.updateSyncFuture(nextTxid - 1); + } + waitingConsumePayloads.addLast(payload); } if (scheduleTask) { eventLoop.execute(consumer); } - scope = Trace.continueSpan(blockOnSync(future)); - } finally { - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); } } @@ -702,6 +828,21 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected void doShutdown() throws IOException { waitForSafePoint(); + if (this.disruptor != null) { + long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); + try { + this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); + this.disruptor.halt(); + this.disruptor.shutdown(); + } + } + // With disruptor down, this is safe to let go. + if (this.appendExecutor != null) { + this.appendExecutor.shutdown(); + } this.writer.close(); this.writer = null; closeExecutor.shutdown(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index edf698e..7b64abb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1019,7 +1019,11 @@ public class FSHLog extends AbstractFSWAL { try { if (truck.hasSyncFuturePayload()) { - this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload(); + SyncFuture syncFuture = truck.unloadSyncFuturePayload(); + if (syncFuture != null) { + syncFuture.setTxid(sequence); + } + this.syncFutures[this.syncFuturesCount++] = syncFuture; // Force flush of syncs if we are carrying a full complement of syncFutures. if (this.syncFuturesCount == this.syncFutures.length) { endOfBatch = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 6e302a3..5f63ee6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -50,6 +50,8 @@ class SyncFuture { // events happen. private static final long NOT_DONE = -1L; + private static final long NOT_INITIALIZED = -1L; + /** * The transaction id of this operation, monotonically increases. */ @@ -73,9 +75,9 @@ class SyncFuture { */ private Span span; - SyncFuture(long txid, Span span) { + SyncFuture(Span span) { this.t = Thread.currentThread(); - this.txid = txid; + this.txid = NOT_INITIALIZED; this.span = span; this.doneTxid = NOT_DONE; } @@ -87,7 +89,7 @@ class SyncFuture { * call to {@link #get()}. * @return this */ - synchronized SyncFuture reset(final long txid, Span span) { + synchronized SyncFuture reset(Span span) { if (t != null && t != Thread.currentThread()) { throw new IllegalStateException(); } @@ -96,12 +98,25 @@ class SyncFuture { throw new IllegalStateException("" + txid + " " + Thread.currentThread()); } this.doneTxid = NOT_DONE; - this.txid = txid; + this.txid = NOT_INITIALIZED; this.span = span; this.throwable = null; return this; } + /** + * Updates the txid. + * @param txid + * @return + */ + synchronized SyncFuture setTxid(final long txid) { + if (isDone() || this.txid != NOT_INITIALIZED) { + throw new IllegalStateException("" + txid + " " + Thread.currentThread()); + } + this.txid = txid; + return this; + } + @Override public synchronized String toString() { return "done=" + isDone() + ", txid=" + this.txid; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java index 2cba040..d6f8f16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFuture.java @@ -33,11 +33,13 @@ public class TestSyncFuture { public void testGet() throws Exception { long timeout = 5000; long txid = 100000; - SyncFuture syncFulture = new SyncFuture(txid, null); + SyncFuture syncFulture = new SyncFuture(null); + syncFulture.setTxid(txid); syncFulture.done(txid, null); assertEquals(txid, syncFulture.get(timeout)); - syncFulture.reset(txid, null); + syncFulture.reset(null); + syncFulture.setTxid(txid); try { syncFulture.get(timeout); fail("Should have timed out but not");