.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 254 ++++++++++++++++----- .../hadoop/hbase/regionserver/wal/SyncFuture.java | 18 ++ 2 files changed, 220 insertions(+), 52 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 279a6ae..0a65e94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -21,6 +21,13 @@ import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; @@ -50,12 +57,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog.RingBufferExceptionHandler; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.htrace.NullScope; +import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -150,37 +160,79 @@ public class AsyncFSWAL extends AbstractFSWAL { */ private static final class Payload { + public Payload() { + } // a wal entry which need to be appended - public final FSWALEntry entry; + private EntryHolder entryHolder; + + private FSWALEntry fsWALEntry; // indicate that we need to sync our wal writer. - public final SyncFuture sync; + private SyncFuture sync; // incidate that we want to roll the writer. - public final Promise roll; + private Promise roll; + + private long txid = -1L; + + private void createFSWALEntry(long txid) { + assert entryHolder != null; + this.fsWALEntry = + new FSWALEntry(txid, this.entryHolder.getWALKey(), this.entryHolder.getWALEdits(), + this.entryHolder.getHRegionInfo(), this.entryHolder.isInMemstore()); + this.sync = null; + this.roll = null; + this.entryHolder = null; + } - public Payload(FSWALEntry entry) { - this.entry = entry; + private void loadEntryHolder(EntryHolder entry) { + this.entryHolder = entry; this.sync = null; this.roll = null; + this.fsWALEntry = null; } - public Payload(SyncFuture sync) { - this.entry = null; + private void loadSync(SyncFuture sync) { + this.entryHolder = null; this.sync = sync; this.roll = null; + this.fsWALEntry = null; } - public Payload(Promise roll) { - this.entry = null; + private Payload(Promise roll) { + this.entryHolder = null; this.sync = null; this.roll = roll; + this.fsWALEntry = null; + } + //TODO : Need to handle log roll + public void loadRoll(Promise roll) { + this.entryHolder = null; + this.sync = null; + this.roll = roll; + this.fsWALEntry = null; + } + + public void setTxid(long txid) { + if(this.sync != null) { + this.sync.setTxid(txid); + } + this.txid = txid; } - @Override public String toString() { - return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]"; + return "Payload [entry=" + entryHolder + ", sync=" + sync + ", roll=" + roll + ", txid= "+txid+"]"; } + + /** + * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. + */ + final static EventFactory EVENT_FACTORY = new EventFactory() { + public Payload newInstance() { + // Getting created every time?? + return new Payload(); + } + }; } private final EventLoop eventLoop; @@ -259,9 +311,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } synchronized (waitingConsumePayloads) { for (Payload p : waitingConsumePayloads) { - if (p.entry != null) { + if (p.fsWALEntry != null) { try { - p.entry.stampRegionSequenceId(); + p.fsWALEntry.stampRegionSequenceId(); } catch (IOException e) { throw new AssertionError("should not happen", e); } @@ -281,6 +333,11 @@ public class AsyncFSWAL extends AbstractFSWAL { private LogRollerExitedChecker logRollerExitedChecker; + private final Disruptor disruptor; + private final ExecutorService appendExecutor; + + private final RingBufferEventHandler ringBufferEventHandler; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoop eventLoop) @@ -289,12 +346,27 @@ public class AsyncFSWAL extends AbstractFSWAL { this.eventLoop = eventLoop; int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); waitingConsumePayloads = new ArrayDeque(maxHandlersCount * 3); + // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense + rollWriter(); + // spinning as other strategies do. + String hostingThreadName = Thread.currentThread().getName(); + this.appendExecutor = Executors + .newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); + final int preallocatedEventCount = + this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + this.disruptor = new Disruptor(Payload.EVENT_FACTORY, preallocatedEventCount, + this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); + this.disruptor.getRingBuffer().next(); + this.ringBufferEventHandler = new RingBufferEventHandler(); + this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); + this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); + // Starting up threads in constructor is a no no; Interface should have an init call. + this.disruptor.start(); batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); - rollWriter(); } private void tryFinishRoll() { @@ -397,6 +469,7 @@ public class AsyncFSWAL extends AbstractFSWAL { private int finishSync(boolean addSyncTrace) { long doneTxid = highestSyncedTxid.get(); int finished = 0; + // remove the first one for (SyncFuture future; (future = syncFutures.peek()) != null;) { if (future.getTxid() <= doneTxid) { future.done(doneTxid, null); @@ -438,6 +511,7 @@ public class AsyncFSWAL extends AbstractFSWAL { if (newHighestProcessedTxid > 0) { highestProcessedTxid = newHighestProcessedTxid; } else { + // which means the append did not happen newHighestProcessedTxid = highestProcessedTxid; } if (writer.getLength() - fileLengthAtLastSync >= batchSize) { @@ -483,8 +557,8 @@ public class AsyncFSWAL extends AbstractFSWAL { } } else { for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) { - if (p.entry != null) { - waitingAppendEntries.addLast(p.entry); + if (p.fsWALEntry != null) { + waitingAppendEntries.addLast(p.fsWALEntry); } else if (p.sync != null) { syncFutures.add(p.sync); } else { @@ -528,38 +602,53 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - boolean scheduleTask; - long txid; - synchronized (waitingConsumePayloads) { - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - txid = nextTxid++; - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.add(new Payload(entry)); - } - if (scheduleTask) { - eventLoop.execute(consumer); + long sequence = this.disruptor.getRingBuffer().next(); + EntryHolder entry; + try { + Payload truck = this.disruptor.getRingBuffer().get(sequence); + entry = new EntryHolder(hri, key, edits, inMemstore); + truck.loadEntryHolder(entry); + } finally { + this.disruptor.getRingBuffer().publish(sequence); } - return txid; + return sequence; } + public static class EntryHolder { + private WALKey walKey; + private WALEdit edits; + private boolean inMemstore; + private HRegionInfo hri; + + public EntryHolder(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) { + this.walKey = key; + this.edits = edits; + this.inMemstore = inMemstore; + this.hri = hri; + } + + public HRegionInfo getHRegionInfo() { + return this.hri; + } + + public WALKey getWALKey() { + return this.walKey; + } + + public WALEdit getWALEdits() { + return this.edits; + } + + public boolean isInMemstore() { + return this.inMemstore; + } + } + @Override public void sync() throws IOException { - TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); + TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - SyncFuture future; - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - future = getSyncFuture(nextTxid - 1, scope.detach()); - waitingConsumePayloads.addLast(new Payload(future)); - } - if (scheduleTask) { - eventLoop.execute(consumer); - } - scope = Trace.continueSpan(blockOnSync(future)); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); @@ -571,24 +660,70 @@ public class AsyncFSWAL extends AbstractFSWAL { if (highestSyncedTxid.get() >= txid) { return; } - TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); + TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - SyncFuture future = getSyncFuture(txid, scope.detach()); - boolean scheduleTask; + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); + } + } + + private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { + return blockOnSync(publishSyncOnRingBuffer(span)); + } + + private SyncFuture publishSyncOnRingBuffer(Span span) { + long sequence = this.disruptor.getRingBuffer().next(); + return publishSyncOnRingBuffer(sequence, span); + } + + private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) { + // here we use ring buffer sequence as transaction id + SyncFuture syncFuture = getSyncFuture(sequence, span); + try { + Payload truck = this.disruptor.getRingBuffer().get(sequence); + truck.loadSync(syncFuture); + } finally { + this.disruptor.getRingBuffer().publish(sequence); + } + return syncFuture; + } + + class RingBufferEventHandler implements EventHandler, LifecycleAware { + + @Override + public void onShutdown() { + } + + @Override + public void onStart() { + boolean scheduleTask = shouldScheduleConsumer(); + if (scheduleTask) { + eventLoop.execute(consumer); + } + } + + @Override + public void onEvent(Payload payload, final long sequence, boolean endOfBatch) throws Exception { + // hand over to the event loop + boolean scheduleTask = shouldScheduleConsumer(); synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.addLast(new Payload(future)); + // increment the txid + long txid = nextTxid++; + if (payload.entryHolder != null) { + payload.createFSWALEntry(txid); + } else { + payload.setTxid(txid - 1); + } + waitingConsumePayloads.addLast(payload); } if (scheduleTask) { eventLoop.execute(consumer); } - scope = Trace.continueSpan(blockOnSync(future)); - } finally { - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); } - } + } @Override public void logRollerExited() { logRollerExited = true; @@ -702,6 +837,21 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected void doShutdown() throws IOException { waitForSafePoint(); + if (this.disruptor != null) { + long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); + try { + this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); + this.disruptor.halt(); + this.disruptor.shutdown(); + } + } + // With disruptor down, this is safe to let go. + if (this.appendExecutor != null) { + this.appendExecutor.shutdown(); + } this.writer.close(); this.writer = null; closeExecutor.shutdown(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 5ec218a..50604a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -73,6 +73,7 @@ class SyncFuture { SyncFuture(long txid, Span span) { this.t = Thread.currentThread(); + // TODO : This gets reset. So looks ugly this.txid = txid; this.span = span; this.doneTxid = NOT_DONE; @@ -98,6 +99,23 @@ class SyncFuture { this.span = span; return this; } + /** + * Updates the txid. + * @param txid + * @return + */ + // TODO : Better not to pass txid in constructor but looks ugly? + synchronized SyncFuture setTxid(final long txid) { + if (t != null && t != Thread.currentThread()) { + throw new IllegalStateException(); + } + t = Thread.currentThread(); + if (isDone()) { + throw new IllegalStateException("" + txid + " " + Thread.currentThread()); + } + this.txid = txid; + return this; + } @Override public synchronized String toString() {