From e1b4d816e17647ca2892df84071e89a564b98b9d Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Tue, 30 Apr 2019 15:22:54 -0700
Subject: [PATCH] HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow

---
 .../regionserver/wal/MetricsWALSource.java    |  19 +-
 .../wal/MetricsWALSourceImpl.java             |  30 ++-
 .../hadoop/hbase/regionserver/LogRoller.java  |   2 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java | 162 +++++++++++---
 .../hbase/regionserver/wal/MetricsWAL.java    |  19 +-
 .../regionserver/wal/WALActionsListener.java  |  16 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java |   2 +-
 .../regionserver/wal/TestLogRolling.java      | 203 +++++++++++++++++-
 .../regionserver/wal/TestMetricsWAL.java      |  16 +-
 9 files changed, 417 insertions(+), 52 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
index 2be1d0d079..598764243f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
@@ -58,10 +58,19 @@ public interface MetricsWALSource extends BaseSource {
   String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
   String ROLL_REQUESTED = "rollRequest";
-  String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+  String ROLL_REQUESTED_DESC = "How many times a roll has been requested total";
+  String ERROR_ROLL_REQUESTED = "errorRollRequest";
+  String ERROR_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to I/O or other errors.";
   String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
   String LOW_REPLICA_ROLL_REQUESTED_DESC =
-      "How many times a log roll was requested due to too few DN's in the write pipeline.";
+      "How many times a roll was requested due to too few datanodes in the write pipeline.";
+  String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+  String SLOW_SYNC_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to sync too slow on the write pipeline.";
+  String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+  String SIZE_ROLL_REQUESTED_DESC =
+      "How many times a roll was requested due to file size roll threshold.";
   String WRITTEN_BYTES = "writtenBytes";
   String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";
 
@@ -92,8 +101,14 @@ public interface MetricsWALSource extends BaseSource {
 
   void incrementLogRollRequested();
 
+  void incrementErrorLogRoll();
+
   void incrementLowReplicationLogRoll();
 
+  void incrementSlowSyncLogRoll();
+
+  void incrementSizeLogRoll();
+
   void incrementWrittenBytes(long val);
 
   long getWrittenBytes();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
index 1299637d30..190069425e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
@@ -39,7 +39,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
   private final MutableFastCounter appendCount;
   private final MutableFastCounter slowAppendCount;
   private final MutableFastCounter logRollRequested;
-  private final MutableFastCounter lowReplicationLogRollRequested;
+  private final MutableFastCounter errorRollRequested;
+  private final MutableFastCounter lowReplicationRollRequested;
+  private final MutableFastCounter slowSyncRollRequested;
+  private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;
 
   public MetricsWALSourceImpl() {
@@ -61,8 +64,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
     logRollRequested = this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
-    lowReplicationLogRollRequested = this.getMetricsRegistry()
+    errorRollRequested = this.getMetricsRegistry()
+        .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L);
+    lowReplicationRollRequested = this.getMetricsRegistry()
         .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
+    slowSyncRollRequested = this.getMetricsRegistry()
+        .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L);
+    sizeRollRequested = this.getMetricsRegistry()
+        .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
     writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0l);
   }
 
@@ -96,9 +105,24 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
     logRollRequested.incr();
   }
 
+  @Override
+  public void incrementErrorLogRoll() {
+    errorRollRequested.incr();
+  }
+
   @Override
   public void incrementLowReplicationLogRoll() {
-    lowReplicationLogRollRequested.incr();
+    lowReplicationRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSlowSyncLogRoll() {
+    slowSyncRollRequested.incr();
+  }
+
+  @Override
+  public void incrementSizeLogRoll() {
+    sizeRollRequested.incr();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 246c02cf58..fd208c25b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -71,7 +71,7 @@ public class LogRoller extends HasThread {
     if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
       wal.registerWALActionsListener(new WALActionsListener.Base() {
         @Override
-        public void logRollRequested(boolean lowReplicas) {
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
           walNeedsRoll.put(wal, Boolean.TRUE);
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b46ad0fd87..882a2d53f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
 
 import java.io.FileNotFoundException;
@@ -160,9 +164,18 @@ public class FSHLog implements WAL {
 
   private static final Log LOG = LogFactory.getLog(FSHLog.class);
 
-  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
+  static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
+  static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms";
+  static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
+  static final String SLOW_SYNC_ROLL_THRESHOLD = "hbase.regionserver.wal.slowsync.roll.threshold";
+  static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
+  static final String SLOW_SYNC_ROLL_INTERVAL_MS =
+      "hbase.regionserver.wal.slowsync.roll.interval.ms";
+  static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
 
-  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+  static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+  static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
   /**
    * The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
@@ -280,7 +293,10 @@ public class FSHLog implements WAL {
 
   private final boolean useHsync;
 
-  private final int slowSyncNs;
+  private final long slowSyncNs, rollOnSyncNs;
+  private final int slowSyncRollThreshold;
+  private final int slowSyncCheckInterval;
+  private final AtomicInteger slowSyncCount = new AtomicInteger();
 
   private final long walSyncTimeout;
 
@@ -350,9 +366,13 @@ public class FSHLog implements WAL {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
+  protected volatile boolean rollRequested;
+
   // Last time to check low replication on hlog's pipeline
   private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
 
+  // Last time we asked to roll the log due to a slow sync
+  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+
   /**
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@@ -540,11 +560,16 @@ public class FSHLog implements WAL {
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
 
-    this.slowSyncNs =
-        1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
-        DEFAULT_SLOW_SYNC_TIME_MS);
-    this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
-        DEFAULT_WAL_SYNC_TIMEOUT_MS);
+    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
+        conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
+    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
+        DEFAULT_ROLL_ON_SYNC_TIME_MS));
+    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
+        DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
+    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
+        DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
+    this.walSyncTimeout = conf.getLong(WAL_SYNC_TIMEOUT_MS,
+        conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
 
     // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
     // put on the ring buffer.
@@ -720,6 +745,11 @@ public class FSHLog implements WAL {
       // NewPath could be equal to oldPath if replaceWriter fails.
       newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
       tellListenersAboutPostLogRoll(oldPath, newPath);
+      // Reset rollRequested status
+      rollRequested = false;
+      // We got a new writer, so reset the slow sync count
+      lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+      slowSyncCount.set(0);
       // Can we delete any of the old log files?
       if (getNumRolledLogFiles() > 0) {
         cleanOldLogs();
@@ -1303,8 +1333,11 @@ public class FSHLog implements WAL {
             syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
             // Can we release other syncs?
             syncCount += releaseSyncFutures(currentSequence, lastException);
-            if (lastException != null) requestLogRoll();
-            else checkLogRoll();
+            if (lastException != null) {
+              requestLogRoll();
+            } else {
+              checkLogRoll();
+            }
           }
           postSync(System.nanoTime() - start, syncCount);
         } catch (InterruptedException e) {
@@ -1321,24 +1354,37 @@
    * Schedule a log roll if needed.
    */
   public void checkLogRoll() {
+    // If we have already requested a roll, do nothing
+    if (isLogRollRequested()) {
+      return;
+    }
     // Will return immediately if we are in the middle of a WAL log roll currently.
-    if (!rollWriterLock.tryLock()) return;
-    boolean lowReplication;
-    try {
-      lowReplication = checkLowReplication();
-    } finally {
-      rollWriterLock.unlock();
+    if (!rollWriterLock.tryLock()) {
+      return;
     }
     try {
-      if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
-        requestLogRoll(lowReplication);
+      if (checkLowReplication()) {
+        LOG.warn("Requesting log roll because of low replication, current pipeline: " +
+          Arrays.toString(getPipeLine()));
+        requestLogRoll(LOW_REPLICATION);
+      } else if (writer != null && writer.getLength() > logrollsize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requesting log roll because of file size threshold; length=" +
+            writer.getLength() + ", logrollsize=" + logrollsize);
+        }
+        requestLogRoll(SIZE);
+      } else if (checkSlowSync()) {
+        // We log this already in checkSlowSync
+        requestLogRoll(SLOW_SYNC);
       }
     } catch (IOException e) {
       LOG.warn("Writer.getLength() failed; continuing", e);
+    } finally {
+      rollWriterLock.unlock();
     }
   }
 
-  /*
+  /**
    * @return true if number of replicas for the WAL is lower than threshold
    */
   private boolean checkLowReplication() {
@@ -1389,6 +1435,41 @@ public class FSHLog implements WAL {
     return logRollNeeded;
   }
 
+  /**
+   * @return true if we exceeded the slow sync roll threshold over the last check
+   *         interval
+   */
+  private boolean checkSlowSync() {
+    boolean result = false;
+    long now = EnvironmentEdgeManager.currentTime();
+    long elapsedTime = now - lastTimeCheckSlowSync;
+    if (elapsedTime >= slowSyncCheckInterval) {
+      if (slowSyncCount.get() >= slowSyncRollThreshold) {
+        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
+          // If two or more slowSyncCheckInterval have elapsed this is a corner case
+          // where a train of slow syncs almost triggered us but then there was a long
+          // interval from then until the one more that pushed us over. If so, we
+          // should do nothing and let the count reset.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
+              "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+              ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
+              slowSyncCheckInterval + " ms");
+          }
+          // Fall through to count reset below
+        } else {
+          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
+            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+            ", current pipeline: " + Arrays.toString(getPipeLine()));
+          result = true;
+        }
+      }
+      lastTimeCheckSlowSync = now;
+      slowSyncCount.set(0);
+    }
+    return result;
+  }
+
   private SyncFuture publishSyncOnRingBuffer(long sequence) {
     return publishSyncOnRingBuffer(sequence, null, false);
   }
@@ -1452,10 +1533,23 @@ public class FSHLog implements WAL {
     if (timeInNanos > this.slowSyncNs) {
       String msg = new StringBuilder().append("Slow sync cost: ")
-          .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
+          .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
+          .append(" ms, current pipeline: ")
           .append(Arrays.toString(getPipeLine())).toString();
       Trace.addTimelineAnnotation(msg);
       LOG.info(msg);
+      // A single sync took too long.
+      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
+      // effects. Here we have a single data point that indicates we should take immediate
+      // action, so do so.
+      if (timeInNanos > this.rollOnSyncNs) {
+        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
+          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
+          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
+          Arrays.toString(getPipeLine()));
+        requestLogRoll(SLOW_SYNC);
+      }
+      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
     }
     if (!listeners.isEmpty()) {
       for (WALActionsListener listener : listeners) {
@@ -1539,15 +1633,24 @@
     }
   }
 
+  protected boolean isLogRollRequested() {
+    return rollRequested;
+  }
+
   // public only until class moves to o.a.h.h.wal
   public void requestLogRoll() {
-    requestLogRoll(false);
+    requestLogRoll(ERROR);
   }
 
-  private void requestLogRoll(boolean tooFewReplicas) {
+  private void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
+    // If we have already requested a roll, don't do it again
+    if (rollRequested) {
+      return;
+    }
     if (!this.listeners.isEmpty()) {
+      rollRequested = true; // No point to assert this unless there is a registered listener
       for (WALActionsListener i: this.listeners) {
-        i.logRollRequested(tooFewReplicas);
+        i.logRollRequested(reason);
       }
     }
   }
@@ -1599,8 +1702,7 @@
   public static final long FIXED_OVERHEAD = ClassSize.align(
     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
-    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
+    ClassSize.ATOMIC_INTEGER + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));
 
   private static void split(final Configuration conf, final Path p) throws IOException {
     FileSystem fs = FSUtils.getWALFileSystem(conf);
@@ -2083,4 +2185,14 @@
   public long getLastTimeCheckLowReplication() {
     return this.lastTimeCheckLowReplication;
   }
+
+  @VisibleForTesting
+  Writer getWriter() {
+    return this.writer;
+  }
+
+  @VisibleForTesting
+  void setWriter(Writer writer) {
+    this.writer = writer;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index 69a31cdd6f..c2b6996a8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -72,10 +72,23 @@ public class MetricsWAL extends WALActionsListener.Base {
   }
 
   @Override
-  public void logRollRequested(boolean underReplicated) {
+  public void logRollRequested(WALActionsListener.RollRequestReason reason) {
     source.incrementLogRollRequested();
-    if (underReplicated) {
-      source.incrementLowReplicationLogRoll();
+    switch (reason) {
+      case ERROR:
+        source.incrementErrorLogRoll();
+        break;
+      case LOW_REPLICATION:
+        source.incrementLowReplicationLogRoll();
+        break;
+      case SIZE:
+        source.incrementSizeLogRoll();
+        break;
+      case SLOW_SYNC:
+        source.incrementSlowSyncLogRoll();
+        break;
+      default:
+        break;
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 60ab7b8057..78da53fe44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -34,6 +34,18 @@ import org.apache.hadoop.hbase.wal.WALKey;
 
 @InterfaceAudience.Private
 public interface WALActionsListener {
 
+  /** The reason for the log roll request. */
+  static enum RollRequestReason {
+    /** The length of the log exceeds the roll size threshold. */
+    SIZE,
+    /** Too few replicas in the writer pipeline. */
+    LOW_REPLICATION,
+    /** Too much time spent waiting for sync. */
+    SLOW_SYNC,
+    /** I/O or other error. */
+    ERROR
+  };
+
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
@@ -67,7 +79,7 @@ public interface WALActionsListener {
   /**
    * A request was made that the WAL be rolled.
    */
-  void logRollRequested(boolean tooFewReplicas);
+  void logRollRequested(RollRequestReason reason);
 
   /**
    * The WAL is about to close.
@@ -130,7 +142,7 @@ public interface WALActionsListener {
     public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
 
     @Override
-    public void logRollRequested(boolean tooFewReplicas) {}
+    public void logRollRequested(RollRequestReason reason) {}
 
     @Override
     public void logCloseRequested() {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 8931c0804e..adbd450b2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider {
     public byte[][] rollWriter() {
       if (!listeners.isEmpty()) {
         for (WALActionsListener listener : listeners) {
-          listener.logRollRequested(false);
+          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
         }
         for (WALActionsListener listener : listeners) {
           try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 68200d2944..f799786571 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -64,7 +66,9 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -146,9 +150,15 @@ public class TestLogRolling {
     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
-      TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
   @Before
@@ -223,19 +233,187 @@ public class TestLogRolling {
     LOG.info("after writing there are "
         + DefaultWALProvider.getNumRolledLogFiles(log) + " log files");
 
-      // flush all regions
-      for (Region r: server.getOnlineRegionsLocalContext()) {
-        r.flush(true);
-      }
+    // flush all regions
+    for (Region r: server.getOnlineRegionsLocalContext()) {
+      r.flush(true);
+    }
 
-      // Now roll the log
-      log.rollWriter();
+    // Now roll the log
+    log.rollWriter();
 
     int count = DefaultWALProvider.getNumRolledLogFiles(log);
     LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
     assertTrue(("actual count: " + count), count <= 2);
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+    int row = 1;
+    try {
+      assertTrue(((HTable) table).isAutoFlush());
+
+      // Get a reference to the FSHLog
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
+      final FSHLog log = (FSHLog) server.getWAL(region);
+
+      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+
+      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+          switch (reason) {
+            case SLOW_SYNC:
+              slowSyncHookCalled.lazySet(true);
+              break;
+            default:
+              break;
+          }
+        }
+      });
+
+      // Write some data
+
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 200 ms of
+      // latency to any sync on the hlog. This should be more than sufficient to trigger
+      // slow sync warnings.
+      final Writer oldWriter1 = log.getWriter();
+      final Writer newWriter1 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter1.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter1.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter1.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter1.getLength();
+        }
+      };
+      log.setWriter(newWriter1);
+
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter1;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Wrap the current writer with the anonymous class below that adds 5000 ms of
+      // latency to any sync on the hlog.
+      // This will trip the other threshold.
+      final Writer oldWriter2 = log.getWriter();
+      final Writer newWriter2 = new Writer() {
+        @Override
+        public void close() throws IOException {
+          oldWriter2.close();
+        }
+        @Override
+        public void sync(boolean forceSync) throws IOException {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            InterruptedIOException ex = new InterruptedIOException();
+            ex.initCause(e);
+            throw ex;
+          }
+          oldWriter2.sync(forceSync);
+        }
+        @Override
+        public void append(Entry entry) throws IOException {
+          oldWriter2.append(entry);
+        }
+        @Override
+        public long getLength() throws IOException {
+          return oldWriter2.getLength();
+        }
+      };
+      log.setWriter(newWriter2);
+
+      // Write some data. Should only take one sync.
+
+      writeData(table, row++);
+
+      // Wait for our wait injecting writer to get rolled out, as needed.
+
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != newWriter2;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+      // Set up for test
+      slowSyncHookCalled.set(false);
+
+      // Write some data
+      for (int i = 0; i < 10; i++) {
+        writeData(table, row++);
+      }
+
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+        slowSyncHookCalled.get());
+
+    } finally {
+      table.close();
+    }
+  }
+
   private String getName() {
     return "TestLogRolling-" + name.getMethodName();
   }
@@ -316,12 +494,15 @@
     HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
     final FSHLog log = (FSHLog) server.getWAL(region);
     final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
-
     log.registerWALActionsListener(new WALActionsListener.Base() {
       @Override
-      public void logRollRequested(boolean lowReplication) {
-        if (lowReplication) {
-          lowReplicationHookCalled.lazySet(true);
+      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+        switch (reason) {
+          case LOW_REPLICATION:
+            lowReplicationHookCalled.lazySet(true);
+            break;
+          default:
+            break;
         }
       }
     });
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
index 9a7d494d8b..b36336c22c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
@@ -35,13 +35,21 @@ public class TestMetricsWAL {
   public void testLogRollRequested() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.logRollRequested(false);
-    metricsWAL.logRollRequested(true);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC);
+    metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE);
 
-    // Log roll was requested twice
-    verify(source, times(2)).incrementLogRollRequested();
+    // Log roll was requested four times
+    verify(source, times(4)).incrementLogRollRequested();
+    // One was because of an IO error.
+    verify(source, times(1)).incrementErrorLogRoll();
     // One was because of low replication on the hlog.
     verify(source, times(1)).incrementLowReplicationLogRoll();
+    // One was because of slow sync on the hlog.
+    verify(source, times(1)).incrementSlowSyncLogRoll();
+    // One was because of hlog file length limit.
+    verify(source, times(1)).incrementSizeLogRoll();
   }
 
   @Test
-- 
2.21.0
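Note (not part of the patch): the configuration keys introduced above can be tuned like any other HBase setting. The sketch below is a minimal illustration only; the class name is hypothetical, the values shown are simply the patch defaults rather than recommendations, and in practice these keys would normally be set in hbase-site.xml instead of code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SlowSyncRollTuning {  // hypothetical helper, for illustration only
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Log a slow sync warning for any WAL sync that takes longer than 100 ms (patch default).
    conf.setInt("hbase.regionserver.wal.slowsync.ms", 100);
    // Request an immediate roll if any single sync exceeds 10 seconds (patch default).
    conf.setInt("hbase.regionserver.hlog.roll.on.sync.ms", 10000);
    // Request a roll once 100 slow syncs accumulate within one check interval (patch default).
    conf.setInt("hbase.regionserver.wal.slowsync.roll.threshold", 100);
    // Evaluate and reset the slow sync count every 60 seconds (patch default).
    conf.setInt("hbase.regionserver.wal.slowsync.roll.interval.ms", 60 * 1000);
  }
}

As the FSHLog constructor change above shows, the new hbase.regionserver.wal.* keys fall back to the legacy hbase.regionserver.hlog.slowsync.ms and hbase.regionserver.hlog.sync.timeout names when the new ones are not set.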