diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 1415773..141819b 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,22 +17,25 @@
 
 package kafka.integration
 
-import junit.framework.Assert._
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
+import org.junit.Test
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+
 
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
   val NumMessages = 10
   val LargeOffset = 10000
   val SmallOffset = -1
@@ -51,69 +54,37 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     super.tearDown
   }
 
-  // fake test so that this test can pass
-  def testResetToEarliestWhenOffsetTooHigh() =
-    assertTrue(true)
-
-  /* Temporarily disable those tests due to failures.
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooHigh(AutoOffsetResetTest.scala:55)
-
-kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToEarliestWhenOffsetTooLow(AutoOffsetResetTest.scala:58)
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooHigh(AutoOffsetResetTest.scala:61)
-
-kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
-    java.lang.RuntimeException: Timing out after 1000 ms since leader is not elected or changed for partition [test_topic,0]
-        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:478)
-        at kafka.integration.AutoOffsetResetTest.resetAndConsume(AutoOffsetResetTest.scala:71)
-        at kafka.integration.AutoOffsetResetTest.testResetToLatestWhenOffsetTooLow(AutoOffsetResetTest.scala:64)
-
+  @Test
   def testResetToEarliestWhenOffsetTooHigh() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
-
+
+  @Test
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
 
+  @Test
   def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
-  */
 
   /* Produce the given number of messages, create a consumer with the given offset policy,
    * then reset the offset to the given value and consume until we get no new messages.
    * Returns the count of messages received. */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-
-    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder())
+    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+      TestUtils.getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
 
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
-    var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
     consumerProps.put("auto.offset.reset", resetTo)
     consumerProps.put("consumer.timeout.ms", "2000")
     consumerProps.put("fetch.wait.max.ms", "0")
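Note on the hunk above: all four re-enabled tests drive resetAndConsume, which produces NumMessages messages and then consumes with the given auto.offset.reset policy until the consumer times out. The rest of resetAndConsume is unchanged and therefore not shown in the diff; the following is a minimal sketch of that consumption loop, assuming the standard 0.8 high-level consumer API that the file's imports (Consumer, ConsumerConfig, ConsumerTimeoutException) point at. It is illustrative only, not the actual body of the method:

    // Build a consumer governed by the auto.offset.reset policy under test.
    val consumerConfig = new ConsumerConfig(consumerProps)
    val connector: ConsumerConnector = Consumer.create(consumerConfig)
    val stream = connector.createMessageStreams(Map(topic -> 1))(topic).head

    var received = 0
    try {
      // ConsumerTimeoutException fires once consumer.timeout.ms (2000 ms above)
      // elapses with no new messages; that ends the count.
      val iter = stream.iterator()
      while (iter.hasNext()) {
        iter.next()
        received += 1
      }
    } catch {
      case e: ConsumerTimeoutException => // expected: no more messages
    }
    connector.shutdown()
    received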
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 9e1a3b7..7b8d336 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -83,9 +83,9 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
-        new DefaultEncoder(),
-        new StringEncoder())
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
+        TestUtils.getBrokerListStrFromConfigs(configs),
+        keyEncoder = classOf[StringEncoder].getName)
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
       producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 731ee59..108c2e7 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,8 +19,10 @@ package kafka.integration
 
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
-import kafka.producer.{ProducerConfig, Producer}
-import kafka.utils.TestUtils
+import kafka.producer.Producer
+import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.serializer.StringEncoder
+
 
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
   val port: Int
   val host = "localhost"
@@ -29,8 +31,10 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
 
   override def setUp() {
     super.setUp
-    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
-    producer = new Producer(new ProducerConfig(props))
+    producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName)
     consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
   }
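The harness hunk above replaces the props-based construction with TestUtils.createProducer, naming the encoders and the partitioner by class instead of threading the "kafka.utils.StaticPartitioner" string through getProducerConfig. A test mixing in this harness then uses the producer field directly; a short sketch of that usage (the topic and payloads here are made up for the example, not part of the patch):

    import kafka.producer.KeyedMessage

    // `producer` is the Producer[String, String] the harness created in setUp.
    // StaticPartitioner gives a fixed key -> partition mapping, so a test can
    // predict which partition its messages land in.
    val messages = (0 until 5).map(i => new KeyedMessage[String, String]("test-topic", "key", "msg-" + i))
    producer.send(messages: _*)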
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 1bf9462..cf2aaca 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -251,10 +251,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   private def produceMessage(topic: String, message: String) = {
-    val props = new Properties()
-    props.put("request.required.acks", String.valueOf(-1))
-    val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder(), props)
+    val producer: Producer[String, Array[Byte]] = createProducer(
+      getBrokerListStrFromConfigs(configs),
+      keyEncoder = classOf[StringEncoder].getName)
     producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
     producer.close()
   }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 16e7164..4fe0b04 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -79,7 +79,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
     val producer: kafka.producer.Producer[Int, String] =
-      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName)
     val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index ddb2402..3218d4f 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -26,6 +26,7 @@
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
+import kafka.serializer.StringEncoder
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -52,9 +53,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
-  producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-  producerProps.put("request.required.acks", "-1")
-
+  producerProps.put("key.serializer.class", classOf[IntEncoder].getName)
+  producerProps.put("serializer.class", classOf[StringEncoder].getName)
+
   override def tearDown() {
     for(server <- servers) {
       server.shutdown()
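Because getProducerConfig no longer injects any serializer (see the TestUtils hunk at the end of this patch), call sites that still build producers from raw Properties, as LogRecoveryTest does above, now set both serializer.class and key.serializer.class themselves; request.required.acks=-1 comes from getProducerConfig's defaults, which is why LogRecoveryTest drops its own acks line. A minimal sketch of that props-based path, mirroring the hunk (topic and message are illustrative):

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import kafka.serializer.StringEncoder
    import kafka.utils.{IntEncoder, TestUtils}

    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
    // getProducerConfig now supplies only broker list, retry, acks and socket
    // settings, so the serializers must be configured explicitly:
    props.put("key.serializer.class", classOf[IntEncoder].getName)
    props.put("serializer.class", classOf[StringEncoder].getName)

    val producer = new Producer[Int, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[Int, String]("test-topic", 0, "hello"))
    producer.close()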
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 5305167..0bff414 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -56,9 +56,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
-      new StringEncoder(),
-      new StringEncoder())
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName)
     val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
     producer.send(messages:_*)
     producer.close()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 1651822..917bc1c 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -24,11 +24,11 @@
 import kafka.message.ByteBufferMessageSet
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
-import kafka.utils.IntEncoder
+import kafka.utils.{StaticPartitioner, IntEncoder, TestUtils, Utils}
 import kafka.utils.TestUtils._
 import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.serializer.StringEncoder
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -44,9 +44,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
 
     // create topic
     AdminUtils.createTopic(zkClient, topic, 1, 1)
@@ -71,7 +71,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     // wait for the broker to receive the update metadata request after startup
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
 
-    producer = new Producer[Int, String](new ProducerConfig(producerConfig))
+    producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[IntEncoder].getName)
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     var fetchedMessage: ByteBufferMessageSet = null
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4bd5964..55c3809 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -321,27 +321,26 @@ object TestUtils extends Logging {
   /**
    * Create a producer for the given host and port
    */
   def createProducer[K, V](brokerList: String,
-                           encoder: Encoder[V] = new DefaultEncoder(),
-                           keyEncoder: Encoder[K] = new DefaultEncoder(),
-                           props: Properties = new Properties()): Producer[K, V] = {
-    props.put("metadata.broker.list", brokerList)
-    props.put("send.buffer.bytes", "65536")
-    props.put("connect.timeout.ms", "100000")
-    props.put("reconnect.interval", "10000")
-    props.put("serializer.class", encoder.getClass.getCanonicalName)
-    props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
+                           encoder: String = classOf[DefaultEncoder].getName,
+                           keyEncoder: String = classOf[DefaultEncoder].getName,
+                           partitioner: String = classOf[DefaultPartitioner].getName): Producer[K, V] = {
+    val props = getProducerConfig(brokerList)
+    props.put("serializer.class", encoder)
+    props.put("key.serializer.class", keyEncoder)
+    props.put("partitioner.class", partitioner)
     new Producer[K, V](new ProducerConfig(props))
   }
 
-  def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
+  def getProducerConfig(brokerList: String): Properties = {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    props.put("partitioner.class", partitioner)
     props.put("message.send.max.retries", "3")
     props.put("retry.backoff.ms", "1000")
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "-1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("send.buffer.bytes", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
     props
   }
 
@@ -352,7 +351,7 @@ object TestUtils extends Logging {
     props.put("port", port.toString)
     props.put("request.timeout.ms", "500")
     props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }
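After this refactor, createProducer takes the value encoder, key encoder, and partitioner as class names with DefaultEncoder/DefaultPartitioner defaults, while getProducerConfig owns all of the connection, retry, and acks settings. The call shapes used across this patch reduce to the three below; a sketch for reference, where brokerList stands for the result of TestUtils.getBrokerListStrFromConfigs for the test's configs:

    import kafka.producer.Producer
    import kafka.serializer.StringEncoder
    import kafka.utils.{StaticPartitioner, TestUtils}

    // 1. All defaults: byte-array key and value, DefaultPartitioner.
    val binary = TestUtils.createProducer[Array[Byte], Array[Byte]](brokerList)

    // 2. Override only the key encoder (AutoOffsetResetTest, FetcherTest,
    //    UncleanLeaderElectionTest).
    val keyed = TestUtils.createProducer[String, Array[Byte]](brokerList,
      keyEncoder = classOf[StringEncoder].getName)

    // 3. Override encoders and partitioner (ProducerConsumerTestHarness).
    val strings = TestUtils.createProducer[String, String](brokerList,
      encoder = classOf[StringEncoder].getName,
      keyEncoder = classOf[StringEncoder].getName,
      partitioner = classOf[StaticPartitioner].getName)

Since the encoder arguments are now plain strings, the type parameters [K, V] can no longer be inferred from them, which is why the converted call sites either annotate the result type or pass the type parameters explicitly.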