From 1b509a19014bfecfe88638126eed9d86ca6c0c41 Mon Sep 17 00:00:00 2001 From: Jon Natkins Date: Fri, 25 Jul 2014 11:51:07 -0700 Subject: [PATCH] KAFKA-1420 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests --- .../test/scala/unit/kafka/admin/AdminTest.scala | 189 ++++++++++----------- .../scala/unit/kafka/admin/DeleteTopicTest.scala | 27 +-- .../integration/UncleanLeaderElectionTest.scala | 18 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 20 ++- 4 files changed, 133 insertions(+), 121 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index e289798..f8d8e38 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -23,13 +23,12 @@ import java.util.Properties import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{Logging, ZkUtils, TestUtils} -import kafka.common.{TopicExistsException, TopicAndPartition} -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.utils.{ Logging, ZkUtils, TestUtils } +import kafka.common.{ TopicExistsException, TopicAndPartition } +import kafka.server.{ KafkaServer, KafkaConfig } import java.io.File import TestUtils._ - class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test @@ -48,16 +47,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // correct assignment val expectedAssignment = Map( - 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3), - 2 -> List(2, 3, 4), - 3 -> List(3, 4, 0), - 4 -> List(4, 0, 1), - 5 -> List(0, 2, 3), - 6 -> List(1, 3, 4), - 7 -> List(2, 4, 0), - 8 -> List(3, 0, 1), - 9 -> List(4, 1, 2)) + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3), + 2 -> List(2, 3, 4), + 3 -> List(3, 4, 0), + 4 -> List(4, 0, 1), + 5 -> List(0, 2, 3), + 6 -> List(1, 3, 4), + 7 -> List(2, 4, 0), + 8 -> List(3, 0, 1), + 9 -> List(4, 1, 2)) val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) val e = (expectedAssignment.toList == actualAssignment.toList) @@ -71,17 +70,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // duplicate brokers intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,0))) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0 -> Seq(0, 0))) } // inconsistent replication factor intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0))) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0 -> Seq(0, 1), 1 -> Seq(0))) } // good assignment val assignment = Map(0 -> List(0, 1, 2), - 1 -> List(1, 2, 3)) + 1 -> List(1, 2, 3)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", assignment) val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test")) assertEquals(assignment, found("test")) @@ -89,65 +88,46 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testTopicCreationInZK() { - val expectedReplicaAssignment = Map( - 0 -> List(0, 1, 2), - 1 -> List(1, 2, 3), - 2 -> List(2, 3, 4), - 3 -> List(3, 4, 0), - 4 -> List(4, 0, 1), - 5 -> List(0, 2, 3), - 6 -> List(1, 3, 4), - 7 -> List(2, 4, 0), - 8 -> List(3, 0, 1), - 9 -> List(4, 1, 2), - 10 -> List(1, 2, 3), - 11 -> List(1, 3, 4) - ) - val 
leaderForPartitionMap = Map( - 0 -> 0, - 1 -> 1, - 2 -> 2, - 3 -> 3, - 4 -> 4, - 5 -> 0, - 6 -> 1, - 7 -> 2, - 8 -> 3, - 9 -> 4, - 10 -> 1, - 11 -> 1 - ) + val numPartitions = 10 + val replicationFactor = 3 + val brokers = List(0, 1, 2, 3, 4) + val partitions = List.range(0, 10) + val replicaSets = TestUtils.combinations(brokers, replicationFactor) + val expectedReplicaAssignment = partitions.zip(replicaSets.toList).toMap + val leaderForPartitionMap = expectedReplicaAssignment.map{ case (k, v) => (k, v(0)) } val topic = "test" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + + TestUtils.createBrokersInZk(zkClient, brokers) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) - for(i <- 0 until actualReplicaList.size) + for (i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) intercept[TopicExistsException] { // shouldn't be able to create a topic that already exists - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) } } private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = { servers.filter(server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists) - .map(_.config.brokerId) - .toSet + .map(_.config.brokerId) + .toSet } @Test def testPartitionReassignmentWithLeaderInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val numPartitions = 1 + val replicationFactor = 3 + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -156,10 +136,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, + }, "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) // in sync replicas should not have any replica that is not in the new assigned replicas @@ -167,18 +147,19 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with 
Logging { assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, - "New replicas should exist on brokers") + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @Test def testPartitionReassignmentWithLeaderNotInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" + val numPartitions = 1 + val replicationFactor = 3 // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -187,29 +168,30 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, - "New replicas should exist on brokers") + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @Test def testPartitionReassignmentNonOverlappingReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" + val numPartitions = 1 + val replicationFactor = 2 // create brokers val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -218,17 +200,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = 
ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, - "New replicas should exist on brokers") + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @@ -249,11 +231,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } @Test - def testResumePartitionReassignmentThatWasCompleted() { - val expectedReplicaAssignment = Map(0 -> List(0, 1)) + def testResumePartitionReassignmentThatWasIncomplete() { + val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" + + // create brokers + val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) + // shut all servers down + servers.foreach(_.shutdown()) + servers.foreach(_.awaitShutdown()) + // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -261,19 +250,20 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions - // create brokers - val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + + // restart servers to allow them to complete reassignment tasks + servers.foreach(_.startup()) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), - "Partition reassignment should complete") + "Partition reassignment should complete") val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas) // ensure that there are no under replicated partitions ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, - "New replicas should exist on brokers") + "New replicas should exist on brokers") servers.foreach(_.shutdown()) } @@ -284,7 +274,7 @@ 
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection) // try to read it back and compare with what was written val preferredReplicaElectionZkData = ZkUtils.readData(zkClient, - ZkUtils.PreferredReplicaLeaderElectionPath)._1 + ZkUtils.PreferredReplicaLeaderElectionPath)._1 val partitionsUndergoingPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData) assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection, @@ -293,18 +283,24 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testBasicPreferredReplicaElection() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" - val partition = 1 + val partition = 0 val preferredReplica = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) + val servers = serverConfigs.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) - // broker 2 should be the leader since it was started first + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) + // shutdown broker 0 to force leadership to broker 1 + servers(0).shutdown + servers(0).awaitShutdown + // broker 1 should be leader now val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get - // trigger preferred replica election + // start broker 0 back up, so it will be available as the preferred replica + servers(0).startup + // give server 0 a moment to get back into ISR + TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition) val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get @@ -314,7 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @Test def testShutdownBroker() { - val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) val topic = "test" val partition = 1 // create brokers @@ -330,29 +326,28 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { try { // wait for the update metadata request to trickle to the brokers TestUtils.waitUntilTrue(() => - activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), + activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic, partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), "Topic test not created after timeout") assertEquals(0, partitionsRemaining.size) - var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic, partition).get var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader assertEquals(0, 
leaderAfterShutdown) assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) - assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) + assertEquals(List(0, 1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) partitionsRemaining = controller.shutdownBroker(1) assertEquals(0, partitionsRemaining.size) activeServers = servers.filter(s => s.config.brokerId == 0) - partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic, partition).get leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader assertEquals(0, leaderAfterShutdown) - assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) + assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic, partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) partitionsRemaining = controller.shutdownBroker(0) assertEquals(1, partitionsRemaining.size) // leader doesn't change since all the replicas are shut down - assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) - } - finally { + assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic, partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) + } finally { servers.foreach(_.shutdown()) } } @@ -376,7 +371,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def checkConfig(messageSize: Int, retentionMs: Long) { TestUtils.retry(10000) { - for(part <- 0 until partitions) { + for (part <- 0 until partitions) { val logOpt = server.logManager.getLog(TopicAndPartition(topic, part)) assertTrue(logOpt.isDefined) assertEquals(retentionMs, logOpt.get.config.retentionMs) @@ -388,13 +383,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { try { // create a topic with a few config overrides and check that they are applied val maxMessageSize = 1024 - val retentionMs = 1000*1000 + val retentionMs = 1000 * 1000 AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) checkConfig(maxMessageSize, retentionMs) // now double the config values for the topic and check that it is applied - AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) - checkConfig(2*maxMessageSize, 2 * retentionMs) + AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs)) + checkConfig(2 * maxMessageSize, 2 * retentionMs) } finally { server.shutdown() server.config.logDirs.map(Utils.rm(_)) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..7910ccc 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -98,31 +98,34 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testPartitionReassignmentDuringDeleteTopic() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(4, false) brokerConfigs.foreach(p => 
p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val numPartitions = 1 + val replicationFactor = 3 + TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, allServers) + // find the servers that contain a replica for the topic + val replicaIdsForPartition = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val serversHostingPartition = allServers.filter(s => replicaIdsForPartition.contains(s.config.brokerId)) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + TestUtils.waitUntilTrue(() => serversHostingPartition.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + val follower = serversHostingPartition.filter(s => s.config.brokerId != leaderIdOpt.get).last follower.shutdown() // start topic deletion AdminUtils.deleteTopic(zkClient, topic) // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since // the topic is being deleted // reassign partition 0 - val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - val newReplicas = Seq(1, 2, 3) + // remove one of the original replica ids to ensure that the new replica set includes a previously unassigned broker + val newReplicas = Seq(0, 1, 2, 3).diff(Seq(replicaIdsForPartition(0))) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed @@ -132,13 +135,13 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; }, "Partition reassignment shouldn't complete.") val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head + val controller = allServers.filter(s => s.config.brokerId == controllerId).head assertFalse("Partition reassignment should fail", controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) + assertEquals("Partition should not be reassigned to different broker set", replicaIdsForPartition, assignedReplicas) follower.startup() - verifyTopicDeletion(topic, servers) + verifyTopicDeletion(topic, serversHostingPartition) allServers.foreach(_.shutdown()) } @@ -192,7 +195,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, topic) verifyTopicDeletion(topic, servers) // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, 
expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until leader is elected val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) @@ -232,7 +235,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index f44568c..555bf4f 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -101,19 +101,19 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + createTopic(zkClient, topic, 1, 2, servers) verifyUncleanLeaderElectionEnabled } def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) + // disable unclean leader election + configProps1.put("unclean.leader.election.enable", String.valueOf(false)) + configProps2.put("unclean.leader.election.enable", String.valueOf(false)) startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + createTopic(zkClient, topic, 1, 2, servers) verifyUncleanLeaderElectionDisabled } @@ -127,8 +127,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(true)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) + createTopic(zkClient, topic, 1, 2, servers, topicProps) verifyUncleanLeaderElectionEnabled } @@ -142,8 +141,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() topicProps.put("unclean.leader.election.enable", String.valueOf(false)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) + createTopic(zkClient, topic, 1, 2, servers, topicProps) verifyUncleanLeaderElectionDisabled } @@ -156,7 +154,7 @@ class UncleanLeaderElectionTest 
extends JUnit3Suite with ZooKeeperTestHarness { topicProps.put("unclean.leader.election.enable", "invalid") intercept[IllegalArgumentException] { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps) + createTopic(zkClient, topic, 1, 1, servers, topicProps) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4e13c5..c222d37 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -167,9 +167,9 @@ object TestUtils extends Logging { * Return the leader for each partition. */ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer], config: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic - AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) + AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, config) // wait until the update metadata request for new topic reaches all servers (0 until numPartitions).map { case i => TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) @@ -348,6 +348,22 @@ object TestUtils extends Logging { } /** + * Generate all the combinations of n elements from a list. This function exists + * in later versions of Scala, but 2.8 doesn't have it, so this is an auxiliary + * function used to aid in generating combinations of brokers for replica assignment. + * @param list List to generate combinations from + * @param n Number of elements that each combination should have + */ + def combinations(list: List[Int], n: Int): List[List[Int]] = { + if (n > list.size) List.empty + else list match { + case _ :: _ if n == 1 => list.map(List(_)) + case head :: tail => combinations(tail, n-1).map(head :: _) ::: combinations(tail, n) + case _ => Nil + } + } + + /** * Create a producer with a few pre-configured properties. * If certain properties need to be overridden, they can be provided in producerProps. */ -- 1.8.5.2 (Apple Git-48)
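
A few usage notes follow, with illustrative sketches rather than definitive code.

The core change is routing test topic creation through TestUtils.createTopic, which now also accepts
per-topic config overrides and blocks until metadata for every partition has reached the given brokers.
A minimal sketch of the intended call pattern, assuming the zkClient fixture provided by
ZooKeeperTestHarness and the broker helpers already present in TestUtils:

    import java.util.Properties
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    // bring up a small cluster the same way the touched tests do
    val servers = TestUtils.createBrokerConfigs(3, false).map(b => TestUtils.createServer(new KafkaConfig(b)))

    // let AdminUtils pick the replica assignment; the helper returns the elected leader per partition
    val leaders = TestUtils.createTopic(zkClient, "test", 2, 3, servers)

    // the same helper with topic-level overrides, via the new config parameter
    val topicProps = new Properties()
    topicProps.put("unclean.leader.election.enable", String.valueOf(false))
    TestUtils.createTopic(zkClient, "test-2", 1, 2, servers, topicProps)

    servers.foreach(_.shutdown())

Note that the call sites passing an explicit assignment map, e.g.
TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers), rely on an overload this diff
does not touch and which is assumed to exist in TestUtils already.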
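
TestUtils.combinations is a small recursive n-choose-k helper, added because, per its doc comment, the
Scala 2.8 library in use does not provide it. It returns an empty list when n exceeds list.size and
preserves the order of the input list. A sketch of its output and of how testTopicCreationInZK derives the
expected assignment from it (the literal result shown is illustrative, worked out from the function as
written):

    // TestUtils.combinations(List(0, 1, 2, 3), 2)
    //   == List(List(0, 1), List(0, 2), List(0, 3), List(1, 2), List(1, 3), List(2, 3))

    // testTopicCreationInZK builds on this: choosing 3 brokers out of 5 yields exactly 10 replica sets,
    // one per partition, and the first replica of each set is the expected leader.
    val brokers = List(0, 1, 2, 3, 4)
    val replicaSets = TestUtils.combinations(brokers, 3)                      // 10 lists of 3 broker ids
    val expectedReplicaAssignment = List.range(0, 10).zip(replicaSets).toMap
    val leaderForPartitionMap = expectedReplicaAssignment.map { case (p, replicas) => (p, replicas.head) }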
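
Three of the AdminTest cases repeat the same block that polls ZooKeeper until a reassignment finishes. If
that duplication becomes a nuisance, it could be hoisted into TestUtils along these lines; this is only a
sketch assembled from code already in the patch, waitUntilReassignmentCompletes is a hypothetical name, and
ReassignmentCompleted is the status object defined next to ReassignPartitionsCommand in kafka.admin:

    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.{ ReassignPartitionsCommand, ReassignmentCompleted }
    import kafka.common.TopicAndPartition
    import kafka.utils.{ TestUtils, ZkUtils }

    def waitUntilReassignmentCompletes(zkClient: ZkClient,
                                       topicAndPartition: TopicAndPartition,
                                       newReplicas: Seq[Int]) {
      TestUtils.waitUntilTrue(() => {
        // the same check the tests currently perform inline
        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
      }, "Partition reassignment should complete")
    }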
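
In DeleteTopicTest.testPartitionReassignmentDuringDeleteTopic the reassignment target is now derived from
the actual assignment instead of being hard-coded. A worked example of that one-liner, assuming createTopic
happened to place partition 0 on brokers 0, 1 and 2:

    val replicaIdsForPartition = ZkUtils.getReplicasForPartition(zkClient, topic, 0)   // e.g. Seq(0, 1, 2)
    val newReplicas = Seq(0, 1, 2, 3).diff(Seq(replicaIdsForPartition(0)))             // then Seq(1, 2, 3)
    // With four brokers and replication factor 3, removing exactly one current replica from the full broker
    // set yields a target that always includes the one broker not yet hosting the partition, so the
    // attempted reassignment is a real move rather than a no-op.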