diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 7ffd3daeb5..ea9a0d8920 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -407,6 +407,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { waitingAckQueue.removeFirst(); return; } + // TODO: we should perhaps measure time taken per DN here; + // we could collect statistics per DN, and/or exclude bad nodes in createOutput. datanodeList.forEach(ch -> { ch.write(headerBuf.retainedDuplicate()); ch.write(checksumBuf.retainedDuplicate()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index f09f251947..5ec982dcde 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -116,8 +116,13 @@ public abstract class AbstractFSWAL implements WAL { /** Don't log blocking regions more frequently than this. */ private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); + private static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.hlog.slowsync.ms"; protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms + private static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms"; + protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms + + private static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout"; private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min /** @@ -172,7 +177,8 @@ public abstract class AbstractFSWAL implements WAL { */ protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); - protected final long slowSyncNs; + /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */ + protected final long slowSyncNs, rollOnSyncNs; private final long walSyncTimeoutNs; @@ -429,10 +435,13 @@ public abstract class AbstractFSWAL implements WAL { LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); - this.slowSyncNs = TimeUnit.MILLISECONDS - .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); - this.walSyncTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); + this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos( + conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS)); + this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos( + conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS)); + this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS)); + this.cachedSyncFutures = new ThreadLocal() { @Override protected SyncFuture initialValue() { @@ -988,7 +997,7 @@ public abstract class AbstractFSWAL implements WAL { return len; } - protected final void postSync(final long timeInNanos, final int handlerSyncs) { + protected final boolean postSync(long timeInNanos, int handlerSyncs) { if (timeInNanos > this.slowSyncNs) { String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); @@ -1000,6 +1009,11 @@ public abstract class AbstractFSWAL implements WAL { listener.postSync(timeInNanos, handlerSyncs); } } + if (timeInNanos > this.rollOnSyncNs) { + LOG.info("Trying to request a roll due to a very log sync ({} ms)", timeInNanos / 1000000); + return true; + } + return false; } protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index b0c25496a2..81308ad530 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -328,13 +328,14 @@ public class AsyncFSWAL extends AbstractFSWAL { break; } } - postSync(System.nanoTime() - startTimeNs, finishSync(true)); + + boolean doRequestRoll = postSync(System.nanoTime() - startTimeNs, finishSync(true)); if (trySetReadyForRolling()) { // we have just finished a roll, then do not need to check for log rolling, the writer will be // closed soon. return; } - if (writer.getLength() < logrollsize || rollRequested) { + if ((!doRequestRoll && writer.getLength() < logrollsize) || rollRequested) { return; } rollRequested = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4c7d..68cd338984 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -257,7 +257,10 @@ public class FSHLog extends AbstractFSWAL { long startTimeNanos = System.nanoTime(); try { nextWriter.sync(useHsync); - postSync(System.nanoTime() - startTimeNanos, 0); + boolean doRequestRoll = postSync(System.nanoTime() - startTimeNanos, 0); + if (doRequestRoll) { + LOG.info("Ignoring a roll request after a sync for a new file"); + } } catch (IOException e) { // optimization failed, no need to abort here. LOG.warn("pre-sync failed but an optimization so keep going", e); @@ -576,6 +579,7 @@ public class FSHLog extends AbstractFSWAL { //TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); Throwable lastException = null; + boolean wasRollRequested = false; try { TraceUtil.addTimelineAnnotation("syncing writer"); writer.sync(useHsync); @@ -596,12 +600,16 @@ public class FSHLog extends AbstractFSWAL { // Can we release other syncs? syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { + wasRollRequested = true; requestLogRoll(); } else { - checkLogRoll(); + wasRollRequested = checkLogRoll(); } } - postSync(System.nanoTime() - start, syncCount); + boolean doRequestRoll = postSync(System.nanoTime() - start, syncCount); + if (!wasRollRequested && doRequestRoll) { + requestLogRoll(); + } } catch (InterruptedException e) { // Presume legit interrupt. Thread.currentThread().interrupt(); @@ -615,10 +623,10 @@ public class FSHLog extends AbstractFSWAL { /** * Schedule a log roll if needed. */ - private void checkLogRoll() { + private boolean checkLogRoll() { // Will return immediately if we are in the middle of a WAL log roll currently. if (!rollWriterLock.tryLock()) { - return; + return false; } boolean lowReplication; try { @@ -628,7 +636,9 @@ public class FSHLog extends AbstractFSWAL { } if (lowReplication || (writer != null && writer.getLength() > logrollsize)) { requestLogRoll(lowReplication); + return true; } + return false; } /**