Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(working copy)
@@ -55,20 +55,14 @@
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
-
-    Thread.sleep(100)
-
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
   }

   @After
   override def tearDown() {
     simpleConsumerZk.close
-
     serverZk.shutdown
     Utils.rm(logDirZk)
-
-    Thread.sleep(500)
     super.tearDown()
   }
@@ -149,8 +143,6 @@
     for(i <- 1 to 5)
       info("test")

-    Thread.sleep(500)
-
     val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
     val fetchMessage = response.messageSet("test-topic", 0)
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -100,8 +100,8 @@
    * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
    * @param config The configuration of the server
    */
-  def createServer(config: KafkaConfig): KafkaServer = {
-    val server = new KafkaServer(config)
+  def createServer(config: KafkaConfig, time: Time = SystemTime): KafkaServer = {
+    val server = new KafkaServer(config, time)
     server.startup()
     server
   }
@@ -422,7 +422,6 @@
         return true
       if (System.currentTimeMillis() > startTime + waitTime)
         return false
-      Thread.sleep(100)
     }
     // should never hit here
     throw new RuntimeException("unexpected error")
Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -22,13 +22,14 @@
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.utils.{MockTime, Utils, TestUtils, Range}

 class LogTest extends JUnitSuite {

   var logDir: File = null
+  val time = new MockTime

   @Before
   def setUp() {
@@ -48,14 +49,14 @@
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new Log(logDir, 1024, 1000, false, time)
   }

   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new Log(logDir, 1024, 1000, false, time)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: KafkaException => "This is good"
@@ -64,7 +65,7 @@
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, 1000, false, time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +82,7 @@
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, 1024, 1000, false, time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +102,7 @@
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new Log(logDir, 100, 1000, false, time)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +157,7 @@
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, 1000, false, time)
       val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)

@@ -169,7 +170,7 @@
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, 100, 1000, false, time)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -43,6 +43,7 @@
   var logSize: Int = 100
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()

   @Before
   override def setUp() {
@@ -50,8 +51,8 @@
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
-
-    server = TestUtils.createServer(new KafkaConfig(config))
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
   }
@@ -90,7 +91,6 @@
     log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()

-    Thread.sleep(100)
     val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
@@ -148,15 +148,16 @@
     log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()

-    val now = System.currentTimeMillis
-    Thread.sleep(100)
+    time.sleep(20)
+    val now = time.milliseconds

     val offsetRequest = new OffsetRequest(topic, part, now, 10)
     val offsets = log.getOffsetsBefore(offsetRequest)
-    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    println("Offsets = " + offsets.mkString(","))
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))

     val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
-    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
   }

   @Test
@@ -175,8 +176,6 @@
     log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()

-    Thread.sleep(100)
-
     val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.EarliestTime, 10)
     val offsets = log.getOffsetsBefore(offsetRequest)
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -118,7 +118,6 @@
     val retentionMs = 1000 * 60 * 60 * retentionHours
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
@@ -138,9 +137,8 @@
       log.append(set)
       offset += set.sizeInBytes
     }
-    // flush to make sure it's written to disk, then sleep to confirm
+    // flush to make sure it's written to disk
     log.flush
-    Thread.sleep(2000)

     // should be exactly 100 full segments + 1 new empty one
     assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
@@ -163,7 +161,6 @@
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = 1024 *1024 *1024
       override val flushSchedulerThreadRate = 50
@@ -186,7 +183,6 @@
   def testConfigurablePartitions() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
-    Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = 256
       override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
Index: core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala	(working copy)
@@ -47,20 +47,20 @@
     brokers(0).shutdown()
     brokers(1).shutdown()
     brokers(3).shutdown()
-    Thread.sleep(1000)
-
+    assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
     var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
-    assertEquals(curController, "2")
+    assertEquals("Controller should move to broker 2", "2", curController)

     brokers(1).startup()
     brokers(2).shutdown()
-    Thread.sleep(1000)
+    assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
     curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
-    assertEquals(curController, "1")
+    assertEquals("Controller should move to broker 1", "1", curController)
   }

   def testControllerCommandSend(){
-    Thread.sleep(1000)
     for(broker <- brokers){
       if(broker.kafkaController.isActive){
         val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -31,8 +31,8 @@
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.CreateTopicCommand
 import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.admin.{AdminUtils, CreateTopicCommand}

 /**
  * End to end tests of the primitive apis against a local server
@@ -109,7 +109,6 @@
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
-    Thread.sleep(200)

     val request = new FetchRequestBuilder()
       .correlationId(100)
@@ -138,7 +137,6 @@
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
-    Thread.sleep(200)

     var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
@@ -167,7 +165,6 @@
     }

     // wait a bit for produced message to be available
-    Thread.sleep(700)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -235,7 +232,6 @@
     }

     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -303,7 +299,6 @@
     producer.send(produceList: _*)

     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -328,7 +323,6 @@
     producer.send(produceList: _*)

     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
@@ -337,10 +331,12 @@
     }
   }

-  def testConsumerNotExistTopic() {
+  def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
-    Thread.sleep(200)
+    assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+      AdminUtils.getTopicMetaDataFromZK(List(newTopic),
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }
Index: core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(working copy)
@@ -41,6 +41,7 @@
     props.put("reconnect.interval", "10000")
     props.put("producer.retry.backoff.ms", "1000")
     props.put("producer.num.retries", "3")
+    props.put("producer.request.required.acks", "-1")
     producer = new Producer(new ProducerConfig(props))
     consumer = new SimpleConsumer(host, port,
Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(working copy)
@@ -49,7 +49,6 @@
     val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))

     producer.send(producerData)
-    Thread.sleep(200)

     // corrupt the file on disk
     val logFile = new File(config.logDir + File.separator + topic + "-" + partition, Log.nameFromOffset(0))
@@ -61,7 +60,6 @@
     channel.force(true)
     channel.close

-    Thread.sleep(500)
     // test SimpleConsumer
     val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
     try {
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -93,7 +93,6 @@
     }

     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, offset) <- topicOffsets) {
@@ -138,7 +137,6 @@
     producer.send(produceList: _*)

     // wait a bit for produced message to be available
-    Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for(topic <- topics) {
@@ -166,7 +164,6 @@
     producer.send(produceList: _*)

     // wait a bit for produced message to be available
-    Thread.sleep(750)
     val request = builder.build()
     val response = consumer.fetch(request)
     for(topic <- topics) {
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -29,12 +29,12 @@
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
-import kafka.utils.{FixedValuePartitioner, NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ListBuffer
+import kafka.utils._

 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -106,13 +106,6 @@
     }
   }

-  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
-    val producerDataList = new ListBuffer[ProducerData[String,String]]
-    for (i <- 0 until nEvents)
-      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
-    producerDataList
-  }
-
   @Test
   def testBatchSize() {
     /**
@@ -150,11 +143,13 @@
     EasyMock.expectLastCall
     EasyMock.replay(mockHandler)

+    val mockTime = new MockTime
     val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
     val producerSendThread =
       new ProducerSendThread[String,String]("thread1", queue, mockHandler, 200, 5)
     producerSendThread.start()

+    mockTime.sleep(300)
     for (producerData <- producerDataList)
       queue.put(producerData)
@@ -446,7 +441,7 @@
     // entirely. The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
-    val response1 =
+    val response1 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
     val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
     val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
@@ -530,6 +525,13 @@
     }
   }

+  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+    val producerDataList = new ListBuffer[ProducerData[String,String]]
+    for (i <- 0 until nEvents)
+      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+    producerDataList
+  }
+
   private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
     val encoder = new StringEncoder
     new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
   }
@@ -565,11 +567,4 @@
     val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
-
-  class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
-    override def send(produceRequest: ProducerRequest): ProducerResponse = {
-      Thread.sleep(1000)
-      null
-    }
-  }
 }
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -68,8 +68,6 @@
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
-
-    Thread.sleep(500)
   }

   override def tearDown() {
@@ -81,7 +79,6 @@
     server2.awaitShutdown()
     Utils.rm(server1.config.logDir)
     Utils.rm(server2.config.logDir)
-    Thread.sleep(500)
     super.tearDown()
   }
@@ -151,6 +148,7 @@
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("producer.request.timeout.ms", "2000")
+//    props.put("producer.request.required.acks", "-1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)

     // create topic
@@ -166,7 +164,6 @@
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
     } catch {
       case e => fail("Unexpected exception: " + e)
     }
@@ -174,12 +171,10 @@
     // kill the broker
     server1.shutdown
     server1.awaitShutdown()
-    Thread.sleep(100)

     try {
       // These sends should fail since there are no available brokers
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e => // success
@@ -187,8 +182,6 @@

     // restart server 1
     server1.startup()
-    Thread.sleep(100)
-
     try {
       // cross check if broker 1 got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
@@ -222,7 +215,6 @@
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
       // cross check if brokers got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
Index: core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(working copy)
@@ -39,17 +39,11 @@
     }
     var testData: String = null
-
     testData = ZkUtils.readData(zkClient, "/tmp/zktest")
     Assert.assertNotNull(testData)
-
     zkClient.close
-
-    Thread.sleep(zkSessionTimeoutMs)
-
     zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
-
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }
Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(working copy)
@@ -123,8 +123,8 @@
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
-
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
@@ -165,7 +165,8 @@
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
     sendMessages(20)
     // give some time for follower 1 to record leader HW of 600
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 600L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
@@ -209,7 +210,8 @@
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
@@ -231,7 +233,8 @@
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
Index: core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(working copy)
@@ -41,8 +41,7 @@
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(0L, fooPartition0Hw)
     val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
@@ -51,8 +50,7 @@
     // create leader and follower replicas
     val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
     val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
     try {
@@ -65,8 +63,7 @@
     partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
     // set the highwatermark for local replica
     partition0.leaderHW(Some(5L))
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
     EasyMock.verify(zkClient)
@@ -85,8 +82,7 @@
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
     val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
@@ -94,16 +90,14 @@
     val topic1Log0 = getMockLog
     // create leader and follower replicas
     val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
     // set the leader
     topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
     // set the highwatermark for local replica
     topic1Partition0.leaderHW(Some(5L))
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
     assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
     assertEquals(5L, topic1Partition0Hw)
@@ -113,8 +107,7 @@
     val topic2Log0 = getMockLog
     // create leader and follower replicas
     val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
     assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
     // set the leader
@@ -125,8 +118,7 @@
     // change the highwatermark for topic1
     topic1Partition0.leaderHW(Some(10L))
     assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
-    // sleep until flush interval
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    replicaManager.checkpointHighwaterMarks()
     // verify checkpointed hw for topic 2
     topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
     assertEquals(15L, topic2Partition0Hw)
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -54,7 +54,6 @@
       // send some messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
-      Thread.sleep(200)

       // do a clean shutdown
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
@@ -86,8 +85,6 @@
       // send some more messages
      producer.send(new ProducerData[Int, Message](topic, 0, sent2))
-      Thread.sleep(200)
-
      fetchedMessage = null
      while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
        val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1366778)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -148,7 +148,6 @@
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    Thread.sleep(200)
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@@ -234,7 +233,6 @@
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    Thread.sleep(200)
     val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
@@ -265,7 +263,6 @@
     // shutdown one server
     servers.last.shutdown
-    Thread.sleep(500)

     // send some messages to each broker
     val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1366778)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.log

 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{SystemTime, TestUtils, Utils}

 object TestLogPerformance {

@@ -30,7 +30,7 @@
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new Log(dir, 50*1024*1024, 5000000, false, SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1366778)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -115,7 +115,7 @@
  * An append-only log for storing messages.
  */
 @threadsafe
-private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time)
   extends Logging {

   import kafka.log.Log._
@@ -288,7 +288,7 @@
       else {
         // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
         // file name. So simply reuse the last segment and reset the modified time.
-        view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+        view(numToDelete - 1).file.setLastModified(time.milliseconds)
         numToDelete -=1
       }
     }
@@ -348,10 +348,10 @@
     lock synchronized {
       debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
-          System.currentTimeMillis)
+          time.milliseconds)
       segments.view.last.messageSet.flush()
       unflushed.set(0)
-      lastflushedTime.set(System.currentTimeMillis)
+      lastflushedTime.set(time.milliseconds)
     }
   }
@@ -366,7 +366,7 @@
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
     if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), SystemTime.milliseconds)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)

     var startIndex = -1
     request.time match {
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1366778)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -60,7 +60,7 @@
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val log = new Log(dir, maxSize, flushInterval, needRecovery, time)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -100,7 +100,7 @@
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      new Log(d, maxSize, flushInterval, false, time)
     }
   }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1366778)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -70,7 +70,7 @@
     /* start log manager */
     logManager = new LogManager(config,
                                 kafkaScheduler,
-                                SystemTime,
+                                time,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1366778)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -267,7 +267,7 @@
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
   */
-  private def checkpointHighwaterMarks() {
+  def checkpointHighwaterMarks() {
    val highwaterMarksForAllPartitions = allReplicas.map { partition =>
      val topic = partition._1._1
      val partitionId = partition._1._2
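
Reviewer notes (illustrative sketches, not part of the patch):

The patch threads a kafka.utils.Time through Log, LogManager, and KafkaServer so tests can substitute a mock clock instead of sleeping. For reviewers who want the shape of that clock in one place, here is a minimal sketch of a MockTime along the lines the test changes assume; the exact members of the Time trait at this revision are an assumption, not quoted from the diff:

    // Hedged sketch: assumes kafka.utils.Time exposes milliseconds,
    // nanoseconds and sleep(ms), which is what the call sites above use.
    trait Time {
      def milliseconds: Long
      def nanoseconds: Long
      def sleep(ms: Long)
    }

    class MockTime(@volatile var currentMs: Long) extends Time {
      def this() = this(System.currentTimeMillis)
      def milliseconds: Long = currentMs
      def nanoseconds: Long = currentMs * 1000L * 1000L
      // Advancing a counter replaces real blocking: tests finish instantly
      // and time-dependent assertions become deterministic.
      def sleep(ms: Long) { currentMs += ms }
    }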
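
The LogOffsetTest hunks show the payoff of the injected clock: instead of sleeping 100 ms of wall-clock time before building an OffsetRequest, the test advances the mock clock and reads a deterministic timestamp. A condensed usage sketch, under the MockTime assumption above:

    val time = new MockTime
    val log = new Log(logDir, 1024, 1000, false, time) // time is the new fifth constructor argument
    log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
    log.flush()
    time.sleep(20)              // advances the mock clock by 20 ms; returns immediately
    val now = time.milliseconds // deterministic "current" time for the OffsetRequest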
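
Where a test waits on genuinely asynchronous state (controller election, a follower catching up to the leader's high watermark), the patch replaces fixed sleeps with TestUtils.waitUntilTrue wrapped in assertTrue, so a timeout fails the test loudly instead of racing silently. The idiom, as it appears in LogRecoveryTest:

    assertTrue("Failed to update highwatermark for follower after 1000 ms",
      TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))

waitUntilTrue polls the predicate and returns false once the deadline passes, so the assertion message names the actual failure rather than leaving a later assertion to trip on stale state.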
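
Finally, making ReplicaManager.checkpointHighwaterMarks() public lets HighwatermarkPersistenceTest force a checkpoint on demand instead of sleeping for defaultFlushIntervalMs and hoping the scheduled flush has fired. The resulting test shape, condensed from the hunks above:

    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
    replicaManager.startup()
    replicaManager.checkpointHighwaterMarks() // force the write; no scheduler race
    val hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
    assertEquals(0L, hw)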