From 56127c81065cd13ab923f8662cdd5b096021c43c Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Tue, 15 Jul 2014 21:42:15 +0400 Subject: [PATCH] KAFKA-1414 Speedup broker shutdown and startup after hard reset; patched by Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov --- config/server.properties | 4 + core/src/main/scala/kafka/log/LogManager.scala | 206 ++++++++++++++++----- core/src/main/scala/kafka/server/KafkaConfig.scala | 3 + core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../main/scala/kafka/utils/KafkaScheduler.scala | 18 +- core/src/main/scala/kafka/utils/Utils.scala | 6 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 97 ++-------- .../server/HighwatermarkPersistenceTest.scala | 17 +- .../unit/kafka/server/ReplicaManagerTest.scala | 19 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 25 +++ 10 files changed, 227 insertions(+), 169 deletions(-) diff --git a/config/server.properties b/config/server.properties index f16c84c..cf129d4 100644 --- a/config/server.properties +++ b/config/server.properties @@ -62,6 +62,10 @@ log.dirs=/tmp/kafka-logs # the brokers. num.partitions=1 +# The number of threads to be used when performing io intensive operations such as +# log recovery and log flushing during startup and shutdown. +#log.io.parallelism=8 + ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1946c94..31e4424 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,6 +23,7 @@ import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint} +import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -39,6 +40,7 @@ class LogManager(val logDirs: Array[File], val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, + ioThreads: Int, val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, @@ -54,7 +56,7 @@ class LogManager(val logDirs: Array[File], createAndValidateLogDirs(logDirs) private val dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap - loadLogs(logDirs) + loadLogs() private val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) @@ -92,44 +94,102 @@ class LogManager(val logDirs: Array[File], dirs.map { dir => val lock = new FileLock(new File(dir, LockFile)) if(!lock.tryLock()) - throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + + throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + ". A Kafka instance in another process or thread is using this directory.") lock } } - + + /** + * Runs directory jobs in interleaved order to make it possible + * for threads to run on all directories at the same time. + * + * @param jobs Map of jobs grouped by arbitrary value. + * @param pool Thread pool to run jobs. + * @returns Map of Java futures grouped by the same value. 
+ */ + private def interleavedJobsRun[G](jobs: Map[G, Seq[Runnable]], + pool: ExecutorService): Map[G, Seq[Future[_]]] = { + val longestJobQueue = jobs.values.map(_.size).max + val runningJobs = mutable.Map.empty[G, mutable.Seq[Future[_]]] + for { + i <- 0 until longestJobQueue + (group, groupJobs) <- jobs + // there's no `get` for lists, but `lift` does the same thing + job <- groupJobs.lift(i) + } { + val activeJob = pool.submit(job) + if (runningJobs.get(group).isEmpty) { + runningJobs(group) = mutable.Seq() + } + runningJobs(group) = runningJobs(group) :+ activeJob + } + + runningJobs + } + + /** * Recover and load all logs in the given data directories */ - private def loadLogs(dirs: Seq[File]) { - for(dir <- dirs) { + private def loadLogs(): Unit = { + val pool = Executors.newFixedThreadPool(ioThreads) + val jobsToRun = mutable.Map.empty[File, Seq[Runnable]] + + for (dir <- this.logDirs) { + val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) + + if (cleanShutdownFile.exists) { + debug( + "Found clean shutdown file. " + + "Skipping recovery for all logs in data directory: " + + dir.getAbsolutePath) + } else { + // log recovery itself is being performed by `Log` class during initialization + brokerState.newState(RecoveringFromUncleanShutdown) + } + val recoveryPoints = this.recoveryPointCheckpoints(dir).read - /* load the logs */ - val subDirs = dir.listFiles() - if(subDirs != null) { - val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) - if(cleanShutDownFile.exists()) - info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) - else - brokerState.newState(RecoveringFromUncleanShutdown) - - for(dir <- subDirs) { - if(dir.isDirectory) { - info("Loading log '" + dir.getName + "'") - val topicPartition = Log.parseTopicPartitionName(dir.getName) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val log = new Log(dir, - config, - recoveryPoints.getOrElse(topicPartition, 0L), - scheduler, - time) - val previous = this.logs.put(topicPartition, log) - if(previous != null) - throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + + val jobsForDir = for { + dirContent <- Option(dir.listFiles).toList + logDir <- dirContent if logDir.isDirectory + } yield { + Utils.runnable { + debug("Loading log '" + logDir.getName + "'") + + val topicPartition = Log.parseTopicPartitionName(logDir.getName) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) + + val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) + val previous = this.logs.put(topicPartition, current) + + if (previous != null) { + throw new IllegalArgumentException( + "Duplicate log directories found: %s, %s!".format( + current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } - cleanShutDownFile.delete() } + + jobsToRun(cleanShutdownFile) = jobsForDir.toSeq + } + + val runningJobs = interleavedJobsRun(jobsToRun, pool) + + try { + for ((cleanShutdownFile, dirJobs) <- runningJobs) { + dirJobs.foreach(_.get) + cleanShutdownFile.delete() + } + } catch { + case e: ExecutionException => { + error("There was an error in one of the threads during logs loading: " + e.getCause) + throw e.getCause + } + } finally { + pool.shutdown() } } @@ -160,31 +220,67 @@ class LogManager(val logDirs: Array[File], if(cleanerConfig.enableCleaner) cleaner.startup() 
} - + /** * Close all the logs */ def shutdown() { info("Shutting down.") + + val pool = Executors.newFixedThreadPool(ioThreads) + val jobsToRun = mutable.Map.empty[File, Seq[Runnable]] + + // stop the cleaner first + if (cleaner != null) { + Utils.swallow(cleaner.shutdown()) + } + + // close logs in each dir + for (dir <- this.logDirs) { + debug("Flushing and closing logs at " + dir) + + val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values + + val jobsForDir = logsInDir map { log => + Utils.runnable { + // flush the log to ensure latest possible recovery point + log.flush() + log.close() + } + } + + jobsToRun(dir) = jobsForDir.toSeq + } + + val runningJobs = interleavedJobsRun(jobsToRun, pool) + try { - // stop the cleaner first - if(cleaner != null) - Utils.swallow(cleaner.shutdown()) - // flush the logs to ensure latest possible recovery point - allLogs.foreach(_.flush()) - // close the logs - allLogs.foreach(_.close()) - // update the last flush point - checkpointRecoveryPointOffsets() - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) + for ((dir, dirJobs) <- runningJobs) { + dirJobs.foreach(_.get) + + // update the last flush point + debug("Updating recovery points at " + dir) + checkpointLogsInDir(dir) + + // mark that the shutdown was clean by creating marker file + debug("Writing clean shutdown marker at " + dir) + Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) + } + } catch { + case e: ExecutionException => { + error("There was an error in one of the threads during LogManager shutdown: " + e.getCause) + throw e.getCause + } } finally { + pool.shutdown() // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) } + info("Shutdown complete.") } + /** * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset * @@ -230,14 +326,19 @@ class LogManager(val logDirs: Array[File], * to avoid recovering the whole log on startup. */ def checkpointRecoveryPointOffsets() { - val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString) - for(dir <- logDirs) { - val recoveryPoints = recoveryPointsByDir.get(dir.toString) - if(recoveryPoints.isDefined) - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) - } + this.logDirs.foreach(checkpointLogsInDir) } - + + /** + * Make a checkpoint for all logs in provided directory. + */ + private def checkpointLogsInDir(dir: File): Unit = { + for { + recoveryPoints <- this.logsByDir.get(dir.toString) + recoveryCheckpoints <- this.recoveryPointCheckpoints.get(dir) + } recoveryCheckpoints.write(recoveryPoints.mapValues(_.recoveryPoint)) + } + /** * Get the log if it exists, otherwise return None */ @@ -366,13 +467,22 @@ class LogManager(val logDirs: Array[File], * Get all the partition logs */ def allLogs(): Iterable[Log] = logs.values - + /** * Get a map of TopicAndPartition => Log */ def logsByTopicPartition = logs.toMap /** + * Map of log dir to logs by topic and partitions in that dir + */ + private def logsByDir = { + this.logsByTopicPartition.groupBy { + case (_, log) => log.dir.getParent + } + } + + /** * Flush any log which has exceeded its flush interval and has unwritten messages. 
*/ private def flushDirtyLogs() = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 50b09ed..1b3bea4 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -190,6 +190,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) + /* the number of threads to be used when performing io intensive tasks on logs */ + val logIOParallelism = props.getIntInRange("log.io.parallelism", 1, (1, Int.MaxValue)) + /* enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index def1dc2..6e3d053 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -303,6 +303,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigs = configs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, + ioThreads = config.logIOParallelism, flushCheckMs = config.logFlushSchedulerIntervalMs, flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 8e37505..9a16343 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -93,16 +93,14 @@ class KafkaScheduler(val threads: Int, debug("Scheduling task %s with initial delay %d ms and period %d ms." 
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit))) ensureStarted - val runnable = new Runnable { - def run() = { - try { - trace("Begining execution of scheduled task '%s'.".format(name)) - fun() - } catch { - case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) - } finally { - trace("Completed execution of scheduled task '%s'.".format(name)) - } + val runnable = Utils.runnable { + try { + trace("Begining execution of scheduled task '%s'.".format(name)) + fun() + } catch { + case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) + } finally { + trace("Completed execution of scheduled task '%s'.".format(name)) } } if(period >= 0) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 63d3dda..c20f45e 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -49,9 +49,9 @@ object Utils extends Logging { * @param fun A function * @return A Runnable that just executes the function */ - def runnable(fun: () => Unit): Runnable = - new Runnable() { - def run() = fun() + def runnable(fun: => Unit): Runnable = + new Runnable { + def run() = fun } /** diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index d03d4c4..7d4c70c 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -35,21 +35,11 @@ class LogManagerTest extends JUnit3Suite { var logManager: LogManager = null val name = "kafka" val veryLargeLogFlushInterval = 10000000L - val cleanerConfig = CleanerConfig(enableCleaner = false) override def setUp() { super.setUp() logDir = TestUtils.tempDir() - logManager = new LogManager(logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = createLogManager() logManager.startup logDir = logManager.logDirs(0) } @@ -125,18 +115,7 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = new LogManager( - logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = config, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + logManager = createLogManager() logManager.startup // create a log @@ -176,18 +155,7 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = logConfig.copy(flushMs = 1000) - logManager = new LogManager( - logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = config, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + logManager = createLogManager() logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -209,19 +177,8 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager 
= new LogManager( - logDirs = dirs, - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) - + logManager = createLogManager() + // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { logManager.createLog(TopicAndPartition("test", partition), logConfig) @@ -237,18 +194,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testTwoLogManagersUsingSameDirFails() { try { - new LogManager( - logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 10000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time - ) + createLogManager() fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good @@ -270,16 +216,8 @@ class LogManagerTest extends JUnit3Suite { def testRecoveryDirectoryMappingWithTrailingSlash() { logManager.shutdown() logDir = TestUtils.tempDir() - logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = TestUtils.createLogManager( + logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -293,16 +231,7 @@ class LogManagerTest extends JUnit3Suite { logDir = new File("data" + File.separator + logDir.getName) logDir.mkdirs() logDir.deleteOnExit() - logManager = new LogManager(logDirs = Array(logDir), - topicConfigs = Map(), - defaultConfig = logConfig, - cleanerConfig = cleanerConfig, - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - time = time, - brokerState = new BrokerState()) + logManager = createLogManager() logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -327,4 +256,12 @@ class LogManagerTest extends JUnit3Suite { } } } + + + private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = { + TestUtils.createLogManager( + defaultConfig = logConfig, + logDirs = logDirs, + time = this.time) + } } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 558a5d6..e532c28 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -32,16 +32,11 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_)) val topic = "foo" - val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, - topicConfigs = Map(), - defaultConfig = LogConfig(), - cleanerConfig = CleanerConfig(), - flushCheckMs = 30000, - flushCheckpointMs = 10000L, - retentionCheckMs = 30000, - scheduler = new KafkaScheduler(1), - brokerState = new BrokerState(), - time = new 
MockTime)) + val logManagers = configs map { config => + TestUtils.createLogManager( + logDirs = config.logDirs.map(new File(_)).toArray, + cleanerConfig = CleanerConfig()) + } @After def teardown() { @@ -147,4 +142,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 518d416..a35dc0a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -37,7 +37,7 @@ class ReplicaManagerTest extends JUnit3Suite { val props = TestUtils.createBrokerConfig(1) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition(topic, 1, 1) @@ -51,26 +51,11 @@ class ReplicaManagerTest extends JUnit3Suite { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition(topic, 1, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } - - private def createLogManager(logDirs: Array[File]): LogManager = { - val time = new MockTime() - return new LogManager(logDirs, - topicConfigs = Map(), - defaultConfig = new LogConfig(), - cleanerConfig = CleanerConfig(enableCleaner = false), - flushCheckMs = 1000L, - flushCheckpointMs = 100000L, - retentionCheckMs = 1000L, - scheduler = time.scheduler, - brokerState = new BrokerState(), - time = time) - } - } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3faa884..20dbaf1 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -39,6 +39,7 @@ import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils import kafka.producer.ProducerConfig +import kafka.log._ import junit.framework.AssertionFailedError import junit.framework.Assert._ @@ -689,6 +690,30 @@ object TestUtils extends Logging { def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } + + + /** + * Create new LogManager instance with default configuration for testing + */ + def createLogManager( + logDirs: Array[File] = Array.empty[File], + defaultConfig: LogConfig = LogConfig(), + cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false), + time: MockTime = new MockTime()) = + { + new LogManager( + logDirs = logDirs, + topicConfigs = Map(), + 
defaultConfig = defaultConfig, + cleanerConfig = cleanerConfig, + ioThreads = 4, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time, + brokerState = new BrokerState()) + } } object TestZKUtils { -- 2.0.1
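
For readers of the patch, a minimal, self-contained sketch (not part of the patch itself) of the round-robin submission idea behind LogManager.interleavedJobsRun: jobs are grouped per data directory and submitted in interleaved order, so a fixed-size pool (sized by the new log.io.parallelism setting) keeps every directory's disk busy at once instead of draining one directory before starting the next. The object name, the toy job bodies, and the pool size of 2 below are illustrative assumptions only.

    import java.util.concurrent.{Executors, Future}
    import scala.collection.mutable

    object InterleavedJobsSketch {

      // Mirrors the patch's by-name Utils.runnable(fun: => Unit) helper.
      def runnable(fun: => Unit): Runnable = new Runnable { def run() = fun }

      def main(args: Array[String]) {
        val pool = Executors.newFixedThreadPool(2) // analogous to log.io.parallelism

        // One job queue per "data directory"; queues may differ in length.
        val jobs: Map[String, Seq[Runnable]] = Map(
          "dir-a" -> (1 to 3).map(i => runnable(println("dir-a job " + i))),
          "dir-b" -> (1 to 2).map(i => runnable(println("dir-b job " + i))))

        // Submit round-robin across directories so all directories have work
        // in flight at the same time, rather than being processed one by one.
        val longestJobQueue = jobs.values.map(_.size).max
        val futures = mutable.Buffer.empty[Future[_]]
        for {
          i <- 0 until longestJobQueue
          (_, queue) <- jobs
          job <- queue.lift(i) // lift(i) is None once this queue is exhausted
        } futures += pool.submit(job)

        futures.foreach(_.get()) // wait; a failed job surfaces as ExecutionException
        pool.shutdown()
      }
    }

With roughly one pool thread per physical disk, startup recovery and shutdown flushing proceed in parallel across log directories, which is the speedup the patch targets. The actual LogManager code additionally keeps the futures grouped by directory, so each directory's clean-shutdown marker is deleted (on startup) or written (on shutdown) only after that directory's jobs have all completed.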