diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 227c90d..79ecb8c 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -86,7 +86,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]] - private val scheduler = new KafkaScheduler(1) + private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-") private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null @@ -114,8 +114,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (config.autoCommit) { scheduler.startup info("starting auto committer every " + config.autoCommitIntervalMs + " ms") - scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs, - config.autoCommitIntervalMs, false) + scheduler.schedule("kafka-consumer-autocommit", + autoCommit, + delay = config.autoCommitIntervalMs, + period = config.autoCommitIntervalMs, + unit = TimeUnit.MILLISECONDS) } KafkaMetricsReporter.startReporters(config.props) @@ -160,7 +163,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardTopicWatcher.shutdown() try { if (config.autoCommit) - scheduler.shutdownNow() + scheduler.shutdown() fetcher match { case Some(f) => f.shutdown case None => diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a12833f..88843cd 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -394,7 +394,7 @@ class Log(val dir: File, maxIndexSize = maxIndexSize) val prev = segments.put(segment.baseOffset, segment) if(prev != null) - throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset)) + throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset)) segment } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 69ef5ea..c5e0e81 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -18,6 +18,7 @@ package kafka.log import java.io._ +import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} @@ -35,12 +36,13 @@ import kafka.server.KafkaConfig * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe -class LogManager(val config: KafkaConfig, - scheduler: KafkaScheduler, - private val time: Time) extends Logging { +private[kafka] class LogManager(val config: KafkaConfig, + scheduler: Scheduler, + private val time: Time) extends Logging { val CleanShutdownFile = ".kafka_cleanshutdown" val LockFile = ".lock" + val InitialTaskDelayMs = 30*1000 val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray private val logFileSizeMap = config.logFileSizeMap private val logFlushInterval = config.flushInterval @@ -138,15 +140,19 @@ class LogManager(val config: KafkaConfig, def startup() { /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { - info("Starting log cleaner every " + logCleanupIntervalMs + " ms") - scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false) - info("Starting log flusher every " + config.flushSchedulerThreadRate + - " ms with the following overrides " + logFlushIntervals) - scheduler.scheduleWithRate(flushDirtyLogs, - "kafka-logflusher-", - config.flushSchedulerThreadRate, - config.flushSchedulerThreadRate, - isDaemon = false) + info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs)) + scheduler.schedule("kafka-log-cleaner", + cleanupLogs, + delay = InitialTaskDelayMs, + period = logCleanupIntervalMs, + TimeUnit.MILLISECONDS) + info("Starting log flusher with a default period of %d ms with the following overrides: %s." + .format(config.defaultFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", "))) + scheduler.schedule("kafka-log-flusher", + flushDirtyLogs, + delay = InitialTaskDelayMs, + period = config.flushSchedulerThreadRate, + TimeUnit.MILLISECONDS) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3488908..1431dbc 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -48,6 +48,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the number of io threads that the server uses for carrying out network requests */ val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue)) + /* the number of threads to use for various background processing tasks */ + val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue)) + /* the number of queued requests allowed before blocking the network threads */ val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 27bd288..725226a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -40,7 +40,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null - val kafkaScheduler = new KafkaScheduler(4) + val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null /** @@ -53,7 +53,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg shutdownLatch = new CountDownLatch(1) /* start scheduler */ - kafkaScheduler.startup + kafkaScheduler.startup() /* start log manager */ logManager = new LogManager(config, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 42068ca..6a8213c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -38,7 +38,7 @@ object ReplicaManager { class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, - kafkaScheduler: KafkaScheduler, + scheduler: Scheduler, val logManager: LogManager) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 @@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig, def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) - kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs) + scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.highWaterMarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS) } /** @@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread - kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) + scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaMaxLagTimeMs, unit = TimeUnit.MILLISECONDS) } def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 5a222b8..ff369f7 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -22,51 +22,97 @@ import atomic._ import collection.mutable.HashMap /** - * A scheduler for running jobs in the background + * A scheduler for running jobs + * + * This interface controls a job scheduler that allows scheduling either repeating background jobs + * that execute periodically or delayed one-time actions that are scheduled in the future. */ -class KafkaScheduler(val numThreads: Int) extends Logging { - private var executor:ScheduledThreadPoolExecutor = null - private val daemonThreadFactory = new ThreadFactory() { - def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true) - } - private val nonDaemonThreadFactory = new ThreadFactory() { - def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false) - } - private val threadNamesAndIds = new HashMap[String, AtomicInteger]() +trait Scheduler { + + /** + * Initialize this scheduler so it is ready to accept scheduling of tasks + */ + def startup() + + /** + * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. + * This includes tasks scheduled with a delayed execution. + */ + def shutdown() + + /** + * Schedule a task + * @param name The name of this task + * @param delay The amount of time to wait before the first execution + * @param period The period with which to execute the task. If < 0 the task will execute only once. + * @param unit The unit for the preceding times. + */ + def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) +} - def startup() = { - executor = new ScheduledThreadPoolExecutor(numThreads) - executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) - executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) +/** + * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor + * + * It has a pool of kafka-scheduler- threads that do the actual work. + * + * @param threads The number of threads in the thread pool + * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it. + * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown. + */ +@threadsafe +class KafkaScheduler(val threads: Int, + val threadNamePrefix: String = "kafka-scheduler-", + daemon: Boolean = true) extends Scheduler with Logging { + @volatile private var executor: ScheduledThreadPoolExecutor = null + private val schedulerThreadId = new AtomicInteger(0) + + override def startup() { + debug("Initializing task scheduler.") + this synchronized { + if(executor != null) + throw new IllegalStateException("This scheduler has already been started!") + executor = new ScheduledThreadPoolExecutor(threads) + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) + executor.setThreadFactory(new ThreadFactory() { + def newThread(runnable: Runnable): Thread = + Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon) + }) + } } - - def hasShutdown: Boolean = executor.isShutdown - - private def ensureExecutorHasStarted = { - if(executor == null) - throw new IllegalStateException("Kafka scheduler has not been started") + + override def shutdown() { + debug("Shutting down task scheduler.") + ensureStarted + executor.shutdown() + executor.awaitTermination(1, TimeUnit.DAYS) + this.executor = null } - def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = { - ensureExecutorHasStarted - if(isDaemon) - executor.setThreadFactory(daemonThreadFactory) + def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) = { + debug("Scheduling task %s with initial delay %d ms and period %d ms." + .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit))) + ensureStarted + val runnable = new Runnable { + def run() = { + try { + trace("Begining execution of scheduled task '%s'.".format(name)) + fun() + } catch { + case t => error("Uncaught exception in scheduled task '" + name +"'", t) + } finally { + trace("Completed execution of scheduled task '%s'.".format(name)) + } + } + } + if(period >= 0) + executor.scheduleAtFixedRate(runnable, delay, period, unit) else - executor.setThreadFactory(nonDaemonThreadFactory) - val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0)) - executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs, - TimeUnit.MILLISECONDS) - } - - def shutdownNow() { - ensureExecutorHasStarted - executor.shutdownNow() - info("Forcing shutdown of Kafka scheduler") + executor.schedule(runnable, delay, unit) } - - def shutdown() { - ensureExecutorHasStarted - executor.shutdown() - info("Shutdown Kafka scheduler") + + private def ensureStarted = { + if(executor == null) + throw new IllegalStateException("Kafka scheduler has not been started") } } diff --git a/core/src/main/scala/kafka/utils/MockTime.scala b/core/src/main/scala/kafka/utils/MockTime.scala deleted file mode 100644 index 5296aba..0000000 --- a/core/src/main/scala/kafka/utils/MockTime.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.util.concurrent._ - -class MockTime(@volatile var currentMs: Long) extends Time { - - def this() = this(System.currentTimeMillis) - - def milliseconds: Long = currentMs - - def nanoseconds: Long = - TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) - - def sleep(ms: Long): Unit = - currentMs += ms - -} diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 42e3e18..d383f54 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -52,26 +52,6 @@ object Utils extends Logging { new Runnable() { def run() = fun() } - - /** - * Wrap the given function in a java.lang.Runnable that logs any errors encountered - * @param fun A function - * @return A Runnable that just executes the function - */ - def loggedRunnable(fun: () => Unit, name: String): Runnable = - new Runnable() { - def run() = { - Thread.currentThread().setName(name) - try { - fun() - } - catch { - case t => - // log any error and the stack trace - error("error in loggedRunnable", t) - } - } - } /** * Create a daemon thread diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index d9f189e..8ba5c48 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -35,7 +35,6 @@ class LogManagerTest extends JUnit3Suite { var config: KafkaConfig = null val name = "kafka" val veryLargeLogFlushInterval = 10000000L - val scheduler = new KafkaScheduler(2) override def setUp() { super.setUp() @@ -44,14 +43,12 @@ class LogManagerTest extends JUnit3Suite { override val flushInterval = 10000 override val logRetentionHours = maxLogAgeHours } - scheduler.startup - logManager = new LogManager(config, scheduler, time) + logManager = new LogManager(config, time.scheduler, time) logManager.startup logDir = logManager.logDirs(0) } override def tearDown() { - scheduler.shutdown() if(logManager != null) logManager.shutdown() Utils.rm(logDir) @@ -97,10 +94,9 @@ class LogManagerTest extends JUnit3Suite { assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) // update the last modified time of all log segments - log.logSegments.foreach(_.log.file.setLastModified(time.currentMs)) + log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds)) - time.currentMs += maxLogAgeHours*60*60*1000 + 1 - logManager.cleanupLogs() + time.sleep(maxLogAgeHours*60*60*1000 + 1) assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) @@ -120,18 +116,14 @@ class LogManagerTest extends JUnit3Suite { @Test def testCleanupSegmentsToMaintainSize() { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes - val retentionHours = 1 - val retentionMs = 1000 * 60 * 60 * retentionHours val props = TestUtils.createBrokerConfig(0, -1) logManager.shutdown() config = new KafkaConfig(props) { override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] - override val logRetentionHours = retentionHours - override val flushInterval = 100 override val logRollHours = maxRollInterval } - logManager = new LogManager(config, scheduler, time) + logManager = new LogManager(config, time.scheduler, time) logManager.startup // create a log @@ -149,7 +141,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("There should be example 100 segments.", 100, log.numberOfSegments) // this cleanup shouldn't find any expired segments but should delete some to reduce size - logManager.cleanupLogs() + time.sleep(logManager.InitialTaskDelayMs) assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes) try { @@ -170,11 +162,11 @@ class LogManagerTest extends JUnit3Suite { val props = TestUtils.createBrokerConfig(0, -1) logManager.shutdown() config = new KafkaConfig(props) { - override val flushSchedulerThreadRate = 5 - override val defaultFlushIntervalMs = 5 + override val flushSchedulerThreadRate = 1000 + override val defaultFlushIntervalMs = 1000 override val flushInterval = Int.MaxValue } - logManager = new LogManager(config, scheduler, SystemTime) + logManager = new LogManager(config, time.scheduler, time) logManager.startup val log = logManager.getOrCreateLog(name, 0) val lastFlush = log.lastFlushTime @@ -182,7 +174,7 @@ class LogManagerTest extends JUnit3Suite { var set = TestUtils.singleMessageSet("test".getBytes()) log.append(set) } - Thread.sleep(config.flushSchedulerThreadRate) + time.sleep(logManager.InitialTaskDelayMs) assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime) } @@ -198,7 +190,7 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir().getAbsolutePath) props.put("log.directories", dirs.mkString(",")) logManager.shutdown() - logManager = new LogManager(new KafkaConfig(props), scheduler, time) + logManager = new LogManager(new KafkaConfig(props), time.scheduler, time) // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { @@ -214,7 +206,7 @@ class LogManagerTest extends JUnit3Suite { */ def testTwoLogManagersUsingSameDirFails() { try { - new LogManager(logManager.config, scheduler, time) + new LogManager(logManager.config, time.scheduler, time) fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index ce44b01..87a89ee 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -66,7 +66,7 @@ class LogTest extends JUnitSuite { // create a log val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time) - time.currentMs += rollMs + 1 + time.sleep(rollMs + 1) // segment age is less than its limit log.append(set) @@ -76,13 +76,13 @@ class LogTest extends JUnitSuite { assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments) for(numSegments <- 2 until 4) { - time.currentMs += rollMs + 1 + time.sleep(rollMs + 1) log.append(set) assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) } val numSegments = log.numberOfSegments - time.currentMs += rollMs + 1 + time.sleep(rollMs + 1) log.append(new ByteBufferMessageSet()) assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments) } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index f30b097..b609585 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -24,7 +24,7 @@ import org.junit._ import org.junit.Assert._ import kafka.common.KafkaException import kafka.cluster.Replica -import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} +import kafka.utils._ class HighwatermarkPersistenceTest extends JUnit3Suite { diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala new file mode 100644 index 0000000..724ffcf --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils + +import scala.collection._ +import java.util.concurrent.TimeUnit + +/** + * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when + * the time is advanced. This class is meant to be used in conjunction with MockTime. + * + * Example usage + * + * val time = new MockTime + * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) + * time.sleep(1001) // this should cause our scheduled task to fire + * + * + * Two gotchas: + *
    + *
  1. Incrementing the time by more than one task period will result in the correct number of executions of each scheduled task + * but the order of these executions is not specified. + *
  2. Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time) + *
+ */ +class MockScheduler(val time: Time) extends Scheduler { + + var tasks = mutable.ArrayBuffer[MockScheduled]() + + def startup() {} + + def shutdown() { + tasks.clear() + } + + /** + * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs + * when this method is called and the execution happens synchronously in the calling thread. + * If you are using the scheduler associated with a MockTime instance this call will happen automatically. + */ + def tick() { + var tasks = mutable.ArrayBuffer[MockScheduled]() + val now = time.milliseconds + for(task <- this.tasks) { + if(task.nextExecution <= now) { + if(task.period >= 0) { + val executions = (now - task.nextExecution) / task.period + for(i <- 0 to executions.toInt) + task.fun() + task.nextExecution += (executions + 1) * task.period + tasks += task + } else { + task.fun() + } + } else { + tasks += task + } + } + this.tasks = tasks + } + + def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) { + tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period) + tick() + } + +} + +case class MockScheduled(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala new file mode 100644 index 0000000..ee65748 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util.concurrent._ + +/** + * A class used for unit testing things which depend on the Time interface. + * + * This class never manually advances the clock, it only does so when you call + * sleep(ms) + * + * It also comes with an associated scheduler instance for managing background tasks in + * a deterministic way. + */ +class MockTime(@volatile private var currentMs: Long) extends Time { + + val scheduler = new MockScheduler(this) + + def this() = this(System.currentTimeMillis) + + def milliseconds: Long = currentMs + + def nanoseconds: Long = + TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) + + def sleep(ms: Long) { + this.currentMs += ms + scheduler.tick() + } + + override def toString() = "MockTime(%d)".format(milliseconds) + +} diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala new file mode 100644 index 0000000..ae16a71 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils + +import junit.framework.Assert._ +import java.util.concurrent.atomic._ +import org.junit.{Test, After, Before} +import kafka.utils.TestUtils.retry + +class SchedulerTest { + + val scheduler = new KafkaScheduler(1) + val mockTime = new MockTime + val counter1 = new AtomicInteger(0) + val counter2 = new AtomicInteger(0) + + @Before + def setup() { + scheduler.startup() + } + + @After + def teardown() { + scheduler.shutdown() + } + + @Test + def testMockSchedulerNonPeriodicTask() { + mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1) + mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100) + assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get) + assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get) + mockTime.sleep(1) + assertEquals("Counter1 should be incremented", 1, counter1.get) + assertEquals("Counter2 should not be incremented", 0, counter2.get) + mockTime.sleep(100000) + assertEquals("More sleeping should not result in more incrementing on counter1.", 1, counter1.get) + assertEquals("Counter2 should now be incremented.", 1, counter2.get) + } + + @Test + def testMockSchedulerPeriodicTask() { + mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1, period=1) + mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100, period=100) + assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get) + assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get) + mockTime.sleep(1) + assertEquals("Counter1 should be incremented", 1, counter1.get) + assertEquals("Counter2 should not be incremented", 0, counter2.get) + mockTime.sleep(100) + assertEquals("Counter1 should be incremented 101 times", 101, counter1.get) + assertEquals("Counter2 should not be incremented once", 1, counter2.get) + } + + @Test + def testNonPeriodicTask() { + scheduler.schedule("test", counter1.getAndIncrement, delay = 0) + retry(30000, () => assertEquals(counter1.get, 1)) + Thread.sleep(5) + assertEquals("Should only run once", 1, counter1.get) + } + + @Test + def testPeriodicTask() { + scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5) + retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20)) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 6863565..4ef5bd4 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -433,17 +433,21 @@ object TestUtils extends Logging { * Execute the given block. If it throws an assert error, retry. Repeat * until no error is thrown or the time limit ellapses */ - def retry(waitTime: Long, block: () => Unit) { + def retry(maxWaitMs: Long, block: () => Unit) { + var wait = 1L val startTime = System.currentTimeMillis() while(true) { try { block() + return } catch { case e: AssertionError => - if(System.currentTimeMillis - startTime > waitTime) + if(System.currentTimeMillis - startTime > maxWaitMs) { throw e - else - Thread.sleep(100) + } else { + Thread.sleep(wait) + wait += math.min(wait, 1000) + } } } }