From cd3dcdfc1f633cbc54f048d68262060c93277cd9 Mon Sep 17 00:00:00 2001 From: futtre Date: Sun, 1 Mar 2015 15:39:13 -0800 Subject: [PATCH] KAFKA-1416 Unify sendMessages/getMessages in unit tests Unified get and send messages in TestUtils.scala and its users --- .../consumer/ZookeeperConsumerConnectorTest.scala | 58 +++++----- .../scala/unit/kafka/integration/FetcherTest.scala | 13 +-- .../integration/UncleanLeaderElectionTest.scala | 45 +++----- .../consumer/ZookeeperConsumerConnectorTest.scala | 44 +++----- .../scala/unit/kafka/metrics/MetricsTest.scala | 4 +- .../scala/unit/kafka/server/LogRecoveryTest.scala | 13 +-- .../test/scala/unit/kafka/utils/TestUtils.scala | 122 +++++++++++++-------- 7 files changed, 139 insertions(+), 160 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 155fd04..79c379b 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -82,7 +82,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // also the iterator should support re-entrant, so loop it twice for (i <- 0 until 2) { try { - getMessages(nMessages*2, topicMessageStreams0) + getMessages(topicMessageStreams0, nMessages * 2) fail("should get an exception") } catch { case e: ConsumerTimeoutException => // this is ok @@ -93,8 +93,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages1 = sendMessages(configs, topic, nMessages, 0) ++ + sendMessages(configs, topic, nMessages, 1) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) @@ -108,7 +108,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -127,13 +127,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages2 = sendMessages(configs, topic, nMessages, 0) ++ + sendMessages(configs, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -148,13 +148,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ - sendMessagesToPartition(configs, topic, 1, nMessages) + val sentMessages3 = sendMessages(configs, topic, nMessages, 0) ++ + sendMessages(configs, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -182,8 +182,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages1 = sendMessages(configs, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(configs, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -196,7 +196,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -215,13 +215,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages2 = sendMessages(configs, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(configs, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -236,13 +236,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages3 = sendMessages(configs, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(configs, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -258,8 +258,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec) + val sentMessages = sendMessages(configs, topic, 200, 0, DefaultCompressionCodec) ++ + sendMessages(configs, topic, 200, 1, DefaultCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -267,7 +267,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages = getMessages(400, topicMessageStreams1) + val receivedMessages = getMessages(topicMessageStreams1, 400) assertEquals(sentMessages.sorted, receivedMessages.sorted) // also check partition ownership @@ -284,8 +284,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++ - sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec) + val sentMessages = sendMessages(configs, topic, nMessages, 0, NoCompressionCodec) ++ + sendMessages(configs, topic, nMessages, 1, NoCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -325,7 +325,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(configs, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -343,7 +343,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val expected_1 = List( ("0", "group1_consumer1-0")) assertEquals(expected_1, actual_1) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) assertEquals(sentMessages1, receivedMessages1) zkConsumerConnector1.shutdown() zkClient.close() @@ -351,8 +351,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testConsumerRebalanceListener() { // Send messages to create topic - sendMessagesToPartition(configs, topic, 0, nMessages) - sendMessagesToPartition(configs, topic, 1, nMessages) + sendMessages(configs, topic, nMessages, 0) + sendMessages(configs, topic, nMessages, 1) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -388,7 +388,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // Consume messages from consumer 1 to make sure it has finished rebalance - getMessages(nMessages, topicMessageStreams1) + getMessages(topicMessageStreams1, nMessages) val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_2 = List(("0", "group1_consumer1-0"), diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 3093e45..52d0d08 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -80,18 +80,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def assertQueueEmpty(): Unit = assertEquals(0, queue.size) def sendMessages(messagesPerNode: Int): Int = { - var count = 0 - for(conf <- configs) { - val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromConfigs(configs), - keyEncoder = classOf[StringEncoder].getName) - val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray - messages += conf.brokerId -> ms - producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*) - producer.close() - count += ms.size - } - count + TestUtils.sendMessages(configs, topic, messagesPerNode).size } def fetch(expected: Int) { diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 8342cae..2a0bcdd 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -29,7 +29,7 @@ import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.{DefaultEncoder, StringEncoder} +import kafka.serializer.{StringDecoder, DefaultEncoder, StringEncoder} import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.Utils import kafka.utils.TestUtils._ @@ -174,14 +174,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(configs, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(configs, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -191,7 +191,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) - produceMessage(topic, "third") + sendMessage(configs, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -209,14 +209,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(configs, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(configs, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -228,7 +228,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // message production and consumption should both fail while leader is down intercept[FailedToSendMessageException] { - produceMessage(topic, "third") + sendMessage(configs, topic, "third") } assertEquals(List.empty[String], consumeAllMessages(topic)) @@ -236,7 +236,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId)) - produceMessage(topic, "third") + sendMessage(configs, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) @@ -252,33 +252,16 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { server.awaitShutdown() } - private def produceMessage(topic: String, message: String) = { - val producer: Producer[String, Array[Byte]] = createProducer( - getBrokerListStrFromConfigs(configs), - keyEncoder = classOf[StringEncoder].getName) - producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) - producer.close() - } - private def consumeAllMessages(topic: String) : List[String] = { // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or // resetting the ZK offset val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) - val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head - - val messages = new MutableList[String] - val iter = messageStream.iterator - try { - while(iter.hasNext()) { - messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there - } - } catch { - case e: ConsumerTimeoutException => - debug("consumer timed out after receiving " + messages.length + " message(s).") - } finally { - consumerConnector.shutdown - } - messages.toList + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + val messages = getMessages(messageStream) + consumerConnector.shutdown + + messages } } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 3d0fc9d..fdcc9c2 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -65,7 +65,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createTopic(zkClient, topic, numParts, 1, servers) // send some messages to each broker - val sentMessages1 = sendMessages(nMessages, "batch1") + val sentMessages1 = sendMessages(configs, nMessages, "batch1") // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) @@ -87,51 +87,37 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, + def sendMessages(configs: Seq[KafkaConfig], messagesPerNode: Int, header: String, - compressed: CompressionCodec): List[String] = { + compressed: CompressionCodec = NoCompressionCodec): List[String] = { var messages: List[String] = Nil val producer: kafka.producer.Producer[Int, String] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) - messages ++= ms - import JavaConversions._ - javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + for(conf <- configs) { + for (partition <- 0 until numParts) { + val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) + messages ++= ms + import JavaConversions._ + javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + } } javaProducer.close messages } - def sendMessages(messagesPerNode: Int, - header: String, - compressed: CompressionCodec = NoCompressionCodec): List[String] = { - var messages: List[String] = Nil - for(conf <- configs) - messages ++= sendMessages(conf, messagesPerNode, header, compressed) - messages - } - def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { + var messages: List[String] = Nil import scala.collection.JavaConversions._ - val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams - for ((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) - } - } - } + val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList) + messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread) + messages } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 0f58ad8..bfac656 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -80,12 +80,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { } def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(configs, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 92d6b2c..5b5c652 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,12 +19,9 @@ package kafka.server import java.util.Properties import kafka.utils.TestUtils._ -import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common._ -import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.StringEncoder import java.io.File @@ -56,7 +53,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val message = "hello" - var producer: Producer[Int, String] = null var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] @@ -71,15 +67,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) - - // create the producer - producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) } override def tearDown() { - producer.close() for(server <- servers) { server.shutdown() Utils.rm(server.config.logDirs(0)) @@ -208,7 +198,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { } private def sendMessages(n: Int = 1) { - for(i <- 0 until n) - producer.send(new KeyedMessage[Int, String](topic, 0, message)) + TestUtils.sendMessages(configs, topic, n, partition = 0) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1682a77..ee38daf 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -35,7 +35,7 @@ import kafka.producer._ import kafka.message._ import kafka.api._ import kafka.cluster.Broker -import kafka.consumer.{KafkaStream, ConsumerConfig} +import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils @@ -755,73 +755,105 @@ object TestUtils extends Logging { brokerState = new BrokerState()) } - def sendMessagesToPartition(configs: Seq[KafkaConfig], - topic: String, - partition: Int, - numMessages: Int, - compression: CompressionCodec = NoCompressionCodec): List[String] = { - val header = "test-%d".format(partition) + def sendMessages(configs: Seq[KafkaConfig], + topic: String, + numMessages: Int, + partition: Int = -1, + compression: CompressionCodec = NoCompressionCodec): List[String] = { + val props = new Properties() props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, String] = - createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + val brokerList = TestUtils.getBrokerListStrFromConfigs(configs) + + val header = "test-%d".format(partition) + val messages = 0.until(numMessages).map(x => header + "-" + x) + + // Specific Partition + if (partition >= 0) { + val producer: Producer[Int, String] = createProducer( + brokerList = brokerList, encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, + keyEncoder = classOf[IntEncoder].getName, partitioner = classOf[FixedValuePartitioner].getName, producerProps = props) - val ms = 0.until(numMessages).map(x => header + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) - producer.close() - ms.toList - } + producer.send(messages.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) + producer.close() + debug("Sent %d messages for partition [%s,%d]".format(messages.size, topic, partition)) - def sendMessages(configs: Seq[KafkaConfig], - topic: String, - producerId: String, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, - numParts: Int): List[String]= { - var messages: List[String] = Nil - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - props.put("client.id", producerId) - val producer: Producer[Int, String] = - createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), + // Default Partition + } else { + val producer: Producer[String, String] = createProducer( + brokerList = brokerList, encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, + keyEncoder = classOf[StringEncoder].getName, + partitioner = classOf[DefaultPartitioner].getName, producerProps = props) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - messages ++= ms - debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) + producer.send(messages.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) + producer.close() + debug("Sent %d messages for topic [%s]".format(messages.size, topic)) } + + messages.toList + } + + def sendMessage(configs: Seq[KafkaConfig], + topic: String, + message: String) = { + + val producer: Producer[String, String] = + createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), + encoder = classOf[StringEncoder].getName(), + keyEncoder = classOf[StringEncoder].getName()) + + producer.send(new KeyedMessage[String, String](topic, topic, message)) producer.close() - messages } - def getMessages(nMessagesPerThread: Int, - topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = { + /** + * Consume all messages (or a strict number of messages) + * @param topicMessageStreams the Topic Message Streams + * @param nMessagesPerThread an optional field to specify the exact number of messages to be returned. + * ConsumerTimeoutException will be thrown if there are no messages to be consumed. + * If not specified, then all available messages will be consumed, and no exception is thrown. + * + * + * @return the list of messages consumed. + */ + def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]], + nMessagesPerThread: Int = -1): List[String] = { + var messages: List[String] = Nil + val shouldGetAllMessages = nMessagesPerThread < 0 for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) + val iterator = messageStream.iterator() + try { + var i = 0 + while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) { + assertTrue(iterator.hasNext) + val message = iterator.next.message // will throw a timeout exception if the message isn't there + messages ::= message + debug("received message: " + message) + i += 1 + } + } catch { + case e: ConsumerTimeoutException => + if (shouldGetAllMessages) { + // swallow the exception + debug("consumer timed out after receiving " + messages.length + " message(s).") + } else { + throw e + } } } } + messages.reverse } + def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) // wait until admin path for delete topic is deleted, signaling completion of topic deletion -- 2.2.0