From b0206bb240dcf99669c5a54f617b983ea1d6c316 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Sat, 11 Oct 2014 16:57:48 -0700 Subject: [PATCH] KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 4 ++++ core/src/main/scala/kafka/admin/TopicCommand.scala | 3 +++ core/src/main/scala/kafka/tools/StateChangeLogMerger.scala | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..1ae5953 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -81,6 +81,8 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) + if (brokerListToReassign.size != brokerListToReassign.toSet.size) + throw new AdminCommandFailedException("Broker list should not contain duplicate entries.") val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) @@ -106,6 +108,8 @@ object ReassignPartitionsCommand extends Logging { val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) + if (partitionsToBeReassigned.valuesIterator.exists(replicas => (replicas.size != replicas.toSet.size))) + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 7672c5a..71c9e70 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,6 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties +import kafka.common.AdminCommandFailedException import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -225,6 +226,8 @@ object TopicCommand { val ret = new mutable.HashMap[Int, List[Int]]() for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size != brokerList.toSet.size) + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") ret.put(i, brokerList.toList) if (ret(i).size != ret(0).size) throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index d298e7e..02cf1fe 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -115,6 +115,10 @@ object StateChangeLogMerger extends Logging { } if (options.has(partitionsOpt)) { partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) + if (partitions.size != partitions.toSet.size) { + System.err.println("The list of partitions should not have repeated entries.") + System.exit(1) + } } startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim) endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim) -- 2.1.2