diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..598dc8d 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -87,9 +87,15 @@ object ReassignPartitionsCommand extends Logging { var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) + + val replicasOpt = if (opts.options.has(opts.replicasOpt)) { + Some(opts.options.valueOf(opts.replicasOpt).toInt) + } else { + None + } groupedByTopic.foreach { topicInfo => val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, - topicInfo._2.head._2.size) + replicasOpt.getOrElse(topicInfo._2.head._2.size)) partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) } val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) @@ -180,6 +186,8 @@ object ReassignPartitionsCommand extends Logging { if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.") + val replicasOpt = parser.accepts("replicas").withRequiredArg().ofType(classOf[String]) + val options = parser.parse(args : _*) } }