diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 70d1b81..3a4fbc4 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -26,127 +26,27 @@ import kafka.common.{TopicAndPartition, AdminCommandFailedException} object ReassignPartitionsCommand extends Logging { def main(args: Array[String]): Unit = { - val parser = new OptionParser - val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." + - "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" + - "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") - .withRequiredArg - .describedAs("topics to reassign json file path") - .ofType(classOf[String]) - - val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" + - "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" + - "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") - .withRequiredArg - .describedAs("manual assignment json file path") - .ofType(classOf[String]) - - val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + - " in the form \"0,1,2\". This is required for automatic topic reassignment.") - .withRequiredArg - .describedAs("brokerlist") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run") - .withOptionalArg() - .describedAs("execute") - .ofType(classOf[String]) - - val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " + - "new replicas they should be reassigned to, which can be obtained from the output of a dry run.") - .withRequiredArg - .describedAs("partition reassignment json file path") - .ofType(classOf[String]) - - val options = parser.parse(args : _*) + val opts = new ReassignPartitionsCommandOptions(args) - for(arg <- List(zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } + // should have exactly one action + val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _) + if(actions != 1) { + opts.parser.printHelpOn(System.err) + Utils.croak("Command must include exactly one action: --generate, --execute or --verify") } - if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) { - System.err.println("Only one of the json files should be specified") - parser.printHelpOn(System.err) - System.exit(1) - } + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) - val zkConnect = options.valueOf(zkConnectOpt) + val zkConnect = opts.options.valueOf(opts.zkConnectOpt) var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { - - var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() - - if(options.has(statusCheckJsonFileOpt)) { - val jsonFile = options.valueOf(statusCheckJsonFileOpt) - val jsonString = Utils.readFileAsString(jsonFile) - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) - - println("Status of partition reassignment:") - val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) - reassignedPartitionsStatus.foreach { partition => - partition._2 match { - case ReassignmentCompleted => - println("Reassignment of partition %s completed successfully".format(partition._1)) - case ReassignmentFailed => - println("Reassignment of partition %s failed".format(partition._1)) - case ReassignmentInProgress => - println("Reassignment of partition %s is still in progress".format(partition._1)) - } - } - } else if(options.has(topicsToMoveJsonFileOpt)) { - if(!options.has(brokerListOpt)) { - System.err.println("broker-list is required if topics-to-move-json-file is used") - parser.printHelpOn(System.err) - System.exit(1) - } - val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) - val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt) - val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) - val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) - val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) - - val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) - groupedByTopic.foreach { topicInfo => - val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, - topicInfo._2.head._2.size) - partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) - } - } else if (options.has(manualAssignmentJsonFileOpt)) { - val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt) - val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile) - partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString) - if (partitionsToBeReassigned.isEmpty) - throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt)) - } else { - System.err.println("Missing json file. One of the file needs to be specified") - parser.printHelpOn(System.err) - System.exit(1) - } - - if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) { - if (options.has(executeOpt)) { - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) - } else { - System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + - "The following is the replica assignment. Save it for the status check option.\n" + - ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) - } - } + if(opts.options.has(opts.verifyOpt)) + verifyAssignment(zkClient, opts) + else if(opts.options.has(opts.generateOpt)) + generateAssignment(zkClient, opts) + else if (opts.options.has(opts.executeOpt)) + executeAssignment(zkClient, opts) } catch { case e: Throwable => println("Partitions reassignment failed due to " + e.getMessage) @@ -157,6 +57,84 @@ object ReassignPartitionsCommand extends Logging { } } + def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { + if(opts.options.has(opts.verifyOpt) && !opts.options.has(opts.reassignmentJsonFileOpt)) { + opts.parser.printHelpOn(System.err) + Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") + } + val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) + val jsonString = Utils.readFileAsString(jsonFile) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) + + println("Status of partition reassignment:") + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Reassignment of partition %s completed successfully".format(partition._1)) + case ReassignmentFailed => + println("Reassignment of partition %s failed".format(partition._1)) + case ReassignmentInProgress => + println("Reassignment of partition %s is still in progress".format(partition._1)) + } + } + } + + def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { + if(opts.options.has(opts.topicsToMoveJsonFileOpt) && !opts.options.has(opts.generateOpt)) { + opts.parser.printHelpOn(System.err) + Utils.croak("If --topics-to-move-json-file option is used, command must include --generate option") + } + if(opts.options.has(opts.topicsToMoveJsonFileOpt) && !opts.options.has(opts.brokerListOpt)) { + opts.parser.printHelpOn(System.err) + Utils.croak("If --topics-to-move-json-file option is used, command must include --broker-list") + } + if(opts.options.has(opts.topicsToMoveJsonFileOpt)) { + val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) + val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) + val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) + val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + + var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() + val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) + groupedByTopic.foreach { topicInfo => + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, + topicInfo._2.head._2.size) + partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) + } + println("Proposed partition reassignment configuration\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) + } else { + val allTopics = ZkUtils.getAllTopics(zkClient) + val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics) + println("Current partition replica assignment for all partitions\n%s" + .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) + } + } + + def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { + if(opts.options.has(opts.executeOpt) && !opts.options.has(opts.reassignmentJsonFileOpt)) { + opts.parser.printHelpOn(System.err) + Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " + + "during the --generate option") + } + val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) + val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) + if (partitionsToBeReassigned.isEmpty) + throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + // before starting assignment, output the current replica assignment to facilitate rollback + val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) + println("Current partition replica assignment\n%s. Save this to use as the --reassignment-json-file option during rollback" + .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) + // start the reassignment + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } + private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) @@ -185,6 +163,39 @@ object ReassignPartitionsCommand extends Logging { } } } + + class ReassignPartitionsCommandOptions(args: Array[String]) { + val parser = new OptionParser + + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + + "form host:port. Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." + + " Note that this only generates a candidate assignment, it does not execute it.") + val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.") + val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.") + val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" + + "The format to use is - \n" + + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") + .withRequiredArg + .describedAs("manual assignment json file path") + .ofType(classOf[String]) + val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "Generate a reassignment configuration to move the partitions" + + " of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - \n" + + "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") + .withRequiredArg + .describedAs("topics to reassign json file path") + .ofType(classOf[String]) + val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + + " in the form \"0,1,2\". This is required if --topics-to-move-json-file is used to generate reassignment configuration") + .withRequiredArg + .describedAs("brokerlist") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + } } class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])