From 6c10d7ca4ca8a3800a4aef46a2fb219d2f22affa Mon Sep 17 00:00:00 2001 From: futtre Date: Sun, 1 Mar 2015 15:39:13 -0800 Subject: [PATCH] KAFKA-1416 Unify sendMessages/getMessages in unit tests Unified get and send messages in TestUtils.scala and its users --- .../consumer/ZookeeperConsumerConnectorTest.scala | 20 +++--- .../scala/unit/kafka/integration/FetcherTest.scala | 15 ++-- .../integration/UncleanLeaderElectionTest.scala | 29 +++----- .../consumer/ZookeeperConsumerConnectorTest.scala | 44 ++++-------- .../scala/unit/kafka/metrics/MetricsTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 81 +++++++++++++++------- 6 files changed, 96 insertions(+), 97 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index a17e853..a3342cd 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -80,7 +80,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // also the iterator should support re-entrant, so loop it twice for (i <- 0 until 2) { try { - getMessages(nMessages*2, topicMessageStreams0) + getMessages(topicMessageStreams0, nMessages * 2) fail("should get an exception") } catch { case e: ConsumerTimeoutException => // this is ok @@ -106,7 +106,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -131,7 +131,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -152,7 +152,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -194,7 +194,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -219,7 +219,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -240,7 +240,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -265,7 +265,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages = getMessages(400, topicMessageStreams1) + val receivedMessages = getMessages(topicMessageStreams1, 400) assertEquals(sentMessages.sorted, receivedMessages.sorted) // also check partition ownership @@ -323,7 +323,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(configs, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -341,7 +341,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val expected_1 = List( ("0", "group1_consumer1-0")) assertEquals(expected_1, actual_1) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) assertEquals(sentMessages1, receivedMessages1) zkConsumerConnector1.shutdown() zkClient.close() diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 25845ab..8fd6482 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -80,17 +80,10 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def assertQueueEmpty(): Unit = assertEquals(0, queue.size) def sendMessages(messagesPerNode: Int): Int = { - var count = 0 - for(conf <- configs) { - val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromConfigs(configs), - keyEncoder = classOf[StringEncoder].getName) - val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray - messages += conf.brokerId -> ms - producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*) - producer.close() - count += ms.size - } + + val messages = TestUtils.sendMessages(configs, topic, messagesPerNode) + val count = messages.size + count } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index ba3bcdc..32a7405 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -29,7 +29,7 @@ import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.{DefaultEncoder, StringEncoder} +import kafka.serializer.{StringDecoder, DefaultEncoder, StringEncoder} import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.Utils import kafka.utils.TestUtils._ @@ -253,11 +253,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { } private def produceMessage(topic: String, message: String) = { - val producer: Producer[String, Array[Byte]] = createProducer( - getBrokerListStrFromConfigs(configs), - keyEncoder = classOf[StringEncoder].getName) - producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) - producer.close() + sendMessage(configs, topic, message) } private def consumeAllMessages(topic: String) : List[String] = { @@ -265,20 +261,11 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // resetting the ZK offset val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) - val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head - - val messages = new MutableList[String] - val iter = messageStream.iterator - try { - while(iter.hasNext()) { - messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there - } - } catch { - case e: ConsumerTimeoutException => - debug("consumer timed out after receiving " + messages.length + " message(s).") - } finally { - consumerConnector.shutdown - } - messages.toList + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + val messages = getMessages(messageStream) + consumerConnector.shutdown + + messages } } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index d6248b0..ccba710 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createTopic(zkClient, topic, numParts, 1, servers) // send some messages to each broker - val sentMessages1 = sendMessages(nMessages, "batch1") + val sentMessages1 = sendMessages(configs, nMessages, "batch1") // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) @@ -82,51 +82,37 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, + def sendMessages(configs: Seq[KafkaConfig], messagesPerNode: Int, header: String, - compressed: CompressionCodec): List[String] = { + compressed: CompressionCodec = NoCompressionCodec): List[String] = { var messages: List[String] = Nil val producer: kafka.producer.Producer[Int, String] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) - messages ++= ms - import JavaConversions._ - javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + for(conf <- configs) { + for (partition <- 0 until numParts) { + val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) + messages ++= ms + import JavaConversions._ + javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + } } javaProducer.close messages } - def sendMessages(messagesPerNode: Int, - header: String, - compressed: CompressionCodec = NoCompressionCodec): List[String] = { - var messages: List[String] = Nil - for(conf <- configs) - messages ++= sendMessages(conf, messagesPerNode, header, compressed) - messages - } - def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { + var messages: List[String] = Nil import scala.collection.JavaConversions._ - val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams - for ((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) - } - } - } + val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList) + messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread) + messages } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 111e4a2..48f1974 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -75,12 +75,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { } def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(configs, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 6ce1807..e0da683 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -35,7 +35,7 @@ import kafka.producer._ import kafka.message._ import kafka.api._ import kafka.cluster.Broker -import kafka.consumer.{KafkaStream, ConsumerConfig} +import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils @@ -756,25 +756,19 @@ object TestUtils extends Logging { def sendMessages(configs: Seq[KafkaConfig], topic: String, - producerId: String, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, - numParts: Int): List[String]= { + numMessages: Int, + numParts: Int = 1, + compression: CompressionCodec = NoCompressionCodec): List[String] = { + var messages: List[String] = Nil - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - props.put("client.id", producerId) - val producer: Producer[Int, String] = + val producer: Producer[String, String] = createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) + encoder = classOf[StringEncoder].getName(), + keyEncoder = classOf[StringEncoder].getName()) for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) + val ms = 0.until(numMessages).map(x => partition + "-" + x) + producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) messages ++= ms debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) } @@ -782,23 +776,62 @@ object TestUtils extends Logging { messages } - def getMessages(nMessagesPerThread: Int, - topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = { + def sendMessage(configs: Seq[KafkaConfig], + topic: String, + message: String) = { + + val producer: Producer[String, String] = + createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), + encoder = classOf[StringEncoder].getName(), + keyEncoder = classOf[StringEncoder].getName()) + + producer.send(new KeyedMessage[String, String](topic, topic, message)) + producer.close() + } + + /** + * Consume all messages (or a strict number of messages) + * @param topicMessageStreams the Topic Message Streams + * @param nMessagesPerThread an optional field to specify the exact number of messages to be returned. + * ConsumerTimeoutException will be thrown if there are no messages to be consumed. + * If not specified, then all available messages will be consumed, and no exception is thrown. + * + * + * @return the list of messages consumed. + */ + def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]], + nMessagesPerThread: Int = -1): List[String] = { + var messages: List[String] = Nil + val shouldGetAllMessages = nMessagesPerThread < 0 for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) + val iterator = messageStream.iterator() + try { + var i = 0 + while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) { + assertTrue(iterator.hasNext) + val message = iterator.next.message // will throw a timeout exception if the message isn't there + messages ::= message + debug("received message: " + message) + i += 1 + } + } catch { + case e: ConsumerTimeoutException => + if (shouldGetAllMessages) { + // swallow the exception + debug("consumer timed out after receiving " + messages.length + " message(s).") + } else { + throw e + } } } } + messages.reverse } + def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) // wait until admin path for delete topic is deleted, signaling completion of topic deletion -- 2.2.0