diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index e289798..c7eec6d 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -25,11 +25,10 @@ import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Logging, ZkUtils, TestUtils} import kafka.common.{TopicExistsException, TopicAndPartition} -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server.{KafkaServer, KafkaConfig, RunningAsBroker} import java.io.File import TestUtils._ - class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test @@ -130,7 +129,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { intercept[TopicExistsException] { // shouldn't be able to create a topic that already exists - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val numPartitions = 12 + val replicationFactor = 3 + AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) } } @@ -142,12 +143,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testPartitionReassignmentWithLeaderInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val numPartitions = 1 + val replicationFactor = 3 + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -173,12 +175,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testPartitionReassignmentWithLeaderNotInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" + val numPartitions = 1 + val replicationFactor = 3 // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -204,12 +207,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testPartitionReassignmentNonOverlappingReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" + val numPartitions = 1 + val replicationFactor = 2 // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -252,8 +256,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testResumePartitionReassignmentThatWasCompleted() { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" + + // create brokers + val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -261,8 +268,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions - // create brokers - val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -293,18 +298,24 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testBasicPreferredReplicaElection() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val partition = 1 + val partition = 0 val preferredReplica = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) + val servers = serverConfigs.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) - // broker 2 should be the leader since it was started first + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) + // shutdown broker 0 to force leadership to broker 1 + servers(0).shutdown + servers(0).awaitShutdown + // broker 1 should be leader now val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get - // trigger preferred replica election + // start broker 0 back up, so it will be available as the preferred replica + servers(0).startup + // give server 0 a moment to get back into ISR + TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition) val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..f3c41af 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -98,16 +98,19 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testPartitionReassignmentDuringDeleteTopic() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(4, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val numPartitions = 1 + val replicationFactor = 3 + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, allServers) + // find the servers that contain a replica for the topic + val replicaIds = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val servers = allServers.filter(s => replicaIds.contains(s.config.brokerId)) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), @@ -121,8 +124,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since // the topic is being deleted // reassign partition 0 - val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - val newReplicas = Seq(1, 2, 3) + // remove one of the original replica ids to ensure that the new replica set includes a previously unassigned broker + val newReplicas = Seq(0, 1, 2, 3).diff(Seq(replicaIds(0))) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed @@ -132,11 +135,11 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; }, "Partition reassignment shouldn't complete.") val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head + val controller = allServers.filter(s => s.config.brokerId == controllerId).head assertFalse("Partition reassignment should fail", controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) + assertEquals("Partition should not be reassigned to different broker set", replicaIds, assignedReplicas) follower.startup() verifyTopicDeletion(topic, servers) allServers.foreach(_.shutdown()) @@ -192,7 +195,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, topic) verifyTopicDeletion(topic, servers) // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until leader is elected val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) @@ -232,7 +235,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index f44568c..bb1868a 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -101,19 +101,23 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + val numPartitions = 1 + val replicationFactor = 2 + createTopic(zkClient, topic, numPartitions, replicationFactor, servers) verifyUncleanLeaderElectionEnabled } def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) + // disable unclean leader election + configProps1.put("unclean.leader.election.enable", String.valueOf(false)) + configProps2.put("unclean.leader.election.enable", String.valueOf(false)) startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + val numPartitions = 1 + val replicationFactor = 2 + createTopic(zkClient, topic, numPartitions, replicationFactor, servers) verifyUncleanLeaderElectionDisabled } @@ -127,8 +131,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(true)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) + val numPartitions = 1 + val replicationFactor = 2 + createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicProps) verifyUncleanLeaderElectionEnabled } @@ -142,9 +147,10 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(false)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) - + val numPartitions = 1 + val replicationFactor = 2 + createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicProps) + verifyUncleanLeaderElectionDisabled } @@ -156,7 +162,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { topicProps.put("unclean.leader.election.enable", "invalid") intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps) + val numPartitions = 1 + val replicationFactor = 1 + createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicProps) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4e13c5..03b441c 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -167,9 +167,9 @@ object TestUtils extends Logging { * Return the leader for each partition. */ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer], config: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic - AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) + AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, config) // wait until the update metadata request for new topic reaches all servers (0 until numPartitions).map { case i => TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)