diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index e28979827110dfbbb92fe5b152e7f1cc973de400..e2c403e0c087ef18fc085bac3de4c9a062209e56 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -28,6 +28,7 @@ import kafka.common.{TopicExistsException, TopicAndPartition} import kafka.server.{KafkaServer, KafkaConfig} import java.io.File import TestUtils._ +import kafka.server.RunningAsBroker class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -130,7 +131,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { intercept[TopicExistsException] { // shouldn't be able to create a topic that already exists - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createTopic(zkClient, topic, 12, 3) } } @@ -147,7 +148,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, 1, 3, servers) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -173,12 +174,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testPartitionReassignmentWithLeaderNotInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, 1, 3, servers) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -204,12 +204,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testPartitionReassignmentNonOverlappingReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, 1, 2, servers) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -252,8 +251,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testResumePartitionReassignmentThatWasCompleted() { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" + + // create brokers + val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -261,8 +263,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions - // create brokers - val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -293,20 +293,23 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testBasicPreferredReplicaElection() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) val topic = "test" - val partition = 1 + val partition = 0 val preferredReplica = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) + val servers = serverConfigs.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) - // broker 2 should be the leader since it was started first + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + servers(0).shutdown + servers(0).awaitShutdown + // broker 1 should be leader now val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get - // trigger preferred replica election + servers(0).startup + TestUtils.waitUntilTrue(() => servers(0).brokerState.currentState == RunningAsBroker.state, "Broker failed to restart", 10000) val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() + println(AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata) val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get assertEquals("Preferred replica election failed", preferredReplica, newLeader) servers.foreach(_.shutdown()) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01bcef9cacd8dec1f5d662644fc6fe4994bc..e5da7c01d710a854d42c86895a771c8ebc146951 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -107,7 +107,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), @@ -192,7 +192,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, topic) verifyTopicDeletion(topic, servers) // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until leader is elected val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) @@ -232,7 +232,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index f44568cb25edf25db857415119018fd4c9922f61..905afe079eed827b711e6fff098232b3927bd499 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -101,7 +101,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) verifyUncleanLeaderElectionEnabled } @@ -113,7 +113,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) verifyUncleanLeaderElectionDisabled } @@ -127,8 +127,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(true)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) + createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) verifyUncleanLeaderElectionEnabled } @@ -142,8 +141,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(false)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) + createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) verifyUncleanLeaderElectionDisabled } @@ -156,7 +154,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { topicProps.put("unclean.leader.election.enable", "invalid") intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps) + createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1)), servers, topicProps) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4e13c5240c8303853d08cc3b40088f8c7dae460..8a4173b06c88bf57b8bc6fba2260253d4eea37b7 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -183,15 +183,20 @@ object TestUtils extends Logging { * Return the leader for each partition. */ def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer], config: Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment, config) // wait until the update metadata request for new topic reaches all servers partitionReplicaAssignment.keySet.map { case i => TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) }.toMap } + + def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], + servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties) + } /** * Create a test config for a consumer