From f75d4e54811dff0364f520d27576439bf49aa094 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Wed, 24 Jul 2013 22:23:02 -0700 Subject: [PATCH 1/3] Add a way to move topics --- .../kafka/admin/ReassignPartitionsCommand.scala | 86 ++++++++++++++++------ core/src/main/scala/kafka/utils/ZkUtils.scala | 17 +++++ 2 files changed, 82 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8d287f4..093c9e1 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -18,6 +18,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ +import collection._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} @@ -26,21 +27,40 @@ object ReassignPartitionsCommand extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser - val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + - "new replicas they should be reassigned to in the following format - \n" + - "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}") + val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." + + "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" + + "{\"topics\":\n\t[{\"topic\": \"foo\"}]\n}") .withRequiredArg - .describedAs("partition reassignment json file path") + .describedAs("topics to reassign json file path") .ofType(classOf[String]) + + val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" + + "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" + + "{\"topics\":\n\t[{\"topic\": \"foo\"}]\n}") + .withRequiredArg + .describedAs("manual assignment json file path") + .ofType(classOf[String]) + + val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + + " in the form \"0,1,2\". This is required for automatic topic reassignment.") + .withRequiredArg + .describedAs("brokerlist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + "form host:port. Multiple URLS can be given to allow fail-over.") .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val dryRunOpt = parser.accepts("dryrun", "This option just shows the final assignment and does not execute the command") + .withOptionalArg() + .describedAs("dryRun") + .ofType(classOf[String]) + val options = parser.parse(args : _*) - for(arg <- List(jsonFileOpt, zkConnectOpt)) { + for(arg <- List(zkConnectOpt)) { if(!options.has(arg)) { System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) @@ -48,24 +68,48 @@ object ReassignPartitionsCommand extends Logging { } } - val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileAsString(jsonFile) - var zkClient: ZkClient = null - + var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { - // read the json file into a string - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) - if (partitionsToBeReassigned.isEmpty) - throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile)) - - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + + var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() + + if(options.has(topicsToMoveJsonFileOpt)) { + val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) + val brokerList = options.valueOf(brokerListOpt) + val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) + val brokerListToReassign = brokerList.split(',') map (_.toInt) + val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) + groupedByTopic.foreach { topicInfo => + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, + topicInfo._2.get(TopicAndPartition(topicInfo._1, 0)).get.size) + partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) + } + + } else if (options.has(manualAssignmentJsonFileOpt)) { + val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt) + val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile) + partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString) + if (partitionsToBeReassigned.isEmpty) + throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt)) + } else { + System.err.println("Missing json file. One of the file needs to be specified") + parser.printHelpOn(System.err) + System.exit(1) + } + + if (!options.has(dryRunOpt)) { + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } else { + System.out.println("The replica assignment is \n" + partitionsToBeReassigned.toString()) + } } catch { case e => println("Partitions reassignment failed due to " + e.getMessage) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index d53d511..bc7156e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -578,6 +578,23 @@ object ZkUtils extends Logging { reassignedPartitions } + def parseTopicsData(jsonData: String): Seq[String] = { + var topics = List.empty[String] + Json.parseFull(jsonData) match { + case Some(m) => + m.asInstanceOf[Map[String, Any]].get("topics") match { + case Some(partitionsSeq) => + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => { + val topic = p.get("topic").get.asInstanceOf[String] + topics ++= List(topic) + }) + case None => + } + case None => + } + topics + } + def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]() for (p <- partitionsToBeReassigned) { -- 1.7.12.4 (Apple Git-37) From afcc2829668c1f7d33032db28f3f7311790e0314 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Tue, 6 Aug 2013 12:55:58 -0700 Subject: [PATCH 2/3] reassign partition changes 2 --- .../kafka/admin/ReassignPartitionsCommand.scala | 5 +- .../scala/kafka/controller/KafkaController.scala | 130 +++++++++++---------- core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +- 3 files changed, 73 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 093c9e1..6b6e4d8 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -29,14 +29,14 @@ object ReassignPartitionsCommand extends Logging { val parser = new OptionParser val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." + "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" + - "{\"topics\":\n\t[{\"topic\": \"foo\"}]\n}") + "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") .withRequiredArg .describedAs("topics to reassign json file path") .ofType(classOf[String]) val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" + "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" + - "{\"topics\":\n\t[{\"topic\": \"foo\"}]\n}") + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") .withRequiredArg .describedAs("manual assignment json file path") .ofType(classOf[String]) @@ -81,6 +81,7 @@ object ReassignPartitionsCommand extends Logging { val brokerListToReassign = brokerList.split(',') map (_.toInt) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) groupedByTopic.foreach { topicInfo => val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b07e27b..5883d51 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -135,7 +135,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("Shutting down broker " + id) controllerContext.controllerLock synchronized { - if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) + if (!controllerContext.liveBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) @@ -145,10 +145,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized { - getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map { - case(topic, partition) => - val topicAndPartition = TopicAndPartition(topic, partition) - (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size) + controllerContext.partitionReplicaAssignment.map { + topicPartitionInfo => + (topicPartitionInfo._1, topicPartitionInfo._2.size) } } @@ -185,7 +184,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 }.map(_._1) } - replicatedPartitionsBrokerLeads().toSet + val partitionsBrokerLeads = replicatedPartitionsBrokerLeads().toSet + if (partitionsBrokerLeads.size == 0) + info("Shutdown succeeded") + partitionsBrokerLeads } } @@ -373,6 +375,54 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } + def watchIsrChangesForReassignedPartition(topic: String, + partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition, + reassignedReplicas.toSet) + reassignedPartitionContext.isrChangeListener = isrChangeListener + // register listener on the leader and isr path to wait until they catch up with the current leader + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) + } + + def initiateReassignPartitionForTopic(topicAndPartition: TopicAndPartition, + reassignedPartitionContext: ReassignedPartitionsContext) { + val newReplicas = reassignedPartitionContext.newReplicas + val topic = topicAndPartition.topic + val partition = topicAndPartition.partition + val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + try { + val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) + assignedReplicasOpt match { + case Some(assignedReplicas) => + if(assignedReplicas == newReplicas) { + throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) + } else { + if(aliveNewReplicas == newReplicas) { + info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) + // first register ISR change listener + watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) + controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext) + controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext) + } else { + // some replica in RAR is not alive. Fail partition reassignment + throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + + " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) + + "Failing partition reassignment") + } + } + case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" + .format(topicAndPartition)) + } + } catch { + case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) + // remove the partition from the admin path to unblock the admin client + controller.removePartitionFromReassignedPartitions(topicAndPartition) + } + } + def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { @@ -479,12 +529,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val reassignedPartitions = partitionsBeingReassigned.filter(partition => controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1) reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p)) - controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned - controllerContext.partitionsBeingReassigned --= reassignedPartitions + var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap + partitionsToReassign ++= partitionsBeingReassigned + partitionsToReassign --= reassignedPartitions + info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString())) info("Partitions already reassigned: %s".format(reassignedPartitions.toString())) - info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString())) - controllerContext.partitionsBeingReassigned.foreach(partition => onPartitionReassignment(partition._1, partition._2)) + info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString())) + + // need to call method + partitionsToReassign.foreach { topicPartitionToReassign => + initiateReassignPartitionForTopic(topicPartitionToReassign._1, topicPartitionToReassign._2) + } } private def initializeAndMaybeTriggerPreferredReplicaElection() { @@ -773,39 +829,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) newPartitions.foreach { partitionToBeReassigned => controllerContext.controllerLock synchronized { - val topic = partitionToBeReassigned._1.topic - val partition = partitionToBeReassigned._1.partition - val newReplicas = partitionToBeReassigned._2 - val topicAndPartition = partitionToBeReassigned._1 - val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - try { - val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) - assignedReplicasOpt match { - case Some(assignedReplicas) => - if(assignedReplicas == newReplicas) { - throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + - " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) - } else { - if(aliveNewReplicas == newReplicas) { - info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) - val context = createReassignmentContextForPartition(topic, partition, newReplicas) - controllerContext.partitionsBeingReassigned.put(topicAndPartition, context) - controller.onPartitionReassignment(topicAndPartition, context) - } else { - // some replica in RAR is not alive. Fail partition reassignment - throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + - " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) + - "Failing partition reassignment") - } - } - case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" - .format(topicAndPartition)) - } - } catch { - case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) - // remove the partition from the admin path to unblock the admin client - controller.removePartitionFromReassignedPartitions(topicAndPartition) - } + val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) + initiateReassignPartitionForTopic(partitionToBeReassigned._1.topic, partitionToBeReassigned._1.partition, context) } } } @@ -818,25 +843,6 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { } - - private def createReassignmentContextForPartition(topic: String, - partition: Int, - newReplicas: Seq[Int]): ReassignedPartitionsContext = { - val context = new ReassignedPartitionsContext(newReplicas) - // first register ISR change listener - watchIsrChangesForReassignedPartition(topic, partition, context) - context - } - - private def watchIsrChangesForReassignedPartition(topic: String, partition: Int, - reassignedPartitionContext: ReassignedPartitionsContext) { - val reassignedReplicas = reassignedPartitionContext.newReplicas - val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition, - reassignedReplicas.toSet) - reassignedPartitionContext.isrChangeListener = isrChangeListener - // register listener on the leader and isr path to wait until they catch up with the current leader - zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) - } } class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int, diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index bc7156e..b1e9902 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -584,7 +584,8 @@ object ZkUtils extends Logging { case Some(m) => m.asInstanceOf[Map[String, Any]].get("topics") match { case Some(partitionsSeq) => - partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => { + val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] + mapPartitionSeq.foreach(p => { val topic = p.get("topic").get.asInstanceOf[String] topics ++= List(topic) }) -- 1.7.12.4 (Apple Git-37) From 877ef08d2621556664ee9a9653bac6215fec3155 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Thu, 8 Aug 2013 09:38:09 -0700 Subject: [PATCH 3/3] few more changes for reassign partition --- .../scala/kafka/controller/KafkaController.scala | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 5883d51..b4b0d7f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -135,7 +135,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("Shutting down broker " + id) controllerContext.controllerLock synchronized { - if (!controllerContext.liveBrokerIds.contains(id)) + if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) @@ -145,9 +145,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized { - controllerContext.partitionReplicaAssignment.map { - topicPartitionInfo => - (topicPartitionInfo._1, topicPartitionInfo._2.size) + getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map { + case(topic, partition) => + val topicAndPartition = TopicAndPartition(topic, partition) + (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size) } } @@ -184,10 +185,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 }.map(_._1) } - val partitionsBrokerLeads = replicatedPartitionsBrokerLeads().toSet - if (partitionsBrokerLeads.size == 0) - info("Shutdown succeeded") - partitionsBrokerLeads + replicatedPartitionsBrokerLeads().toSet } } @@ -379,7 +377,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas - val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition, + val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition, reassignedReplicas.toSet) reassignedPartitionContext.isrChangeListener = isrChangeListener // register listener on the leader and isr path to wait until they catch up with the current leader @@ -405,7 +403,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // first register ISR change listener watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext) - controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext) + onPartitionReassignment(topicAndPartition, reassignedPartitionContext) } else { // some replica in RAR is not alive. Fail partition reassignment throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + @@ -419,7 +417,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) // remove the partition from the admin path to unblock the admin client - controller.removePartitionFromReassignedPartitions(topicAndPartition) + removePartitionFromReassignedPartitions(topicAndPartition) } } @@ -830,7 +828,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL newPartitions.foreach { partitionToBeReassigned => controllerContext.controllerLock synchronized { val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) - initiateReassignPartitionForTopic(partitionToBeReassigned._1.topic, partitionToBeReassigned._1.partition, context) + controller.initiateReassignPartitionForTopic(partitionToBeReassigned._1, context) } } } -- 1.7.12.4 (Apple Git-37)