From 2ac5d6c8376ed1d434c1da2dc7664ca36afe34b2 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 17 Nov 2016 20:27:36 +0800 Subject: [PATCH] Add delay, use large buf size --- .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 4 +- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 76 +++++++++++++++++----- .../hadoop/hbase/regionserver/wal/SyncFuture.java | 15 +++-- 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 02ffcd5..cd287b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -307,7 +307,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.summer = summer; this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; - this.buf = alloc.directBuffer(); + this.buf = alloc.directBuffer(80 * 1024); this.state = State.STREAMING; setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } @@ -472,7 +472,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } }); int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); - ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); + ByteBuf newBuf = alloc.directBuffer(80 * 1024).ensureWritable(trailingPartialChunkLen); if (trailingPartialChunkLen != 0) { buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 20c43aa..333f203 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -25,13 +25,13 @@ import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequencer; import io.netty.channel.EventLoop; +import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.util.ArrayDeque; -import java.util.Comparator; import java.util.Deque; import java.util.Iterator; import java.util.List; @@ -40,6 +40,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -51,7 +52,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -126,22 +126,20 @@ import org.apache.htrace.TraceScope; * For a broken writer roll request, the only difference is that we can bypass the wait for safe * point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details. */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +@InterfaceAudience.Private public class AsyncFSWAL extends AbstractFSWAL { private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class); - private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { - int c = Long.compare(o1.getTxid(), o2.getTxid()); - return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); - }; - public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; + public static final String ASYNC_WAL_SYNC_MAX_DELAY_MS = "hbase.wal.async.sync.max.delay.ms"; + public static final int DEFAULT_ASYNC_WAL_SYNC_MAX_DELAY_MS = 1; + private final EventLoop eventLoop; private final Lock consumeLock = new ReentrantLock(); @@ -169,6 +167,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private final long batchSize; + private final long syncMaxDelayNs; + private final int createMaxRetries; private final ExecutorService closeExecutor = Executors.newCachedThreadPool( @@ -180,7 +180,10 @@ public class AsyncFSWAL extends AbstractFSWAL { private final Deque unackedAppends = new ArrayDeque<>(); - private final SortedSet syncFutures = new TreeSet(SEQ_COMPARATOR); + private final SortedSet syncFutures = new TreeSet<>((o1, o2) -> { + int c = Long.compare(o1.getTxid(), o2.getTxid()); + return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); + }); // the highest txid of WAL entries being processed private long highestProcessedAppendTxid; @@ -190,6 +193,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private long highestProcessedAppendTxidAtLastSync; + private ScheduledFuture delayedConsumerTaskFuture; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoop eventLoop) @@ -198,7 +203,6 @@ public class AsyncFSWAL extends AbstractFSWAL { this.eventLoop = eventLoop; Supplier hasConsumerTask; if (eventLoop instanceof SingleThreadEventExecutor) { - try { Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); field.setAccessible(true); @@ -225,6 +229,8 @@ public class AsyncFSWAL extends AbstractFSWAL { waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); + syncMaxDelayNs = TimeUnit.MILLISECONDS + .toNanos(conf.getInt(ASYNC_WAL_SYNC_MAX_DELAY_MS, DEFAULT_ASYNC_WAL_SYNC_MAX_DELAY_MS)); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); rollWriter(); @@ -398,7 +404,11 @@ public class AsyncFSWAL extends AbstractFSWAL { } } - private void appendAndSync() { + // used to locate in syncFutures. + private final SyncFuture dummySyncFuture = new SyncFuture(); + + // return the delayed time in nanoseconds if we have decided to delay a sync request, or 0 if not. + private long appendAndSync() { final AsyncWriter writer = this.writer; // maybe a sync request is not queued when we issue a sync, so check here to see if we could // finish some. @@ -446,7 +456,7 @@ public class AsyncFSWAL extends AbstractFSWAL { if (writer.getLength() - fileLengthAtLastSync >= batchSize) { // sync because buffer size limit. sync(writer); - return; + return 0; } if (writer.getLength() == fileLengthAtLastSync) { // we haven't written anything out, just advance the highestSyncedSequence since we may only @@ -456,17 +466,41 @@ public class AsyncFSWAL extends AbstractFSWAL { finishSync(false); trySetReadyForRolling(); } - return; + return 0; + } + // we have some unsynced data in writer but there is no sync request + if (syncFutures.isEmpty()) { + return 0; + } + dummySyncFuture.txid = highestProcessedAppendTxidAtLastSync; + Iterator iter = syncFutures.tailSet(dummySyncFuture).iterator(); + if (!iter.hasNext()) { + // no new sync request + return 0; + } + SyncFuture sync = iter.next(); + if (sync.getTxid() == highestProcessedAppendTxidAtLastSync) { + // the txid is same + if (!iter.hasNext()) { + return 0; + } + sync = iter.next(); } - // we have some unsynced data but haven't reached the batch size yet - if (!syncFutures.isEmpty() - && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { - // we have at least one sync request + long delayNs = syncMaxDelayNs - (System.nanoTime() - sync.getStartNs()); + if (delayNs <= 0) { + // issue a sync sync(writer); + return 0; } + // The time elapsed from the earliest sync request has not reached the syncMaxDelayNs yet. + return delayNs; } private void consume() { + if (delayedConsumerTaskFuture != null) { + delayedConsumerTaskFuture.cancel(false); + delayedConsumerTaskFuture = null; + } consumeLock.lock(); try { if (writerBroken) { @@ -507,7 +541,7 @@ public class AsyncFSWAL extends AbstractFSWAL { } waitingConsumePayloadsGatingSequence.set(nextCursor); } - appendAndSync(); + long syncDelayedNs = appendAndSync(); if (hasConsumerTask.get()) { return; } @@ -521,6 +555,12 @@ public class AsyncFSWAL extends AbstractFSWAL { // give up scheduling the consumer task. // 3. we set consumerScheduled to false and also give up scheduling consumer task. if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + if (syncDelayedNs > 0) { + delayedConsumerTaskFuture = eventLoop.schedule(() -> { + delayedConsumerTaskFuture = null; + consume(); + }, syncDelayedNs, TimeUnit.NANOSECONDS); + } return; } else { // maybe someone has grabbed this before us diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index bc2e62e..2b127e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -53,7 +53,7 @@ class SyncFuture { /** * The transaction id of this operation, monotonically increases. */ - private long txid; + long txid; /** * The transaction id that was set in here when we were marked done. Should be equal or > txnId. @@ -73,6 +73,8 @@ class SyncFuture { */ private Span span; + private long startNs; + /** * Call this method to clear old usage and get it ready for new deploy. * @param txid the new transaction id @@ -92,6 +94,7 @@ class SyncFuture { this.txid = txid; this.span = span; this.throwable = null; + this.startNs = System.nanoTime(); return this; } @@ -146,13 +149,9 @@ class SyncFuture { return true; } - boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException(); - } - synchronized long get(long timeoutNs) throws InterruptedException, ExecutionException, TimeoutIOException { - final long done = System.nanoTime() + timeoutNs; + final long done = startNs + timeoutNs; while (!isDone()) { wait(1000); if (System.nanoTime() >= done) { @@ -178,4 +177,8 @@ class SyncFuture { synchronized Throwable getThrowable() { return this.throwable; } + + synchronized long getStartNs() { + return this.startNs; + } } -- 1.9.1