From b97710098837843ddf1f3c9e4e2fafe62d096eeb Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Tue, 15 Jul 2014 21:42:15 +0400 Subject: [PATCH] KAFKA-1414 Speedup broker shutdown and startup after hard reset; patched by Jay Kreps, Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov --- config/server.properties | 6 + core/src/main/scala/kafka/log/LogConfig.scala | 30 +++- core/src/main/scala/kafka/log/LogManager.scala | 161 +++++++++++++++------ core/src/main/scala/kafka/server/KafkaConfig.scala | 6 + core/src/main/scala/kafka/server/KafkaServer.scala | 6 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 125 +++++----------- .../server/HighwatermarkPersistenceTest.scala | 4 +- .../unit/kafka/server/ReplicaManagerTest.scala | 2 + 8 files changed, 200 insertions(+), 140 deletions(-) diff --git a/config/server.properties b/config/server.properties index f16c84c..ff2987d 100644 --- a/config/server.properties +++ b/config/server.properties @@ -62,6 +62,12 @@ log.dirs=/tmp/kafka-logs # the brokers. num.partitions=1 +# The number of threads to be used when recovering unflushed segments of logs +#log.recovery.threads=1 + +# The number of threads to be used when shutting down +#log.shutdown.threads=1 + ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5746ad4..af4845b 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -36,6 +36,8 @@ object Defaults { val MinCleanableDirtyRatio = 0.5 val Compact = false val UncleanLeaderElectionEnable = true + val RecoveryThreads = 1 + val ShutdownThreads = 1 } /** @@ -54,6 +56,8 @@ object Defaults { * @param compact Should old segments in this log be deleted or deduplicated? 
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property * but included here for topic-specific configuration validation purposes + * @param recoveryThreads The number of threads to be used when recovering unflushed segments + * @param shutdownThreads The number of threads to be used during shutdown */ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val segmentMs: Long = Defaults.SegmentMs, @@ -68,8 +72,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val deleteRetentionMs: Long = Defaults.DeleteRetentionMs, val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, val compact: Boolean = Defaults.Compact, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) { - + val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, + val recoveryThreads: Int = Defaults.RecoveryThreads, + val shutdownThreads: Int = Defaults.ShutdownThreads) { + def toProps: Properties = { val props = new Properties() import LogConfig._ @@ -87,9 +93,11 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) props.put(CleanupPolicyProp, if(compact) "compact" else "delete") props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) + props.put(RecoveryThreads, recoveryThreads.toString) + props.put(ShutdownThreads, shutdownThreads.toString) props } - + } object LogConfig { @@ -107,6 +115,8 @@ object LogConfig { val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" val CleanupPolicyProp = "cleanup.policy" val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" + val RecoveryThreads = "recovery.threads" + val ShutdownThreads = "shutdown.threads" val ConfigNames = Set(SegmentBytesProp, SegmentMsProp, @@ -121,9 +131,11 @@ object LogConfig { DeleteRetentionMsProp, MinCleanableDirtyRatioProp, CleanupPolicyProp, - UncleanLeaderElectionEnableProp) - - + UncleanLeaderElectionEnableProp, + RecoveryThreads, + ShutdownThreads) + + /** * Parse the given properties instance into a LogConfig object */ @@ -144,9 +156,11 @@ object LogConfig { compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete") .trim.toLowerCase != "delete", uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp, - Defaults.UncleanLeaderElectionEnable.toString).toBoolean) + Defaults.UncleanLeaderElectionEnable.toString).toBoolean, + recoveryThreads = props.getProperty(RecoveryThreads, Defaults.RecoveryThreads.toString).toInt, + shutdownThreads = props.getProperty(ShutdownThreads, Defaults.ShutdownThreads.toString).toInt) } - + /** * Create a log config instance using the given properties and defaults */ diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1946c94..edd5998 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,6 +23,7 @@ import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint} +import java.util.concurrent.{Executors, ExecutorService, ExecutionException} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
@@ -39,6 +40,8 @@ class LogManager(val logDirs: Array[File], val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, + recoveryThreads: Int, + shutdownThreads: Int, val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, @@ -101,35 +104,55 @@ class LogManager(val logDirs: Array[File], /** * Recover and load all logs in the given data directories */ - private def loadLogs(dirs: Seq[File]) { - for(dir <- dirs) { - val recoveryPoints = this.recoveryPointCheckpoints(dir).read - /* load the logs */ - val subDirs = dir.listFiles() - if(subDirs != null) { - val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) - if(cleanShutDownFile.exists()) - info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) - else - brokerState.newState(RecoveringFromUncleanShutdown) - - for(dir <- subDirs) { - if(dir.isDirectory) { - info("Loading log '" + dir.getName + "'") - val topicPartition = Log.parseTopicPartitionName(dir.getName) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val log = new Log(dir, - config, - recoveryPoints.getOrElse(topicPartition, 0L), - scheduler, - time) - val previous = this.logs.put(topicPartition, log) - if(previous != null) - throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) - } + private def loadLogs(dirs: Seq[File]): Unit = { + var pool: ExecutorService = null + + try { + val jobs = dirs.map { dir => + Utils.runnable { () => + loadDir(dir) + } + } + val poolSize = recoveryThreads + pool = Executors.newFixedThreadPool(poolSize) + jobs.map(pool.submit).foreach(_.get) + } catch { + case e: ExecutionException => { + error("There was an error in one of the threads during logs loading: " + e.getCause) + throw e.getCause + } + } finally { + pool.shutdown() + } + } + + private def loadDir(dir: File): Unit = { + val recoveryPoints = this.recoveryPointCheckpoints(dir).read + /* load the logs */ + val subDirs = dir.listFiles() + if (subDirs != null) { + val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) + if (cleanShutDownFile.exists()) { + debug("Found clean shutdown file. 
Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + } else { + brokerState.newState(RecoveringFromUncleanShutdown) + } + + for (dir <- subDirs if dir.isDirectory) { + debug("Loading log '" + dir.getName + "'") + val topicPartition = Log.parseTopicPartitionName(dir.getName) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val log = new Log(dir, + config, + recoveryPoints.getOrElse(topicPartition, 0L), + scheduler, + time) + val previous = this.logs.put(topicPartition, log) + if (previous != null) { + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } - cleanShutDownFile.delete() } + cleanShutDownFile.delete() } } @@ -160,31 +183,65 @@ class LogManager(val logDirs: Array[File], if(cleanerConfig.enableCleaner) cleaner.startup() } - + /** * Close all the logs */ def shutdown() { info("Shutting down.") + try { // stop the cleaner first - if(cleaner != null) + if (cleaner != null) { Utils.swallow(cleaner.shutdown()) - // flush the logs to ensure latest possible recovery point - allLogs.foreach(_.flush()) - // close the logs - allLogs.foreach(_.close()) - // update the last flush point - checkpointRecoveryPointOffsets() - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) + } + + var pool: ExecutorService = null + + // close logs in each dir + for (dir <- this.logDirs) { + debug("Flushing and closing logs at " + dir) + + val dirLogs = logsByDir.getOrElse(dir.toString, Map()).values + + try { + val jobs = dirLogs map { log => + Utils.runnable { () => + // flush the log to ensure latest possible recovery point + log.flush() + log.close() + } + } + + val poolSize = shutdownThreads + pool = Executors.newFixedThreadPool(poolSize) + jobs.map(pool.submit).foreach(_.get) + } catch { + case e: ExecutionException => { + error("There was an error in one of the threads during LogManager shutdown: " + e.getCause) + throw e.getCause + } + } finally { + pool.shutdown() + } + + // update the last flush point + debug("Updating recovery points at " + dir) + checkpointLogsInDir(dir) + + // mark that the shutdown was clean by creating marker file + debug("Writing clean shutdown marker at " + dir) + Utils.swallow(new File(dir.toString, Log.CleanShutdownFile).createNewFile()) + } } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) } + info("Shutdown complete.") } + /** * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset * @@ -226,16 +283,21 @@ class LogManager(val logDirs: Array[File], } /** - * Write out the current recovery point for all logs to a text file in the log directory + * Write out the current recovery point for all logs to a text file in the log directory * to avoid recovering the whole log on startup. */ def checkpointRecoveryPointOffsets() { - val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString) - for(dir <- logDirs) { - val recoveryPoints = recoveryPointsByDir.get(dir.toString) - if(recoveryPoints.isDefined) - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) - } + this.logDirs.foreach(checkpointLogsInDir) + } + + /** + * Make a checkpoint for all logs in provided directory. 
+   */
+  def checkpointLogsInDir(dir: File): Unit = {
+    for {
+      recoveryPoints <- this.logsByDir.get(dir.toString)
+      recoveryCheckpoints <- this.recoveryPointCheckpoints.get(dir)
+    } recoveryCheckpoints.write(recoveryPoints.mapValues(_.recoveryPoint))
   }
 
   /**
@@ -366,13 +428,22 @@ class LogManager(val logDirs: Array[File],
    * Get all the partition logs
    */
   def allLogs(): Iterable[Log] = logs.values
-  
+
   /**
    * Get a map of TopicAndPartition => Log
    */
   def logsByTopicPartition = logs.toMap
 
   /**
+   * Map of log dir to logs by topic and partitions in that dir
+   */
+  private def logsByDir = {
+    this.logsByTopicPartition.groupBy {
+      case (_, log) => log.dir.getParent
+    }
+  }
+
+  /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
   private def flushDirtyLogs() = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 50b09ed..b3ca7c7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -190,6 +190,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
   val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
 
+  /* the number of threads to be used when recovering unflushed segments */
+  val logRecoveryThreads = props.getIntInRange("log.recovery.threads", 1, (1, Int.MaxValue))
+
+  /* the number of threads to be used during shutdown */
+  val logShutdownThreads = props.getIntInRange("log.shutdown.threads", 1, (1, Int.MaxValue))
+
   /* enable auto creation of topic on the server */
   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index def1dc2..9daf128 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -287,7 +287,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                      deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
-                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
+                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
+                                     recoveryThreads = config.logRecoveryThreads,
+                                     shutdownThreads = config.logShutdownThreads)
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
@@ -303,6 +305,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                      topicConfigs = configs,
                      defaultConfig = defaultLogConfig,
                      cleanerConfig = cleanerConfig,
+                     recoveryThreads = config.logRecoveryThreads,
+                     shutdownThreads = config.logShutdownThreads,
                      flushCheckMs = config.logFlushSchedulerIntervalMs,
                      flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                      retentionCheckMs = config.logCleanupIntervalMs,
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d03d4c4..d10f469 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -5,7 +5,7 @@
 * The ASF 
licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -40,16 +40,7 @@ class LogManagerTest extends JUnit3Suite { override def setUp() { super.setUp() logDir = TestUtils.tempDir() - logManager = new LogManager(logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = createLogManager(logDirs = Array(logDir)) logManager.startup logDir = logManager.logDirs(0) } @@ -61,7 +52,7 @@ class LogManagerTest extends JUnit3Suite { logManager.logDirs.map(Utils.rm(_)) super.tearDown() } - + /** * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log. */ @@ -97,9 +88,9 @@ class LogManagerTest extends JUnit3Suite { offset = info.lastOffset } assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) - + log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds)) - + time.sleep(maxLogAgeMs + 1) assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) @@ -125,18 +116,7 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = new LogManager( - logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = config, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + logManager = createLogManager(logDirs = Array(logDir)) logManager.startup // create a log @@ -176,18 +156,9 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = logConfig.copy(flushMs = 1000) - logManager = new LogManager( + logManager = createLogManager( logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = config, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + flushCheckpointMs = 10000L) logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -198,30 +169,21 @@ class LogManagerTest extends JUnit3Suite { time.sleep(logManager.InitialTaskDelayMs) assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime) } - + /** * Test that new logs that are created are assigned to the least loaded log directory */ @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories - val dirs = Array(TestUtils.tempDir(), - TestUtils.tempDir(), + val dirs = Array(TestUtils.tempDir(), + TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager = new LogManager( + logManager = createLogManager( logDirs = dirs, - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 
1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) - + flushCheckpointMs = 10000L) + // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { logManager.createLog(TopicAndPartition("test", partition), logConfig) @@ -230,28 +192,19 @@ class LogManagerTest extends JUnit3Suite { assertTrue("Load should balance evenly", counts.max <= counts.min + 1) } } - + /** * Test that it is not possible to open two log managers using the same data directory */ @Test def testTwoLogManagersUsingSameDirFails() { try { - new LogManager( + createLogManager( logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + flushCheckpointMs = 10000L) fail("Should not be able to create a second log manager instance with the same data directory") } catch { - case e: KafkaException => // this is good + case e: KafkaException => // this is good } } @@ -270,16 +223,8 @@ class LogManagerTest extends JUnit3Suite { def testRecoveryDirectoryMappingWithTrailingSlash() { logManager.shutdown() logDir = TestUtils.tempDir() - logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = createLogManager( + logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -293,16 +238,7 @@ class LogManagerTest extends JUnit3Suite { logDir = new File("data" + File.separator + logDir.getName) logDir.mkdirs() logDir.deleteOnExit() - logManager = new LogManager(logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = createLogManager(logDirs = Array(logDir)) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -327,4 +263,23 @@ class LogManagerTest extends JUnit3Suite { } } } + + + private def createLogManager(logDirs: Array[File], + flushCheckpointMs: Long = 100000L) = + { + new LogManager( + logDirs = logDirs, + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + recoveryThreads = 4, + shutdownThreads = 4, + flushCheckMs = 1000L, + flushCheckpointMs = flushCheckpointMs, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time, + brokerState = new BrokerState()) + } } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 558a5d6..8b1d243 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -36,6 +36,8 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topicConfigs = Map(), defaultConfig = LogConfig(), cleanerConfig = CleanerConfig(), + recoveryThreads = 4, + shutdownThreads = 4, flushCheckMs = 
30000, flushCheckpointMs = 10000L, retentionCheckMs = 30000, @@ -147,4 +149,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 518d416..3453951 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -65,6 +65,8 @@ class ReplicaManagerTest extends JUnit3Suite { topicConfigs = Map(), defaultConfig = new LogConfig(), cleanerConfig = CleanerConfig(enableCleaner = false), + recoveryThreads = 4, + shutdownThreads = 4, flushCheckMs = 1000L, flushCheckpointMs = 100000L, retentionCheckMs = 1000L, -- 2.0.1
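
For reference, the parallelization pattern that loadLogs() and shutdown() rely on above (build one job per unit of work, run the jobs on a fixed-size pool, wait for every job, rethrow the cause of the first failure, and always tear the pool down) can be sketched as a small standalone Scala example. This is only an illustrative sketch: the helper name runInParallel and the directory names in main are hypothetical, while the patch itself builds its jobs with Utils.runnable and sizes the pool from the new recoveryThreads and shutdownThreads settings.

    import java.util.concurrent.{Executors, ExecutionException, TimeUnit}

    object ParallelJobsSketch {
      // Run all jobs on a fixed-size pool, surface the first failure's cause,
      // and always shut the pool down, mirroring loadLogs()/shutdown() above.
      def runInParallel(threads: Int)(jobs: Seq[() => Unit]): Unit = {
        val pool = Executors.newFixedThreadPool(threads)
        try {
          val futures = jobs.map { job =>
            pool.submit(new Runnable { def run(): Unit = job() })
          }
          futures.foreach(_.get())  // get() wraps a job failure in ExecutionException
        } catch {
          case e: ExecutionException => throw e.getCause
        } finally {
          pool.shutdown()
          pool.awaitTermination(30, TimeUnit.SECONDS)
        }
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical data directories; with threads = 2 they are processed two at a time.
        val dirs = Seq("/tmp/kafka-logs-1", "/tmp/kafka-logs-2", "/tmp/kafka-logs-3")
        runInParallel(threads = 2)(dirs.map { d =>
          () => println("recovering " + d + " on " + Thread.currentThread.getName)
        })
      }
    }

Operators opt in through the new keys shown in config/server.properties above, for example log.recovery.threads=8 and log.shutdown.threads=8; both default to 1, which keeps the previous sequential behaviour.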