diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index fc69275..2cd8023 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -108,7 +108,7 @@ object AdminUtils extends Logging { if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) - val existingReplicaList = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { + val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { case None => throw new AdminOperationException("PartitionId should start from 0") case Some(headPartitionReplica) => headPartitionReplica._2 } @@ -118,16 +118,21 @@ object AdminUtils extends Logging { // create the new partition replication list val brokerList = ZkUtils.getSortedBrokerList(zkClient) - val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, brokerList.indexOf(existingReplicaList.head), existingPartitionsReplicaList.size) + val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") { + var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head) + if(startIndex < 0) { + startIndex = 0 + } + AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size) + } else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor - val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size)) if (unmatchedRepFactorList.size != 0) throw new AdminOperationException("The replication factor in manual replication assignment " + - " is not equal to the existing replication factor for the topic " + existingReplicaList.size) + " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size) info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)