From d74ba974708f99a0be9984f6e6f2e1d3489c1f49 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 1 Nov 2016 17:01:30 +0800 Subject: [PATCH] HBASE-16890 try removing contention in append and sync with RingBuffer --- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 422 ++++++++++++--------- .../wal/AsyncFSWALRingBufferTruck.java | 93 +++++ .../hadoop/hbase/regionserver/wal/FSHLog.java | 12 +- .../regionserver/wal/FSHLogRingBufferTruck.java | 105 +++++ .../hbase/regionserver/wal/RingBufferTruck.java | 116 ------ .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 4 +- .../regionserver/wal/AbstractTestWALReplay.java | 4 +- .../hbase/regionserver/wal/TestAsyncFSWAL.java | 3 +- .../hbase/regionserver/wal/TestAsyncWALReplay.java | 3 +- 9 files changed, 455 insertions(+), 307 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWALRingBufferTruck.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLogRingBufferTruck.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 279a6ae..56b2acb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.Sequencer; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; @@ -38,6 +39,9 @@ import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -138,66 +142,24 @@ public class AsyncFSWAL extends AbstractFSWAL { public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; - public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = - "hbase.wal.async.logroller.exited.check.interval.ms"; + public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = "hbase.wal.async.logroller.exited.check.interval.ms"; public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000; - /** - * Carry things that we want to pass to the consume task in event loop. Only one field can be - * non-null. - *
<p>
- * TODO: need to unify this and {@link RingBufferTruck}. There are mostly the same thing. - */ - private static final class Payload { - - // a wal entry which need to be appended - public final FSWALEntry entry; - - // indicate that we need to sync our wal writer. - public final SyncFuture sync; - - // incidate that we want to roll the writer. - public final Promise roll; - - public Payload(FSWALEntry entry) { - this.entry = entry; - this.sync = null; - this.roll = null; - } - - public Payload(SyncFuture sync) { - this.entry = null; - this.sync = sync; - this.roll = null; - } - - public Payload(Promise roll) { - this.entry = null; - this.sync = null; - this.roll = roll; - } - - @Override - public String toString() { - return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]"; - } - } - private final EventLoop eventLoop; - private final Deque waitingConsumePayloads; + private final Lock consumeLock = new ReentrantLock(); - // like the ringbuffer sequence. Every FSWALEntry and SyncFuture will be assigned a txid and - // then added to waitingConsumePayloads. - private long nextTxid = 1L; + private final RingBuffer waitingConsumePayloads; - private boolean consumerScheduled; + private final Sequence waitingConsumePayloadsGatingSequence; + + private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); // new writer is created and we are waiting for old writer to be closed. - private boolean waitingRoll; + private volatile boolean waitingRoll; // writer is broken and rollWriter is needed. - private boolean writerBroken; + private volatile boolean writerBroken; private final long batchSize; @@ -214,13 +176,13 @@ public class AsyncFSWAL extends AbstractFSWAL { private final Deque unackedEntries = new ArrayDeque(); - private final PriorityQueue syncFutures = - new PriorityQueue(11, SEQ_COMPARATOR); + private final PriorityQueue syncFutures = new PriorityQueue(11, + SEQ_COMPARATOR); private Promise rollPromise; // the highest txid of WAL entries being processed - private long highestProcessedTxid; + private long highestProcessedAppendTxid; // file length when we issue last sync request on the writer private long fileLengthAtLastSync; @@ -237,6 +199,52 @@ public class AsyncFSWAL extends AbstractFSWAL { this.future = future; } + private void cleanup() { + unackedEntries.clear(); + waitingAppendEntries.forEach(entry -> { + try { + entry.stampRegionSequenceId(); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } + }); + waitingAppendEntries.clear(); + IOException error = new IOException("sync failed but log roller exited"); + for (SyncFuture sync; (sync = syncFutures.peek()) != null;) { + sync.done(sync.getTxid(), error); + syncFutures.remove(); + } + consumeLock.lock(); + try { + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + for (long cursorBound = waitingConsumePayloads + .getCursor(); nextCursor <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case APPEND: + try { + truck.unloadAppend().stampRegionSequenceId(); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } + break; + case SYNC: + SyncFuture sync = truck.unloadSync(); + sync.done(sync.getTxid(), error); + break; + default: + break; + } + waitingConsumePayloadsGatingSequence.set(nextCursor); + } + } finally { + consumeLock.unlock(); + } + } + @Override public void run() { if 
(!logRollerExited) { @@ -250,27 +258,7 @@ public class AsyncFSWAL extends AbstractFSWAL { return; } } - unackedEntries.clear(); - waitingAppendEntries.clear(); - IOException error = new IOException("sync failed but log roller exited"); - for (SyncFuture future; (future = syncFutures.peek()) != null;) { - future.done(highestProcessedTxid, error); - syncFutures.remove(); - } - synchronized (waitingConsumePayloads) { - for (Payload p : waitingConsumePayloads) { - if (p.entry != null) { - try { - p.entry.stampRegionSequenceId(); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } - } else if (p.sync != null) { - p.sync.done(nextTxid, error); - } - } - waitingConsumePayloads.clear(); - } + cleanup(); } public synchronized void cancel() { @@ -287,11 +275,20 @@ public class AsyncFSWAL extends AbstractFSWAL { throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoop = eventLoop; - int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); - waitingConsumePayloads = new ArrayDeque(maxHandlersCount * 3); + int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", + 1024 * 16); + waitingConsumePayloads = RingBuffer.createMultiProducer(AsyncFSWALRingBufferTruck::new, + preallocatedEventCount); + waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); + + // inrease the ringbuffer sequence so our txid is start from 1 + waitingConsumePayloads.publish(waitingConsumePayloads.next()); + waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); + batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); - createMaxRetries = - conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); + createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, + DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); rollWriter(); @@ -394,15 +391,16 @@ public class AsyncFSWAL extends AbstractFSWAL { future.setSpan(scope.detach()); } - private int finishSync(boolean addSyncTrace) { - long doneTxid = highestSyncedTxid.get(); + private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { int finished = 0; - for (SyncFuture future; (future = syncFutures.peek()) != null;) { - if (future.getTxid() <= doneTxid) { - future.done(doneTxid, null); + for (SyncFuture sync; (sync = syncFutures.peek()) != null;) { + if (sync.getTxid() <= txid) { + sync.done(txid, null); syncFutures.remove(); finished++; - addTimeAnnotation(future, "writer synced"); + if (addSyncTrace) { + addTimeAnnotation(sync, "writer synced"); + } } else { break; } @@ -410,7 +408,38 @@ public class AsyncFSWAL extends AbstractFSWAL { return finished; } - private void consume() { + private int finishSync(boolean addSyncTrace) { + long doneTxid = highestSyncedTxid.get(); + if (doneTxid >= highestProcessedAppendTxid) { + if (waitingAppendEntries.isEmpty()) { + // all outstanding appends have been acked, just finish all syncs. 
+ long maxSyncTxid = doneTxid; + for (SyncFuture sync : syncFutures) { + maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); + sync.done(maxSyncTxid, null); + if (addSyncTrace) { + addTimeAnnotation(sync, "writer synced"); + } + } + highestSyncedTxid.set(maxSyncTxid); + int finished = syncFutures.size(); + syncFutures.clear(); + return finished; + } else { + // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so + // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between + // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. + long lowestUnprocessedAppendTxid = waitingAppendEntries.peek().getTxid(); + assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; + highestSyncedTxid.set(lowestUnprocessedAppendTxid - 1); + return finishSyncLowerThanTxid(lowestUnprocessedAppendTxid - 1, addSyncTrace); + } + } else { + return finishSyncLowerThanTxid(doneTxid, addSyncTrace); + } + } + + private void appendAndSync() { final AsyncWriter writer = this.writer; // maybe a sync request is not queued when we issue a sync, so check here to see if we could // finish some. @@ -436,9 +465,9 @@ public class AsyncFSWAL extends AbstractFSWAL { // if we have a newer transaction id, update it. // otherwise, use the previous transaction id. if (newHighestProcessedTxid > 0) { - highestProcessedTxid = newHighestProcessedTxid; + highestProcessedAppendTxid = newHighestProcessedTxid; } else { - newHighestProcessedTxid = highestProcessedTxid; + newHighestProcessedTxid = highestProcessedAppendTxid; } if (writer.getLength() - fileLengthAtLastSync >= batchSize) { // sync because buffer size limit. @@ -457,89 +486,114 @@ public class AsyncFSWAL extends AbstractFSWAL { } } - private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { - int c = Long.compare(o1.getTxid(), o2.getTxid()); - return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); - }; - - private final Runnable consumer = new Runnable() { - - @Override - public void run() { - synchronized (waitingConsumePayloads) { - assert consumerScheduled; - if (writerBroken) { - // waiting for reschedule after rollWriter. - consumerScheduled = false; + private void consume() { + consumeLock.lock(); + try { + if (writerBroken) { + consumerScheduled.set(false); + return; + } + if (waitingRoll) { + // we may have toWriteEntries if the consume method does not write all pending entries + // out, this is usually happen if we have too many toWriteEntries that exceeded the + // batchSize limit. + if (waitingAppendEntries.isEmpty()) { + consumerScheduled.set(false); return; } - if (waitingRoll) { - // we may have toWriteEntries if the consume method does not write all pending entries - // out, this is usually happen if we have too many toWriteEntries that exceeded the - // batchSize limit. 
- if (waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; + } else { + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + LOOP: for (long cursorBound = waitingConsumePayloads + .getCursor(); nextCursor <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; } - } else { - for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) { - if (p.entry != null) { - waitingAppendEntries.addLast(p.entry); - } else if (p.sync != null) { - syncFutures.add(p.sync); - } else { - rollPromise = p.roll; + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case APPEND: + waitingAppendEntries.addLast(truck.unloadAppend()); + break; + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + case ROLL: + rollPromise = truck.unloadRoll(); waitingRoll = true; + waitingConsumePayloadsGatingSequence.set(nextCursor); + break LOOP; + default: break; - } } + waitingConsumePayloadsGatingSequence.set(nextCursor); } } - consume(); - synchronized (waitingConsumePayloads) { - if (waitingRoll) { - if (waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; - } - } else { - if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) { - consumerScheduled = false; - return; + } finally { + consumeLock.unlock(); + } + appendAndSync(); + consumeLock.lock(); + try { + if (waitingRoll) { + if (waitingAppendEntries.isEmpty()) { + consumerScheduled.set(false); + return; + } + } else { + if (waitingAppendEntries.isEmpty()) { + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + consumerScheduled.set(false); + // recheck here since in append and sync we do not hold the consumeLock. Thing may + // happen like + // 1. we check cursor, no new entry + // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and + // give up scheduling the consumer task. + // 3. we set consumerScheduled to false and also give up scheduling consumer task. + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + return; + } else { + // maybe someone has grabbed this before us + if (!consumerScheduled.compareAndSet(false, true)) { + return; + } + } } } } - // reschedule if we still have something to write. - eventLoop.execute(this); + } finally { + consumeLock.unlock(); } + // reschedule if we still have something to write. + eventLoop.execute(consumer); + } + + private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { + int c = Long.compare(o1.getTxid(), o2.getTxid()); + return c != 0 ? 
c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); }; + private final Runnable consumer = this::consume; + private boolean shouldScheduleConsumer() { if (writerBroken || waitingRoll) { return false; } - if (consumerScheduled) { - return false; - } - consumerScheduled = true; - return true; + return consumerScheduled.compareAndSet(false, true); } @Override public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - boolean scheduleTask; - long txid; - synchronized (waitingConsumePayloads) { - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - txid = nextTxid++; - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.add(new Payload(entry)); + if (closed) { + throw new IOException("Cannot append; log is closed"); } - if (scheduleTask) { + long txid = waitingConsumePayloads.next(); + try { + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(txid); + truck.load(new FSWALEntry(txid, key, edits, hri, inMemstore)); + } finally { + waitingConsumePayloads.publish(txid); + } + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } return txid; @@ -549,14 +603,16 @@ public class AsyncFSWAL extends AbstractFSWAL { public void sync() throws IOException { TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); try { + long txid = waitingConsumePayloads.next(); SyncFuture future; - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - future = getSyncFuture(nextTxid - 1, scope.detach()); - waitingConsumePayloads.addLast(new Payload(future)); + try { + future = getSyncFuture(txid, scope.detach()); + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(txid); + truck.load(future); + } finally { + waitingConsumePayloads.publish(txid); } - if (scheduleTask) { + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } scope = Trace.continueSpan(blockOnSync(future)); @@ -573,13 +629,17 @@ public class AsyncFSWAL extends AbstractFSWAL { } TraceScope scope = Trace.startSpan("AsyncFSWAL.sync"); try { - SyncFuture future = getSyncFuture(txid, scope.detach()); - boolean scheduleTask; - synchronized (waitingConsumePayloads) { - scheduleTask = shouldScheduleConsumer(); - waitingConsumePayloads.addLast(new Payload(future)); + // here we do not use ring buffer sequence as txid + long sequence = waitingConsumePayloads.next(); + SyncFuture future; + try { + future = getSyncFuture(txid, scope.detach()); + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(sequence); + truck.load(future); + } finally { + waitingConsumePayloads.publish(sequence); } - if (scheduleTask) { + if (shouldScheduleConsumer()) { eventLoop.execute(consumer); } scope = Trace.continueSpan(blockOnSync(future)); @@ -630,22 +690,24 @@ public class AsyncFSWAL extends AbstractFSWAL { } private void waitForSafePoint() { - Future roll; + Promise roll = eventLoop.newPromise(); + long sequence = waitingConsumePayloads.next(); boolean scheduleTask; - synchronized (waitingConsumePayloads) { - if (!writerBroken && this.writer != null) { - Promise promise = eventLoop.newPromise(); - if (consumerScheduled) { - scheduleTask = false; - } else { - scheduleTask = consumerScheduled = true; - } - waitingConsumePayloads.addLast(new Payload(promise)); - roll = promise; - } else { - roll = eventLoop.newSucceededFuture(null); - scheduleTask = false; + consumeLock.lock(); + try { + if 
(writerBroken || this.writer == null) { + return; } + AsyncFSWALRingBufferTruck truck = waitingConsumePayloads.get(sequence); + truck.load(roll); + scheduleTask = consumerScheduled.compareAndSet(false, true); + } finally { + // always publish the sequence even if we do not load anything. The consumer will just ignore + // it. We can not move the waitingConsumePayloads.next() into the locking section because it + // may block and then prevent consume method to consume the waitingConsumePayloads. This is a + // dead lock. + waitingConsumePayloads.publish(sequence); + consumeLock.unlock(); } if (scheduleTask) { eventLoop.execute(consumer); @@ -664,21 +726,25 @@ public class AsyncFSWAL extends AbstractFSWAL { } this.fileLengthAtLastSync = 0L; boolean scheduleTask; - synchronized (waitingConsumePayloads) { + consumeLock.lock(); + try { writerBroken = waitingRoll = false; if (logRollerExitedChecker != null) { logRollerExitedChecker.cancel(); logRollerExitedChecker = null; } - if (consumerScheduled) { + if (consumerScheduled.get()) { scheduleTask = false; } else { - if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) { + if (waitingConsumePayloads.getCursor() <= waitingConsumePayloadsGatingSequence.get() + && waitingAppendEntries.isEmpty()) { scheduleTask = false; } else { - scheduleTask = consumerScheduled = true; + scheduleTask = consumerScheduled.compareAndSet(false, true); } } + } finally { + consumeLock.unlock(); } if (scheduleTask) { eventLoop.execute(consumer); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWALRingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWALRingBufferTruck.java new file mode 100644 index 0000000..3d433d4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWALRingBufferTruck.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import io.netty.util.concurrent.Promise; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Carry things that we want to pass to the consume task in event loop. Only one field can be + * non-null. + *
<p>
+ * TODO: need to unify this and {@link FSHLogRingBufferTruck}. There are mostly the same thing. + */ +@InterfaceAudience.Private +final class AsyncFSWALRingBufferTruck { + + public enum Type { + APPEND, SYNC, ROLL, EMPTY + } + + private Type type = Type.EMPTY; + + // a wal entry which need to be appended + private FSWALEntry entry; + + // indicate that we need to sync our wal writer. + private SyncFuture sync; + + // incidate that we want to roll the writer. + private Promise roll; + + void load(FSWALEntry entry) { + this.entry = entry; + this.type = Type.APPEND; + } + + void load(SyncFuture sync) { + this.sync = sync; + this.type = Type.SYNC; + } + + void load(Promise roll) { + this.roll = roll; + this.type = Type.ROLL; + } + + Type type() { + return type; + } + + FSWALEntry unloadAppend() { + FSWALEntry entry = this.entry; + this.entry = null; + this.type = Type.EMPTY; + return entry; + } + + SyncFuture unloadSync() { + SyncFuture sync = this.sync; + this.sync = null; + this.type = Type.EMPTY; + return sync; + } + + Promise unloadRoll() { + Promise roll = this.roll; + this.roll = null; + this.type = Type.EMPTY; + return roll; + } + + @Override + public String toString() { + return "AsyncFSWALRingBufferTruck [type=" + type + ", entry=" + entry + ", sync=" + sync + + ", roll=" + roll + "]"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 142ab63..61d36ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -117,7 +117,7 @@ public class FSHLog extends AbstractFSWAL { * and sometimes we don't want to sync or we want to async the sync). The ring is where we make * sure of our ordering and it is also where we do batching up of handler sync calls. */ - private final Disruptor disruptor; + private final Disruptor disruptor; /** * An executorservice that runs the disruptor AppendEventHandler append executor. @@ -240,7 +240,7 @@ public class FSHLog extends AbstractFSWAL { .getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. - this.disruptor = new Disruptor(RingBufferTruck.EVENT_FACTORY, + this.disruptor = new Disruptor(FSHLogRingBufferTruck::new, preallocatedEventCount, this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); // Advance the ring buffer sequence so that it starts from 1 instead of 0, @@ -467,7 +467,7 @@ public class FSHLog extends AbstractFSWAL { FSWALEntry entry = null; long sequence = this.disruptor.getRingBuffer().next(); try { - RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); + FSHLogRingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. 
@@ -751,7 +751,7 @@ public class FSHLog extends AbstractFSWAL { // here we use ring buffer sequence as transaction id SyncFuture syncFuture = getSyncFuture(sequence, span); try { - RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); + FSHLogRingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); } finally { this.disruptor.getRingBuffer().publish(sequence); @@ -942,7 +942,7 @@ public class FSHLog extends AbstractFSWAL { * syncs and then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755 * for more detail. */ - class RingBufferEventHandler implements EventHandler, LifecycleAware { + class RingBufferEventHandler implements EventHandler, LifecycleAware { private final SyncRunner[] syncRunners; private final SyncFuture[] syncFutures; // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all @@ -1007,7 +1007,7 @@ public class FSHLog extends AbstractFSWAL { @Override // We can set endOfBatch in the below method if at end of our this.syncFutures array - public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) + public void onEvent(final FSHLogRingBufferTruck truck, final long sequence, boolean endOfBatch) throws Exception { // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll // add appends to dfsclient as they come in. Batching appends doesn't give any significant diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLogRingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLogRingBufferTruck.java new file mode 100644 index 0000000..d6d46d6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLogRingBufferTruck.java @@ -0,0 +1,105 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.htrace.Span; + +/** + * A 'truck' to carry a payload across the {@link FSHLog} ring buffer from Handler to WAL. + * Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to + * represent a 'sync' invocation. Truck instances are reused by the disruptor when it gets + * around to it so their payload references must be discarded on consumption to release them + * to GC. + */ +@InterfaceAudience.Private +final class FSHLogRingBufferTruck { + /** + * Either this syncFuture is set or entry is set, but not both. + */ + private SyncFuture syncFuture; + private FSWALEntry entry; + + /** + * The tracing span for this entry. Can be null. + * TODO: Fix up tracing. 
+ */ + private Span span; + + /** + * Load the truck with a {@link FSWALEntry} and associated {@link Span}. + */ + void loadPayload(final FSWALEntry entry, final Span span) { + this.entry = entry; + this.span = span; + this.syncFuture = null; + } + + /** + * Load the truck with a {@link SyncFuture}. + */ + void loadPayload(final SyncFuture syncFuture) { + this.syncFuture = syncFuture; + this.entry = null; + this.span = null; + } + + /** + * return {@code true} when this truck is carrying a {@link FSWALEntry}, + * {@code false} otherwise. + */ + boolean hasFSWALEntryPayload() { + return this.entry != null; + } + + /** + * return {@code true} when this truck is carrying a {@link SyncFuture}, + * {@code false} otherwise. + */ + boolean hasSyncFuturePayload() { + return this.syncFuture != null; + } + + /** + * Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released. + */ + FSWALEntry unloadFSWALEntryPayload() { + FSWALEntry ret = this.entry; + this.entry = null; + return ret; + } + + /** + * Unload the truck of its {@link SyncFuture} payload. The internal refernce is released. + */ + SyncFuture unloadSyncFuturePayload() { + SyncFuture ret = this.syncFuture; + this.syncFuture = null; + return ret; + } + + /** + * Unload the truck of its {@link Span} payload. The internal reference is released. + */ + Span unloadSpanPayload() { + Span ret = this.span; + this.span = null; + return ret; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java deleted file mode 100644 index 25c2111..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.wal; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.htrace.Span; - -import com.lmax.disruptor.EventFactory; - -/** - * A 'truck' to carry a payload across the {@link FSHLog} ring buffer from Handler to WAL. - * Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to - * represent a 'sync' invocation. Truck instances are reused by the disruptor when it gets - * around to it so their payload references must be discarded on consumption to release them - * to GC. - */ -@InterfaceAudience.Private -class RingBufferTruck { - /** - * Either this syncFuture is set or entry is set, but not both. - */ - private SyncFuture syncFuture; - private FSWALEntry entry; - - /** - * The tracing span for this entry. Can be null. - * TODO: Fix up tracing. 
- */ - private Span span; - - /** - * Load the truck with a {@link FSWALEntry} and associated {@link Span}. - */ - void loadPayload(final FSWALEntry entry, final Span span) { - this.entry = entry; - this.span = span; - this.syncFuture = null; - } - - /** - * Load the truck with a {@link SyncFuture}. - */ - void loadPayload(final SyncFuture syncFuture) { - this.syncFuture = syncFuture; - this.entry = null; - this.span = null; - } - - /** - * return {@code true} when this truck is carrying a {@link FSWALEntry}, - * {@code false} otherwise. - */ - boolean hasFSWALEntryPayload() { - return this.entry != null; - } - - /** - * return {@code true} when this truck is carrying a {@link SyncFuture}, - * {@code false} otherwise. - */ - boolean hasSyncFuturePayload() { - return this.syncFuture != null; - } - - /** - * Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released. - */ - FSWALEntry unloadFSWALEntryPayload() { - FSWALEntry ret = this.entry; - this.entry = null; - return ret; - } - - /** - * Unload the truck of its {@link SyncFuture} payload. The internal refernce is released. - */ - SyncFuture unloadSyncFuturePayload() { - SyncFuture ret = this.syncFuture; - this.syncFuture = null; - return ret; - } - - /** - * Unload the truck of its {@link Span} payload. The internal reference is released. - */ - Span unloadSpanPayload() { - Span ret = this.span; - this.span = null; - return ret; - } - - /** - * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. - */ - final static EventFactory EVENT_FACTORY = new EventFactory() { - public RingBufferTruck newInstance() { - return new RingBufferTruck(); - } - }; -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index c38c262..13ab85e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.wal; import com.google.common.annotations.VisibleForTesting; + import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. 
This interface provides diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index f9d962c..ca2ec85 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -374,12 +374,12 @@ public abstract class AbstractTestWALReplay { Path f = new Path(basedir, "hfile"); HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), Bytes.toBytes("z"), 10); - List > hfs= new ArrayList>(1); + List> hfs = new ArrayList>(1); hfs.add(Pair.newPair(family, f.toString())); region.bulkLoadHFiles(hfs, true, null); // Add an edit so something in the WAL - byte [] row = tableName.getName(); + byte[] row = tableName.getName(); region.put((new Put(row)).addColumn(family, family, family)); wal.sync(); final int rowsInsertedCount = 11; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 721ee85..a55df68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -42,7 +43,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { @BeforeClass public static void setUpBeforeClass() throws Exception { - GROUP = new NioEventLoopGroup(); + GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL")); AbstractTestFSWAL.setUpBeforeClass(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java index 7f0c035..e008b37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; @@ -41,7 +42,7 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @BeforeClass public static void setUpBeforeClass() throws Exception { - GROUP = new NioEventLoopGroup(); + GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay")); Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); AbstractTestWALReplay.setUpBeforeClass(); -- 2.7.4
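Reviewer note (not part of the patch): the core of this change is replacing the synchronized ArrayDeque<Payload> handoff in AsyncFSWAL with a multi-producer LMAX Disruptor RingBuffer of reusable trucks, drained by a single consumer task on the Netty EventLoop, with an AtomicBoolean CAS deciding who schedules that task. The sketch below shows just that handoff pattern in isolation, restricted to the Disruptor calls the patch itself uses (createMultiProducer, next/get/publish, getCursor, isPublished, Sequence, addGatingSequences, Sequencer.INITIAL_CURSOR_VALUE); the Truck class, the String payload, and the drain()/drained() names are illustrative stand-ins, not HBase code.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;

public class RingBufferHandoffSketch {

  /** Reusable slot, in the spirit of AsyncFSWALRingBufferTruck (name here is illustrative). */
  private static final class Truck {
    private String payload;
    void load(String payload) { this.payload = payload; }
    String unload() { String p = this.payload; this.payload = null; return p; }
  }

  // Multi-producer ring buffer; the size must be a power of two.
  private final RingBuffer<Truck> ringBuffer = RingBuffer.createMultiProducer(Truck::new, 1024);
  // Consumer-owned gating sequence: producers cannot wrap past unconsumed slots.
  private final Sequence consumedUpTo = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
  private final AtomicLong drainedCount = new AtomicLong();

  public RingBufferHandoffSketch() {
    ringBuffer.addGatingSequences(consumedUpTo);
  }

  /** Producer side: called concurrently, no shared monitor on the hot path. */
  public long publish(String payload) {
    long sequence = ringBuffer.next();        // claim a slot (CAS on the cursor; may block if full)
    try {
      ringBuffer.get(sequence).load(payload); // fill the pre-allocated truck in place
    } finally {
      ringBuffer.publish(sequence);           // always publish the claimed slot
    }
    if (consumerScheduled.compareAndSet(false, true)) {
      drain();                                // AsyncFSWAL instead does eventLoop.execute(consumer)
    }
    return sequence;
  }

  /** Consumer side: drains everything published so far, like the consume() task in the patch. */
  private void drain() {
    long nextSequence = consumedUpTo.get() + 1;
    for (long cursorBound = ringBuffer.getCursor(); nextSequence <= cursorBound; nextSequence++) {
      if (!ringBuffer.isPublished(nextSequence)) {
        break;                                // claimed but not yet published by its producer
      }
      ringBuffer.get(nextSequence).unload();  // drop the reference so the slot can be reused
      drainedCount.incrementAndGet();
      consumedUpTo.set(nextSequence);         // release the slot back to producers
    }
    consumerScheduled.set(false);
    // The real consume() rechecks the cursor after clearing the flag and reschedules itself,
    // so a publish that raced with the flag reset is not left behind.
  }

  public long drained() {
    return drainedCount.get();
  }
}

The contention win the commit subject aims for comes from handler threads contending only on the sequencer's slot claim and on the cheap consumerScheduled flag, rather than all serializing on one monitor for every append and sync; the consumeLock introduced in the patch is taken only by the consumer, safe-point, and post-roll paths, which stay off the per-append hot path.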