diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 94c5332..fc69275 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -108,7 +108,10 @@ object AdminUtils extends Logging { if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) - val existingReplicaList = existingPartitionsReplicaList.head._2 + val existingReplicaList = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { + case None => throw new AdminOperationException("PartitionId should start from 0") + case Some(headPartitionReplica) => headPartitionReplica._2 + } val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size if (partitionsToAdd <= 0) throw new AdminOperationException("The number of partitions for a topic can only be increased") @@ -116,7 +119,7 @@ object AdminUtils extends Logging { // create the new partition replication list val brokerList = ZkUtils.getSortedBrokerList(zkClient) val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) + AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, brokerList.indexOf(existingReplicaList.head), existingPartitionsReplicaList.size) else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)