.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 268 +++++++++++++++------
 1 file changed, 188 insertions(+), 80 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d5bccf0..95b96cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -21,6 +21,13 @@ import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
 
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.Future;
@@ -46,17 +53,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog.RingBufferEventHandler;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog.RingBufferExceptionHandler;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.ipc.RemoteException;
 
 import org.apache.htrace.NullScope;
+import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -151,22 +163,26 @@ public class AsyncFSWAL extends AbstractFSWAL {
    */
   private static final class Payload {
 
+    public Payload() {
+
+    }
+
     // a wal entry which need to be appended
-    public final FSWALEntry entry;
+    public FSWALEntry entry;
 
     // indicate that we need to sync our wal writer.
-    public final SyncFuture sync;
+    public SyncFuture sync;
 
     // incidate that we want to roll the writer.
-    public final Promise<Void> roll;
+    public Promise<Void> roll;
 
-    public Payload(FSWALEntry entry) {
+    public void loadWALEntry(FSWALEntry entry) {
       this.entry = entry;
       this.sync = null;
       this.roll = null;
     }
 
-    public Payload(SyncFuture sync) {
+    public void loadSync(SyncFuture sync) {
       this.entry = null;
       this.sync = sync;
       this.roll = null;
@@ -177,11 +193,27 @@ public class AsyncFSWAL extends AbstractFSWAL {
       this.sync = null;
       this.roll = roll;
     }
-
+    //TODO : Need to handle log roll
+    public void loadRoll(Promise<Void> roll) {
+      this.entry = null;
+      this.sync = null;
+      this.roll = roll;
+    }
     @Override
     public String toString() {
       return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]";
     }
+
+    /**
+     * Factory for making a bunch of these. Needed by the ringbuffer/disruptor.
+     */
+    final static EventFactory<Payload> EVENT_FACTORY = new EventFactory<Payload>() {
+      public Payload newInstance() {
+        // Getting created every time??
+        System.out.println("Getting created every time");
+        return new Payload();
+      }
+    };
   }
 
   private final EventLoop eventLoop;
@@ -283,6 +315,11 @@ public class AsyncFSWAL extends AbstractFSWAL {
 
   private LogRollerExitedChecker logRollerExitedChecker;
 
+  private final Disruptor<Payload> disruptor;
+  private final ExecutorService appendExecutor;
+
+  private final RingBufferEventHandler ringBufferEventHandler;
+
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException,
@@ -291,13 +328,28 @@ public class AsyncFSWAL extends AbstractFSWAL {
     this.eventLoop = eventLoop;
     int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200);
     waitingConsumePayloads = new ArrayDeque(maxHandlersCount * 3);
+    // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
+    rollWriter();
+    // spinning as other strategies do.
+    String hostingThreadName = Thread.currentThread().getName();
+    this.appendExecutor = Executors
+        .newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
+    final int preallocatedEventCount =
+        this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
+    this.disruptor = new Disruptor<Payload>(Payload.EVENT_FACTORY, preallocatedEventCount,
+        this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
+    this.disruptor.getRingBuffer().next();
+    this.ringBufferEventHandler = new RingBufferEventHandler();
+    this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
+    this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
+    // Starting up threads in constructor is a no no; Interface should have an init call.
+    this.disruptor.start();
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
     createMaxRetries =
         conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
     logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
       DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
-    rollWriter();
   }
 
   private void tryFinishRoll() {
@@ -404,6 +456,22 @@ public class AsyncFSWAL extends AbstractFSWAL {
         finished++;
         addTimeAnnotation(future, "writer synced");
       } else {
+        // Big hack here. - Just temporary
+        // Trying to mark the highestSyncedTxid if the syncFuture's txid
+        // is one greater than the last appened txid.
+        // Since now we use a ringbuffer every sync call will have an unique
+        // txid. So setting the highestSyncedTxid based on that is not easy here
+        // We need to do the FSHLog way to do it, in a such way that the actual sync
+        // should happen based on the syncFuture call and not along with the append
+        // as it happens now
+        if (future.getTxid() - 1 == doneTxid) {
+          highestSyncedTxid.set(future.getTxid());
+          future.done(future.getTxid(), null);
+          finished++;
+          syncFutures.remove();
+          // breaking after this
+          break;
+        }
         break;
       }
     }
@@ -416,45 +484,46 @@ public class AsyncFSWAL extends AbstractFSWAL {
     // finish some.
     finishSync(false);
     long newHighestProcessedTxid = -1L;
-    for (Iterator<FSWALEntry> iter = waitingAppendEntries.iterator(); iter.hasNext();) {
-      FSWALEntry entry = iter.next();
-      boolean appended;
-      try {
-        appended = append(writer, entry);
-      } catch (IOException e) {
-        throw new AssertionError("should not happen", e);
-      }
-      newHighestProcessedTxid = entry.getTxid();
-      iter.remove();
-      if (appended) {
-        unackedEntries.addLast(entry);
-        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
-          break;
+      for (Iterator<FSWALEntry> iter = waitingAppendEntries.iterator(); iter.hasNext();) {
+        FSWALEntry entry = iter.next();
+        boolean appended;
+        try {
+          appended = append(writer, entry);
+        } catch (IOException e) {
+          throw new AssertionError("should not happen", e);
+        }
+        newHighestProcessedTxid = entry.getTxid();
+        iter.remove();
+        if (appended) {
+          unackedEntries.addLast(entry);
+          if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+            break;
+          }
         }
       }
-    }
-    // if we have a newer transaction id, update it.
-    // otherwise, use the previous transaction id.
-    if (newHighestProcessedTxid > 0) {
-      highestProcessedTxid = newHighestProcessedTxid;
-    } else {
-      newHighestProcessedTxid = highestProcessedTxid;
-    }
-    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
-      // sync because buffer size limit.
-      sync(writer, newHighestProcessedTxid);
-    } else if ((!syncFutures.isEmpty() || rollPromise != null)
-        && writer.getLength() > fileLengthAtLastSync) {
-      // first we should have at least one sync request or a roll request
-      // second we should have some unsynced data.
-      sync(writer, newHighestProcessedTxid);
-    } else if (writer.getLength() == fileLengthAtLastSync) {
-      // we haven't written anything out, just advance the highestSyncedSequence since we may only
-      // stamped some region sequence id.
-      highestSyncedTxid.set(newHighestProcessedTxid);
-      finishSync(false);
-      tryFinishRoll();
-    }
+      // if we have a newer transaction id, update it.
+      // otherwise, use the previous transaction id.
+      if (newHighestProcessedTxid > 0) {
+        highestProcessedTxid = newHighestProcessedTxid;
+      } else {
+        newHighestProcessedTxid = highestProcessedTxid;
+      }
+      if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+        // sync because buffer size limit.
+        sync(writer, newHighestProcessedTxid);
+      } else if ((!syncFutures.isEmpty() || rollPromise != null)
+          && writer.getLength() > fileLengthAtLastSync) {
+        // first we should have at least one sync request or a roll request
+        // second we should have some unsynced data.
+        sync(writer, newHighestProcessedTxid);
+      } else if (writer.getLength() == fileLengthAtLastSync) {
+        // we haven't written anything out, just advance the highestSyncedSequence since we may only
+        // stamped some region sequence id.
+        highestSyncedTxid.set(newHighestProcessedTxid);
+        finishSync(false);
+        tryFinishRoll();
+      }
+  }
 
   private static final Comparator<SyncFuture> SEQ_COMPARATOR = new Comparator<SyncFuture>() {
@@ -532,38 +601,23 @@ public class AsyncFSWAL extends AbstractFSWAL {
   @Override
   public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
       throws IOException {
-    boolean scheduleTask;
-    long txid;
-    synchronized (waitingConsumePayloads) {
-      if (this.closed) {
-        throw new IOException("Cannot append; log is closed");
-      }
-      txid = nextTxid++;
-      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
-      scheduleTask = shouldScheduleConsumer();
-      waitingConsumePayloads.add(new Payload(entry));
-    }
-    if (scheduleTask) {
-      eventLoop.execute(consumer);
+    long sequence = this.disruptor.getRingBuffer().next();
+    FSWALEntry entry;
+    try {
+      Payload truck = this.disruptor.getRingBuffer().get(sequence);
+      entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
+      truck.loadWALEntry(entry);
+    } finally {
+      this.disruptor.getRingBuffer().publish(sequence);
     }
-    return txid;
+    return sequence;
   }
 
   @Override
   public void sync() throws IOException {
-    TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
+    TraceScope scope = Trace.startSpan("FSHLog.sync");
     try {
-      SyncFuture future;
-      boolean scheduleTask;
-      synchronized (waitingConsumePayloads) {
-        scheduleTask = shouldScheduleConsumer();
-        future = getSyncFuture(nextTxid - 1, scope.detach());
-        waitingConsumePayloads.addLast(new Payload(future));
-      }
-      if (scheduleTask) {
-        eventLoop.execute(consumer);
-      }
-      scope = Trace.continueSpan(blockOnSync(future));
+      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
     } finally {
       assert scope == NullScope.INSTANCE || !scope.isDetached();
       scope.close();
@@ -575,24 +629,63 @@ public class AsyncFSWAL extends AbstractFSWAL {
     if (highestSyncedTxid.get() >= txid) {
       return;
     }
-    TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
+    TraceScope scope = Trace.startSpan("FSHLog.sync");
+    try {
+      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
+  }
+
+  private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
+    return blockOnSync(publishSyncOnRingBuffer(span));
+  }
+
+  private SyncFuture publishSyncOnRingBuffer(Span span) {
+    long sequence = this.disruptor.getRingBuffer().next();
+    return publishSyncOnRingBuffer(sequence, span);
+  }
+
+  private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
+    // here we use ring buffer sequence as transaction id
+    SyncFuture syncFuture = getSyncFuture(sequence, span);
     try {
-      SyncFuture future = getSyncFuture(txid, scope.detach());
-      boolean scheduleTask;
+      Payload truck = this.disruptor.getRingBuffer().get(sequence);
+      truck.loadSync(syncFuture);
+    } finally {
+      this.disruptor.getRingBuffer().publish(sequence);
+    }
+    return syncFuture;
+  }
+
+  class RingBufferEventHandler implements EventHandler<Payload>, LifecycleAware {
+
+    @Override
+    public void onShutdown() {
+    }
+
+    @Override
+    public void onStart() {
+      boolean scheduleTask = shouldScheduleConsumer();
+      if (scheduleTask) {
+        eventLoop.execute(consumer);
+      }
+    }
+
+    @Override
+    public void onEvent(Payload payload, final long sequence, boolean endOfBatch) throws Exception {
+      // hand over to the event loop
+      boolean scheduleTask = shouldScheduleConsumer();
       synchronized (waitingConsumePayloads) {
-        scheduleTask = shouldScheduleConsumer();
-        waitingConsumePayloads.addLast(new Payload(future));
+        waitingConsumePayloads.addLast(payload);
       }
       if (scheduleTask) {
         eventLoop.execute(consumer);
       }
-      scope = Trace.continueSpan(blockOnSync(future));
-    } finally {
-      assert scope == NullScope.INSTANCE || !scope.isDetached();
-      scope.close();
     }
-  }
+  }
 
   @Override
   public void logRollerExited() {
     logRollerExited = true;
@@ -710,6 +803,21 @@ public class AsyncFSWAL extends AbstractFSWAL {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
+    if (this.disruptor != null) {
+      long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+      try {
+        this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
+        this.disruptor.halt();
+        this.disruptor.shutdown();
+      }
+    }
+    // With disruptor down, this is safe to let go.
+    if (this.appendExecutor != null) {
+      this.appendExecutor.shutdown();
+    }
     this.writer.close();
     this.writer = null;
     closeExecutor.shutdown();
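
Not part of the patch itself: the self-contained sketch below illustrates the LMAX Disruptor publish/consume pattern this change borrows from FSHLog, in isolation. It shows a pre-allocated mutable event per ring-buffer slot, a claim/load/publish cycle wrapped in try/finally on the producer side, and a single EventHandler draining events in sequence order on the consumer side. The class and field names (DisruptorSketch, WalEvent) are invented for this example and do not appear in the patch; only the disruptor 3.x calls already imported above are assumed.

// Illustrative sketch only, not part of the change above.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class DisruptorSketch {

  // Mutable, reusable event slot; plays the role the Payload "truck" plays in the patch.
  static final class WalEvent {
    long txid;
    String edit;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // Ring buffer size must be a power of two; the patch defaults to 1024 * 16.
    // newInstance() is called once per slot when the ring buffer is allocated,
    // not once per published event, so it only pre-allocates empty objects.
    Disruptor<WalEvent> disruptor = new Disruptor<WalEvent>(new EventFactory<WalEvent>() {
      @Override
      public WalEvent newInstance() {
        return new WalEvent();
      }
    }, 1024, executor, ProducerType.MULTI, new BlockingWaitStrategy());

    // A single handler consumes events in sequence order, like RingBufferEventHandler above.
    disruptor.handleEventsWith(new EventHandler<WalEvent>() {
      @Override
      public void onEvent(WalEvent event, long sequence, boolean endOfBatch) {
        System.out.println("consumed txid=" + event.txid + " edit=" + event.edit);
      }
    });
    disruptor.start();

    // Producer side: claim a sequence, fill the pre-allocated slot, and always publish in a
    // finally block, mirroring append() and publishSyncOnRingBuffer() in the patch. The patch
    // also reuses the claimed sequence as the WAL transaction id.
    long sequence = disruptor.getRingBuffer().next();
    try {
      WalEvent event = disruptor.getRingBuffer().get(sequence);
      event.txid = sequence;
      event.edit = "put row1";
    } finally {
      disruptor.getRingBuffer().publish(sequence);
    }

    disruptor.shutdown(); // blocks until outstanding events have been handled
    executor.shutdown();
  }
}

The claim/publish split is what lets a producer fill a mutable slot without locking; a claimed but never published sequence stalls the consumer behind it, which is why the patch publishes in a finally block.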