diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 1415773..65722cd 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -17,22 +17,25 @@ package kafka.integration -import junit.framework.Assert._ import kafka.utils.{ZKGroupTopicDirs, Logging} import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer} import kafka.server._ -import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils import kafka.serializer._ import kafka.producer.{Producer, KeyedMessage} +import org.junit.Test +import org.apache.log4j.{Level, Logger} +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ + class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging { + val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0))) + val topic = "test_topic" val group = "default_group" val testConsumer = "consumer" - val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0))) val NumMessages = 10 val LargeOffset = 10000 val SmallOffset = -1 @@ -51,69 +54,39 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L super.tearDown } - // fake test so that this test can pass - def testResetToEarliestWhenOffsetTooHigh() = - assertTrue(true) - - /* Temporarily disable those tests due to failures. -kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED - java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0] - at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478) - at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71) - at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55) - - -kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED - java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0] - at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478) - at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71) - at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58) - - -kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED - java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0] - at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478) - at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71) - at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61) - - -kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED - java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0] - at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478) - at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71) - at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64) - + @Test def testResetToEarliestWhenOffsetTooHigh() = assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset)) - + + @Test def testResetToEarliestWhenOffsetTooLow() = assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset)) + @Test def testResetToLatestWhenOffsetTooHigh() = assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset)) + @Test def testResetToLatestWhenOffsetTooLow() = assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset)) - */ /* Produce the given number of messages, create a consumer with the given offset policy, * then reset the offset to the given value and consume until we get no new messages. * Returns the count of messages received. */ def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = { - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - - val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder()) for(i <- 0 until numMessages) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes)) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, newLeaderOpt = Some(0)) // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) - var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer) + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer) consumerProps.put("auto.offset.reset", resetTo) consumerProps.put("consumer.timeout.ms", "2000") consumerProps.put("fetch.wait.max.ms", "0")