diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 686a0df..bdc72ea 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -57,7 +57,7 @@ object TopicCommand { else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) } catch { - case e => + case e: Throwable => println("Error while executing topic command " + e.getMessage) println(Utils.stackTrace(e)) } finally { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index c95c650..919aeb2 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -132,7 +132,7 @@ class RequestSendThread(val controllerId: Int, channel.send(request) isSendSuccessful = true } catch { - case e => // if the send was not successful, reconnect to broker and resend the message + case e: Throwable => // if the send was not successful, reconnect to broker and resend the message error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e) @@ -173,7 +173,7 @@ class RequestSendThread(val controllerId: Int, channel.connect() info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString())) } catch { - case e => { + case e: Throwable => { channel.disconnect() error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e) } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index a649461..91f0728 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -302,7 +302,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } else isMessageInAllReplicas = false } catch { - case t => + case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 440aed8..fcd5eee 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -63,33 +63,11 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { servers ++= List(server1, server2, server3, server4) brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) - // create topics with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3,0,1))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3))) - - - // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId) - var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId) - var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId) - - debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1))) - - assertTrue("Leader should get elected", leader1.isDefined) - assertTrue("Leader should get elected", leader2.isDefined) - assertTrue("Leader should get elected", leader3.isDefined) - assertTrue("Leader should get elected", leader4.isDefined) - - assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) - assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2)) - assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3)) - assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3)) + // create topics first + createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) + createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers) + createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) } override def tearDown() { @@ -129,8 +107,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(leader2.get, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) @@ -154,8 +132,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(leader2.get, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", 2000,0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) @@ -173,12 +151,12 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.addPartitions(zkClient, topic3, 7) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", 2000,0).topicsMetadata diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 8991050..3a6c5ff 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -310,8 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) val controllerId = ZkUtils.getController(zkClient) val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 965099a..151ba7c 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -20,7 +20,6 @@ package kafka.consumer import java.util.concurrent._ import java.util.concurrent.atomic._ -import java.util.Properties import scala.collection._ import junit.framework.Assert._ @@ -28,7 +27,6 @@ import kafka.message._ import kafka.server._ import kafka.utils.TestUtils._ import kafka.utils._ -import kafka.admin.AdminUtils import org.junit.Test import kafka.serializer._ import kafka.cluster.{Broker, Cluster} @@ -61,8 +59,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) } @Test diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e93305a..40a25a2 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -25,7 +25,6 @@ import scala.collection._ import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ -import kafka.admin.AdminUtils import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} @@ -97,8 +96,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -176,8 +175,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) // create a consumer val consumerConfig1 = new ConsumerConfig( @@ -249,8 +248,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -275,8 +274,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -310,13 +309,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) // create topic topic1 with 1 partition on broker 0 - AdminUtils.createTopic(zkClient, topic, 1, 1) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 1415773..7125ec9 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -109,7 +109,7 @@ kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED for(i <- 0 until numMessages) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes)) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 9e1a3b7..4075068 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -30,7 +30,6 @@ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} import kafka.utils.TestUtils._ import kafka.utils.TestUtils -import kafka.admin.AdminUtils class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -55,8 +54,8 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId))) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) + fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) fetcher.stopConnections() fetcher.startConnections(topicInfos, cluster) diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 60a466f..6d489ad 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -209,9 +209,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testConsumerEmptyTopic() { val newTopic = "new-topic" - AdminUtils.createTopic(zkClient, newTopic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, newTopic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0) + TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) + val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 3346156..5eee08a 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -21,12 +21,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} -import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager} -import kafka.cluster.Broker -import kafka.common.ErrorMapping -import kafka.api._ -import kafka.admin.AdminUtils +import kafka.utils.{Utils, TestUtils} import kafka.server.{KafkaConfig, KafkaServer} class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -80,31 +75,10 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { val topic4 = "new-topic4" // create topics with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3))) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3))) - - // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId) - var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId) - var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId) - - debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1))) - debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1))) - - assertTrue("Leader should get elected", leader1.isDefined) - assertTrue("Leader should get elected", leader2.isDefined) - assertTrue("Leader should get elected", leader3.isDefined) - assertTrue("Leader should get elected", leader4.isDefined) - - assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) - assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2)) - assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3)) - assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3)) + createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) + createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers) + createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers) // Do a rolling bounce and check if leader transitions happen correctly diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 761f759..35dc071 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -65,9 +65,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) @@ -84,12 +83,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic val topic1 = "testGetAllTopicMetadata1" val topic2 = "testGetAllTopicMetadata2" - AdminUtils.createTopic(zkClient, topic1, 1, 1) - AdminUtils.createTopic(zkClient, topic2, 1, 1) - - // wait for leader to be elected for both topics - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic2, 0, 1000) + createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", @@ -120,7 +115,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // wait for leader to be elected TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 1bf9462..1b11eb6 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -173,7 +173,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { debug("Follower for " + topic + " is: %s".format(followerId)) produceMessage(topic, "first") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server @@ -208,7 +208,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { debug("Follower for " + topic + " is: %s".format(followerId)) produceMessage(topic, "first") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server @@ -235,7 +235,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId)) produceMessage(topic, "third") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) // verify clean leader transition to ISR follower diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index bdc6f01..6c3feac 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -169,7 +169,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val broker1 = new Broker(0, "localhost", 9092) val broker2 = new Broker(1, "localhost", 9093) - broker1 + // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2)) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 439e33e..c1219a8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -86,10 +86,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testUpdateBrokerPartitionInfo() { val topic = "new-topic" - AdminUtils.createTopic(zkClient, topic, 1, 2) - // wait until the update metadata request for new topic reaches all servers - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val props1 = new util.Properties() props1.put("metadata.broker.list", "localhost:80,localhost:81") @@ -152,9 +149,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topic with 1 partition and await leadership - AdminUtils.createTopic(zkClient, topic, 1, 2) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val producer1 = new Producer[String, String](producerConfig1) val producer2 = new Producer[String, String](producerConfig2) @@ -183,7 +178,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Should have timed out for 3 acks.") } catch { - case se: FailedToSendMessageException => true + case se: FailedToSendMessageException => + // this is expected case e: Throwable => fail("Not expected", e) } finally { @@ -203,13 +199,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0))) - // waiting for 1 partition is enough - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)), + servers = servers) val config = new ProducerConfig(props) val producer = new Producer[String, String](config) @@ -266,9 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topics in ZK - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1))) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) // do a simple test to make sure plumbing is okay try { diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 4840824..0dec9ec 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -92,8 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) - AdminUtils.createTopic(zkClient, "test", 1, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0) + TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 5136fbe..b278bb6 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -19,7 +19,6 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} @@ -61,10 +60,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1))) + val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0) - // wait until leader is elected - val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId) debug("leader Epoc: " + leaderEpoch1) debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) @@ -108,10 +105,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1))) + val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0) - // wait until leader is elected - val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId) debug("leader Epoc: " + leaderEpoch1) debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 76ae659..3fb08e6 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -22,7 +22,6 @@ import kafka.utils._ import junit.framework.Assert._ import java.util.{Random, Properties} import kafka.consumer.SimpleConsumer -import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite @@ -30,8 +29,6 @@ import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.utils.nonthreadsafe -import kafka.utils.threadsafe import org.junit.After import org.junit.Before import org.junit.Test @@ -123,8 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = topicPartition.split("-").head // setup brokers in zookeeper as owners of partitions for this test - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) var offsetChanged = false for(i <- 1 to 14) { diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index ddb2402..7a0ef6f 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,7 +19,6 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ import java.io.File -import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} @@ -72,13 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1))) - - // wait until leader is elected - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - assertTrue("Leader should get elected", leader.isDefined) - // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) + createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) val numMessages = 2L sendMessages(numMessages.toInt) @@ -105,14 +98,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1))) + var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0) - // wait until leader is elected - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - assertTrue("Leader should get elected", leader.isDefined) - // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) sendMessages(1) @@ -169,13 +156,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1))) + createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) - // wait until leader is elected - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - assertTrue("Leader should get elected", leader.isDefined) - // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) sendMessages(20) var hw = 20L // give some time for follower 1 to record leader HW of 600 @@ -202,13 +184,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId))) - - // wait until leader is elected - var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - assertTrue("Leader should get elected", leader.isDefined) - // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) + var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)), + servers = servers)(0) sendMessages(2) var hw = 2L diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 90c21c6..19a8635 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -30,7 +30,6 @@ import kafka.utils.TestUtils._ import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import scala.util.Random import scala.collection._ -import kafka.admin.AdminUtils class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val random: Random = new Random() @@ -78,9 +77,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val topicAndPartition = TopicAndPartition(topic, 0) val expectedReplicaAssignment = Map(0 -> List(1)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined) + createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) + val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) val commitResponse = simpleConsumer.commitOffsets(commitRequest) @@ -168,9 +166,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { def testLargeMetadataPayload() { val topicAndPartition = TopicAndPartition("large-metadata", 0) val expectedReplicaAssignment = Map(0 -> List(1)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicAndPartition.topic, expectedReplicaAssignment) - var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0) - assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined) + createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment, + servers = Seq(server)) val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata( offset=42L, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 5305167..481a400 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -22,7 +22,6 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.admin.AdminUtils import kafka.utils.TestUtils import junit.framework.Assert._ import kafka.common._ @@ -51,8 +50,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { // create a topic and partition and await leadership for (topic <- List(topic1,topic2)) { - AdminUtils.createTopic(zkClient, topic, 1, 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers) } // send test messages to leader diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 1651822..addd11a 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -26,7 +26,6 @@ import kafka.zk.ZooKeeperTestHarness import kafka.producer._ import kafka.utils.IntEncoder import kafka.utils.TestUtils._ -import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder import kafka.utils.{TestUtils, Utils} @@ -49,8 +48,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) // create topic - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) @@ -69,7 +67,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.startup() // wait for the broker to receive the update metadata request after startup - TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0) producer = new Producer[Int, String](new ProducerConfig(producerConfig)) val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4bd5964..130b6be 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,8 +23,6 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit import collection.mutable.Map import collection.mutable.ListBuffer @@ -150,15 +148,33 @@ object TestUtils extends Logging { } /** - * Create a topic in zookeeper + * Create a topic in zookeeper. + * Wait until the leader is elected and the metadata is propagated to all brokers. + * Return the leader for each partition. */ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: List[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) // wait until the update metadata request for new topic reaches all servers (0 until numPartitions).map { case i => - TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, 500) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) + }.toMap + } + + /** + * Create a topic in zookeeper using a customized replica assignment. + * Wait until the leader is elected and the metadata is propagated to all brokers. + * Return the leader for each partition. + */ + def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], + servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + // create topic + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment) + // wait until the update metadata request for new topic reaches all servers + partitionReplicaAssignment.keySet.map { case i => + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) }.toMap } @@ -553,8 +569,8 @@ object TestUtils extends Logging { byteBuffer } - def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { - assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = { + assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout), TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout)) }