From e419a0d6fdaf66000a46e67bee3207eee51d261b Mon Sep 17 00:00:00 2001 From: futtre Date: Sun, 1 Mar 2015 15:39:13 -0800 Subject: [PATCH 1/3] KAFKA-1416 Unify sendMessages/getMessages in unit tests Unified get and send messages in TestUtils.scala and its users --- .../consumer/ZookeeperConsumerConnectorTest.scala | 58 +++++----- .../scala/unit/kafka/integration/FetcherTest.scala | 13 +-- .../integration/UncleanLeaderElectionTest.scala | 45 +++----- .../consumer/ZookeeperConsumerConnectorTest.scala | 44 +++----- .../scala/unit/kafka/metrics/MetricsTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 122 +++++++++++++-------- 6 files changed, 135 insertions(+), 151 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f3e76db..7f9fca3 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -79,7 +79,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // also the iterator should support re-entrant, so loop it twice for (i <- 0 until 2) { try { - getMessages(nMessages*2, topicMessageStreams0) + getMessages(topicMessageStreams0, nMessages * 2) fail("should get an exception") } catch { case e: ConsumerTimeoutException => // this is ok @@ -90,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) @@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -124,13 +124,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -145,13 +145,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -179,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -193,7 +193,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -212,13 +212,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -233,13 +233,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -255,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec) + val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++ + sendMessages(servers, topic, 200, 1, DefaultCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -264,7 +264,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages = getMessages(400, topicMessageStreams1) + val receivedMessages = getMessages(topicMessageStreams1, 400) assertEquals(sentMessages.sorted, receivedMessages.sorted) // also check partition ownership @@ -281,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec) + val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, NoCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -322,7 +322,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -340,7 +340,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val expected_1 = List( ("0", "group1_consumer1-0")) assertEquals(expected_1, actual_1) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) assertEquals(sentMessages1, receivedMessages1) zkConsumerConnector1.shutdown() zkClient.close() @@ -348,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testConsumerRebalanceListener() { // Send messages to create topic - sendMessagesToPartition(servers, topic, 0, nMessages) - sendMessagesToPartition(servers, topic, 1, nMessages) + sendMessages(servers, topic, nMessages, 0) + sendMessages(servers, topic, nMessages, 1) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -385,7 +385,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // Consume messages from consumer 1 to make sure it has finished rebalance - getMessages(nMessages, topicMessageStreams1) + getMessages(topicMessageStreams1, nMessages) val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_2 = List(("0", "group1_consumer1-0"), diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 0dc837a..638c578 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -78,18 +78,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def assertQueueEmpty(): Unit = assertEquals(0, queue.size) def sendMessages(messagesPerNode: Int): Int = { - var count = 0 - for(conf <- configs) { - val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromServers(servers), - keyEncoder = classOf[StringEncoder].getName) - val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray - messages += conf.brokerId -> ms - producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*) - producer.close() - count += ms.size - } - count + TestUtils.sendMessages(servers, topic, messagesPerNode).size } def fetch(expected: Int) { diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index a130089..5b7b529 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -29,7 +29,7 @@ import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.StringEncoder +import kafka.serializer.StringDecoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.CoreUtils import kafka.utils.TestUtils._ @@ -175,14 +175,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -192,7 +192,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) - produceMessage(topic, "third") + sendMessage(servers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -210,14 +210,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -229,7 +229,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // message production and consumption should both fail while leader is down intercept[FailedToSendMessageException] { - produceMessage(topic, "third") + sendMessage(servers, topic, "third") } assertEquals(List.empty[String], consumeAllMessages(topic)) @@ -237,7 +237,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId)) - produceMessage(topic, "third") + sendMessage(servers, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) @@ -253,33 +253,16 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { server.awaitShutdown() } - private def produceMessage(topic: String, message: String) = { - val producer: Producer[String, Array[Byte]] = createProducer( - getBrokerListStrFromServers(servers), - keyEncoder = classOf[StringEncoder].getName) - producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) - producer.close() - } - private def consumeAllMessages(topic: String) : List[String] = { // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or // resetting the ZK offset val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) - val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head - - val messages = new MutableList[String] - val iter = messageStream.iterator - try { - while(iter.hasNext()) { - messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there - } - } catch { - case e: ConsumerTimeoutException => - debug("consumer timed out after receiving " + messages.length + " message(s).") - } finally { - consumerConnector.shutdown - } - messages.toList + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + val messages = getMessages(messageStream) + consumerConnector.shutdown + + messages } } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index ad66bb2..cb83270 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createTopic(zkClient, topic, numParts, 1, servers) // send some messages to each broker - val sentMessages1 = sendMessages(nMessages, "batch1") + val sentMessages1 = sendMessages(configs, nMessages, "batch1") // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -82,51 +82,37 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, + def sendMessages(configs: Seq[KafkaConfig], messagesPerNode: Int, header: String, - compressed: CompressionCodec): List[String] = { + compressed: CompressionCodec = NoCompressionCodec): List[String] = { var messages: List[String] = Nil val producer: kafka.producer.Producer[Int, String] = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) - messages ++= ms - import JavaConversions._ - javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + for(conf <- configs) { + for (partition <- 0 until numParts) { + val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) + messages ++= ms + import JavaConversions._ + javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + } } javaProducer.close messages } - def sendMessages(messagesPerNode: Int, - header: String, - compressed: CompressionCodec = NoCompressionCodec): List[String] = { - var messages: List[String] = Nil - for(conf <- configs) - messages ++= sendMessages(conf, messagesPerNode, header, compressed) - messages - } - def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { + var messages: List[String] = Nil import scala.collection.JavaConversions._ - val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams - for ((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) - } - } - } + val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList) + messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread) + messages } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 247a6e9..b42101b 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -77,12 +77,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { } def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5a9e84d..7ca83b4 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -36,7 +36,7 @@ import kafka.producer._ import kafka.message._ import kafka.api._ import kafka.cluster.Broker -import kafka.consumer.{KafkaStream, ConsumerConfig} +import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils @@ -745,71 +745,97 @@ object TestUtils extends Logging { time = time, brokerState = new BrokerState()) } - - def sendMessagesToPartition(servers: Seq[KafkaServer], - topic: String, - partition: Int, - numMessages: Int, - compression: CompressionCodec = NoCompressionCodec): List[String] = { + def sendMessages(servers: Seq[KafkaServer], + topic: String, + numMessages: Int, + partition: Int = -1, + compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d".format(partition) val props = new Properties() props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, String] = - createProducer(TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) - val ms = 0.until(numMessages).map(x => header + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) - producer.close() - ms.toList - } - def sendMessages(servers: Seq[KafkaServer], - topic: String, - producerId: String, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, - numParts: Int): List[String]= { - var messages: List[String] = Nil - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - props.put("client.id", producerId) - val producer: Producer[Int, String] = - createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) + // Specific Partition + if (partition >= 0) { + val producer: Producer[Int, String] = + createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName, + partitioner = classOf[FixedValuePartitioner].getName, + producerProps = props) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x) producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - messages ++= ms debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) + producer.close() + ms.toList + } else { + // Use topic as the key to determine the partition + val producer: Producer[String, String] = createProducer( + TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[StringEncoder].getName, + partitioner = classOf[DefaultPartitioner].getName, + producerProps = props) + producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) + producer.close() + debug("Sent %d messages for topic [%s]".format(ms.size, topic)) + ms.toList } + + } + + def sendMessage(servers: Seq[KafkaServer], + topic: String, + message: String) = { + + val producer: Producer[String, String] = + createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName(), + keyEncoder = classOf[StringEncoder].getName()) + + producer.send(new KeyedMessage[String, String](topic, topic, message)) producer.close() - messages } - def getMessages(nMessagesPerThread: Int, - topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = { + /** + * Consume all messages (or a strict number of messages) + * @param topicMessageStreams the Topic Message Streams + * @param nMessagesPerThread an optional field to specify the exact number of messages to be returned. + * ConsumerTimeoutException will be thrown if there are no messages to be consumed. + * If not specified, then all available messages will be consumed, and no exception is thrown. + * + * + * @return the list of messages consumed. + */ + def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]], + nMessagesPerThread: Int = -1): List[String] = { + var messages: List[String] = Nil + val shouldGetAllMessages = nMessagesPerThread < 0 for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) + val iterator = messageStream.iterator() + try { + var i = 0 + while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) { + assertTrue(iterator.hasNext) + val message = iterator.next.message // will throw a timeout exception if the message isn't there + messages ::= message + debug("received message: " + message) + i += 1 + } + } catch { + case e: ConsumerTimeoutException => + if (shouldGetAllMessages) { + // swallow the exception + debug("consumer timed out after receiving " + messages.length + " message(s).") + } else { + throw e + } } } } + messages.reverse } -- 2.2.0 From 41d7a75e88aef233cbdffa7ee66f8ce556f93888 Mon Sep 17 00:00:00 2001 From: Flutra Osmani Date: Fri, 10 Apr 2015 18:11:30 -0700 Subject: [PATCH 2/3] changing sendMessages() signature from KafkaConfig to KafkaServer --- .../scala/unit/kafka/integration/FetcherTest.scala | 10 ++++----- .../consumer/ZookeeperConsumerConnectorTest.scala | 26 +++++++++------------- .../test/scala/unit/kafka/utils/TestUtils.scala | 4 ++-- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 638c578..a19eede 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -66,20 +66,20 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def testFetcher() { val perNode = 2 - var count = sendMessages(perNode) + var count = TestUtils.sendMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() - count = sendMessages(perNode) + count = TestUtils.sendMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() } def assertQueueEmpty(): Unit = assertEquals(0, queue.size) - def sendMessages(messagesPerNode: Int): Int = { - TestUtils.sendMessages(servers, topic, messagesPerNode).size - } +// def sendMessages(messagesPerNode: Int): Int = { +// TestUtils.sendMessages(servers, topic, messagesPerNode).size +// } def fetch(expected: Int) { var count = 0 diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index cb83270..74c761d 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createTopic(zkClient, topic, numParts, 1, servers) // send some messages to each broker - val sentMessages1 = sendMessages(configs, nMessages, "batch1") + val sentMessages1 = sendMessages(servers, nMessages, "batch1") // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -82,37 +82,33 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(configs: Seq[KafkaConfig], + def sendMessages(servers: Seq[KafkaServer], messagesPerNode: Int, - header: String, - compressed: CompressionCodec = NoCompressionCodec): List[String] = { + header: String): List[String] = { var messages: List[String] = Nil - val producer: kafka.producer.Producer[Int, String] = - TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) - - val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) - for(conf <- configs) { + for(server <- servers) { + val producer: kafka.producer.Producer[Int, String] = + TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) + val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x) messages ++= ms import JavaConversions._ javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) } + javaProducer.close } - javaProducer.close messages } def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { - var messages: List[String] = Nil import scala.collection.JavaConversions._ val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList) messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread) - messages } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7ca83b4..8dc99b6 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -769,7 +769,7 @@ object TestUtils extends Logging { producer.close() ms.toList } else { - // Use topic as the key to determine the partition + // Use topic as the key to determine partition val producer: Producer[String, String] = createProducer( TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, @@ -798,7 +798,7 @@ object TestUtils extends Logging { } /** - * Consume all messages (or a strict number of messages) + * Consume all messages (or a specific number of messages) * @param topicMessageStreams the Topic Message Streams * @param nMessagesPerThread an optional field to specify the exact number of messages to be returned. * ConsumerTimeoutException will be thrown if there are no messages to be consumed. -- 2.2.0 From bceff2ac6e4f03e67cfeaa6aa36db94d5646130a Mon Sep 17 00:00:00 2001 From: Flutra Osmani Date: Fri, 10 Apr 2015 18:34:22 -0700 Subject: [PATCH 3/3] KAFKA-1416 Unify sendMessages/getMessages in unit tests Unified get and send messages in TestUtils.scala and its users --- core/src/test/scala/unit/kafka/integration/FetcherTest.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index a19eede..facebd8 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -77,10 +77,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def assertQueueEmpty(): Unit = assertEquals(0, queue.size) -// def sendMessages(messagesPerNode: Int): Int = { -// TestUtils.sendMessages(servers, topic, messagesPerNode).size -// } - def fetch(expected: Int) { var count = 0 while(true) { -- 2.2.0