diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala index 9e8ccc3..6e36492 100644 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -47,9 +47,9 @@ object CheckReassignmentStatus extends Logging { val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get val newReplicas = replicasList.split(",").map(_.toInt) - (TopicAndPartition(topic, partition), newReplicas.toSeq) + (TopicAndPartition(topic, partition), newReplicas.toSet) }.toMap - case None => Map.empty[TopicAndPartition, Seq[Int]] + case None => Map.empty[TopicAndPartition, Set[Int]] } val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) @@ -66,7 +66,7 @@ object CheckReassignmentStatus extends Logging { } } - def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) + def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Set[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) // for all partitions whose replica reassignment is complete, check the status @@ -77,9 +77,9 @@ object CheckReassignmentStatus extends Logging { } def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, - reassignedReplicas: Seq[Int], - partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], - partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { + reassignedReplicas: scala.collection.Set[Int], + partitionsToBeReassigned: Map[TopicAndPartition, scala.collection.Set[Int]], + partitionsBeingReassigned: Map[TopicAndPartition, scala.collection.Set[Int]]): ReassignmentStatus = { val newReplicas = partitionsToBeReassigned(topicAndPartition) partitionsBeingReassigned.get(topicAndPartition) match { case Some(partition) => ReassignmentInProgress diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8d287f4..602d00c 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -77,7 +77,7 @@ object ReassignPartitionsCommand extends Logging { } } -class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]]) +class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Set[Int]]) extends Logging { def reassignPartitions(): Boolean = { try { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index f7a7bd4..f659049 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -162,7 +162,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques stopAndDeleteReplicaRequestMap.clear() } - def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, + def addLeaderAndIsrRequestForBrokers(brokerIds: scala.collection.Set[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) { brokerIds.foreach { brokerId => leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 229239c..af2d227 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -45,7 +45,7 @@ class ControllerContext(val zkClient: ZkClient, var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1, val correlationId: AtomicInteger = new AtomicInteger(0), var allTopics: Set[String] = Set.empty, - var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty, + var partitionReplicaAssignment: mutable.Map[TopicAndPartition, scala.collection.Set[Int]] = mutable.Map.empty, var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty, var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap, @@ -199,7 +199,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg removeReplicaFromIsr(topic, partition, id) match { case Some(updatedLeaderIsrAndControllerEpoch) => brokerRequestBatch.addLeaderAndIsrRequestForBrokers( - Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, + Set(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, updatedLeaderIsrAndControllerEpoch, replicationFactor) case None => // ignore @@ -526,7 +526,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = { + private def areReplicasInIsr(topic: String, partition: Int, replicas: scala.collection.Set[Int]): Boolean = { getLeaderAndIsrForPartition(zkClient, topic, partition) match { case Some(leaderAndIsr) => val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r)) @@ -580,7 +580,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val reassignedReplicas = reassignedPartitionContext.newReplicas val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic)) partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas) - updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic) + updateAssignedReplicasForPartitionInZk(topicAndPartition, partitionsAndReplicasForThisTopic) info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(","))) // update the assigned replica list after a successful zookeeper write controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas) @@ -626,11 +626,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } - def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, - newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) { + def updateAssignedReplicasForPartitionInZk(topicAndPartition: TopicAndPartition, + newReplicaAssignmentForTopic: Map[TopicAndPartition, scala.collection.Set[Int]]) { try { val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic) - val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) + val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2.toSeq))) ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { @@ -820,7 +820,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL private def createReassignmentContextForPartition(topic: String, partition: Int, - newReplicas: Seq[Int]): ReassignedPartitionsContext = { + newReplicas: scala.collection.Set[Int]): ReassignedPartitionsContext = { val context = new ReassignedPartitionsContext(newReplicas) // first register ISR change listener watchIsrChangesForReassignedPartition(topic, partition, context) @@ -973,7 +973,7 @@ class ControllerEpochListener(controller: KafkaController) extends IZkDataListen } } -case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, +case class ReassignedPartitionsContext(var newReplicas: scala.collection.Set[Int] = Set.empty, var isrChangeListener: ReassignedPartitionsIsrChangeListener = null) case class PartitionAndReplica(topic: String, partition: Int, replica: Int) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index d295781..2b139d0 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -30,7 +30,7 @@ trait PartitionLeaderSelector { * Also, returns the list of replicas the returned leader and isr request should be sent to * This API selects a new leader for the input partition */ - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) } @@ -44,7 +44,7 @@ trait PartitionLeaderSelector { class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { this.logIdent = "[OfflinePartitionLeaderSelector]: " - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) = { controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { case Some(assignedReplicas) => val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) @@ -89,7 +89,7 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex /** * The reassigned replicas are already in the ISR when selectLeader is called. */ - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) = { val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion @@ -120,7 +120,7 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte with Logging { this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: " - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) = { val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val preferredReplica = assignedReplicas.head // check if preferred replica is the current leader @@ -153,7 +153,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) this.logIdent = "[ControlledShutdownLeaderSelector]: " - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) = { val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion @@ -187,7 +187,7 @@ class NoOpLeaderSelector(controllerContext: ControllerContext) extends Partition this.logIdent = "[NoOpLeaderSelector]: " - def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, scala.collection.Set[Int]) = { warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.") (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition)) } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 654fa2e..58b51f2 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -283,7 +283,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { try { var zookeeperPathUpdateSucceeded: Boolean = false var newLeaderAndIsr: LeaderAndIsr = null - var replicasForThisPartition: Seq[Int] = Seq.empty[Int] + var replicasForThisPartition: Set[Int] = Set.empty[Int] while(!zookeeperPathUpdateSucceeded) { val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition) val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 5146f12..5e2e121 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -115,7 +115,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica" .format(replicaId, topicAndPartition) + "state as it is being requested to become leader") - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(Set(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size) case None => // new leader request will be sent to this replica when one gets elected } @@ -138,7 +138,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case NewReplica => // add this replica to the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) + controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas + replicaId) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case _ => @@ -147,7 +147,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case Some(leaderIsrAndControllerEpoch) => controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match { case true => // leader is alive - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(Set(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OnlineReplica) @@ -169,7 +169,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { controller.removeReplicaFromIsr(topic, partition, replicaId) match { case Some(updatedLeaderIsrAndControllerEpoch) => // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(Set(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OfflineReplica) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 9a0e250..c2de72d 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -157,28 +157,28 @@ object ZkUtils extends Logging { /** * Gets the assigned replicas (AR) for a specific topic and partition */ - def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = { + def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): scala.collection.Set[Int] = { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { case Some(replicaMap) => replicaMap.asInstanceOf[Map[String, Seq[Int]]].get(partition.toString) match { - case Some(seq) => seq - case None => Seq.empty[Int] + case Some(seq) => seq.toSet + case None => Set.empty[Int] } - case None => Seq.empty[Int] + case None => Set.empty[Int] } - case None => Seq.empty[Int] + case None => Set.empty[Int] } - case None => Seq.empty[Int] + case None => Set.empty[Int] } } def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { val replicas = getReplicasForPartition(zkClient, topic, partition) debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas)) - replicas.contains(brokerId.toString) + replicas.contains(brokerId) } def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { @@ -475,8 +475,8 @@ object ZkUtils extends Logging { ret } - def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = { - val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]] + def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Set[Int]] = { + val ret = new mutable.HashMap[TopicAndPartition, Set[Int]] topics.foreach { topic => val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { @@ -486,7 +486,7 @@ object ZkUtils extends Logging { case Some(repl) => val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]] for((partition, replicas) <- replicaMap){ - ret.put(TopicAndPartition(topic, partition.toInt), replicas) + ret.put(TopicAndPartition(topic, partition.toInt), replicas.toSet) debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) } case None => @@ -566,8 +566,8 @@ object ZkUtils extends Logging { } } - def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { - val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map() + def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Set[Int]] = { + val reassignedPartitions: mutable.Map[TopicAndPartition, Set[Int]] = mutable.Map() Json.parseFull(jsonData) match { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { @@ -576,7 +576,7 @@ object ZkUtils extends Logging { val topic = p.get("topic").get.asInstanceOf[String] val partition = p.get("partition").get.asInstanceOf[Int] val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] - reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas + reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas.toSet }) case None => } @@ -585,10 +585,10 @@ object ZkUtils extends Logging { reassignedPartitions } - def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { + def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Set[Int]]): String = { var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]() for (p <- partitionsToBeReassigned) { - val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false) + val jsonReplicasData = Utils.seqToJson(p._2.toSeq.map(_.toString), valueInQuotes = false) val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true) val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData), valueInQuotes = false) @@ -598,7 +598,7 @@ object ZkUtils extends Logging { valueInQuotes = false) } - def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { + def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Set[Int]]) { val zkPath = ZkUtils.ReassignPartitionsPath partitionsToBeReassigned.size match { case 0 => // need to delete the /admin/reassign_partitions path diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 6c80c4c..9b6cf57 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -217,7 +217,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create the topic AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 - val newReplicas = Seq(0, 2, 3) + val newReplicas = Set(0, 2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) @@ -242,7 +242,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create the topic AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 - val newReplicas = Seq(1, 2, 3) + val newReplicas = Set(1, 2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) @@ -268,7 +268,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create the topic AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 - val newReplicas = Seq(2, 3) + val newReplicas = Set(2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) @@ -291,7 +291,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // reassign partition 0 - val newReplicas = Seq(2, 3) + val newReplicas = Set(2, 3) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) @@ -310,7 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // put the partition in the reassigned path as well // reassign partition 0 - val newReplicas = Seq(0, 1) + val newReplicas = Set(0, 1) val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))