From 982641810916f7c1b40508167f0d41a54ae2d2db Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Sat, 11 Oct 2014 16:57:48 -0700
Subject: [PATCH 1/3] KAFKA-1653 Disallow duplicate broker IDs in user input
 for admin commands.

---
 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 4 ++++
 core/src/main/scala/kafka/admin/TopicCommand.scala              | 3 +++
 core/src/main/scala/kafka/tools/StateChangeLogMerger.scala      | 4 ++++
 3 files changed, 11 insertions(+)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 691d69a..1ae5953 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -81,6 +81,8 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+    if (brokerListToReassign.size != brokerListToReassign.toSet.size)
+      throw new AdminCommandFailedException("Broker list should not contain duplicate entries.")
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
     val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
@@ -106,6 +108,8 @@ object ReassignPartitionsCommand extends Logging {
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+    if (partitionsToBeReassigned.valuesIterator.exists(replicas => (replicas.size != replicas.toSet.size)))
+      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.")
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 7672c5a..71c9e70 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,6 +19,7 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
+import kafka.common.AdminCommandFailedException
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -225,6 +226,8 @@ object TopicCommand {
     val ret = new mutable.HashMap[Int, List[Int]]()
     for (i <- 0 until partitionList.size) {
       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      if (brokerList.size != brokerList.toSet.size)
+        throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.")
       ret.put(i, brokerList.toList)
       if (ret(i).size != ret(0).size)
         throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index d298e7e..02cf1fe 100644
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -115,6 +115,10 @@ object StateChangeLogMerger extends Logging {
     }
     if (options.has(partitionsOpt)) {
       partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
+      if (partitions.size != partitions.toSet.size) {
+        System.err.println("The list of partitions should not have repeated entries.")
+        System.exit(1)
+      }
     }
     startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
     endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim)
-- 
2.1.2
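All three hunks in PATCH 1/3 rely on the same idiom: a list contains duplicates exactly when converting it to a Set shrinks it. A minimal standalone sketch of that check follows; the object name, the hasDuplicates helper, and the sample broker list are illustrative only and not part of the patch.

    // Sketch of the duplicate check added in PATCH 1/3: a sequence has
    // duplicates exactly when its Set of distinct elements is smaller.
    object DuplicateCheckSketch {
      // Illustrative helper, not a function from the patch.
      def hasDuplicates[T](xs: Seq[T]): Boolean = xs.size != xs.toSet.size

      def main(args: Array[String]): Unit = {
        // Mimics parsing a --broker-list argument such as "1,2,3,2".
        val brokerList = "1,2,3,2".split(',').map(_.toInt).toSeq
        if (hasDuplicates(brokerList))
          // The patched commands throw AdminCommandFailedException (or exit) here.
          System.err.println("Broker list should not contain duplicate entries.")
      }
    }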
From 1743b48c8c9af9c6b7ac995bd6dbf163c91ae250 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Thu, 16 Oct 2014 14:42:48 -0700
Subject: [PATCH 2/3] Generate error for duplicates in
 PreferredLeaderElectionCommand instead of just swallowing duplicates.

---
 .../kafka/admin/PreferredReplicaLeaderElectionCommand.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index c791848..da82f52 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -78,12 +78,15 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       case Some(m) =>
         m.asInstanceOf[Map[String, Any]].get("partitions") match {
           case Some(partitionsList) =>
-            val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
-            partitions.map { p =>
+            val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]]
+            val partitions = partitionsRaw.map { p =>
              val topic = p.get("topic").get.asInstanceOf[String]
              val partition = p.get("partition").get.asInstanceOf[Int]
              TopicAndPartition(topic, partition)
-            }.toSet
+            }
+            val partitionsSet = partitions.toSet
+            if (partitionsSet.size != partitions.size) throw new AdminOperationException("Preferred replica election data should not contain duplicate partitions.")
+            partitionsSet
           case None => throw new AdminOperationException("Preferred replica election data is empty")
         }
       case None => throw new AdminOperationException("Preferred replica election data is empty")
-- 
2.1.2
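PATCH 2/3 builds the full list of TopicAndPartition entries first and only then converts it to a Set, so a size mismatch can be reported as an error instead of being silently collapsed. A standalone sketch of that check follows; it uses a simplified stand-in case class rather than kafka.common.TopicAndPartition and a generic exception in place of AdminOperationException, so the names below are assumptions for illustration only.

    // Simplified stand-in for kafka.common.TopicAndPartition, illustration only.
    case class TopicPartitionSketch(topic: String, partition: Int)

    object PreferredReplicaDuplicateSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the partitions parsed from the election JSON file.
        val partitions = List(
          TopicPartitionSketch("foo", 0),
          TopicPartitionSketch("bar", 1),
          TopicPartitionSketch("foo", 0)  // duplicate entry in the input
        )
        val partitionsSet = partitions.toSet
        // Mirrors the patched check: a smaller Set means some partition was
        // listed more than once, which the command now rejects.
        if (partitionsSet.size != partitions.size)
          throw new IllegalArgumentException(
            "Preferred replica election data should not contain duplicate partitions.")
      }
    }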
From 20183a8ffac6f039846e47679c476aab0fe2fd35 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Thu, 16 Oct 2014 14:43:19 -0700
Subject: [PATCH 3/3] Report which entries are duplicated for
 ReassignPartitionCommand since they may be difficult to find in large
 reassignments.

---
 .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 1ae5953..7f8ea71 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -108,8 +108,15 @@ object ReassignPartitionsCommand extends Logging {
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
-    if (partitionsToBeReassigned.valuesIterator.exists(replicas => (replicas.size != replicas.toSet.size)))
-      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.")
+    val duplicateEntries = partitionsToBeReassigned
+      .map{ case(tp,replicas) => (tp, replicas.groupBy(identity).filter(_._2.size > 1).keys)}
+      .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+    if (duplicateEntries.nonEmpty) {
+      val duplicatesMsg = duplicateEntries
+        .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) }
+        .mkString(". ")
+      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
+    }
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
-- 
2.1.2
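PATCH 3/3 replaces the plain size comparison with groupBy(identity): grouping each replica list by value and keeping only the groups with more than one occurrence yields exactly the broker IDs that are repeated, so the error message can name them. A standalone sketch of that reporting logic follows; the object name, the sample map, and the use of System.err in place of AdminCommandFailedException are illustrative assumptions, not code from the patch.

    // Sketch of the duplicate reporting added in PATCH 3/3.
    object DuplicateReportSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the parsed reassignment data: partition -> replica list.
        val partitionsToBeReassigned = Map(
          "topic-a-0" -> Seq(1, 2, 2),  // broker 2 listed twice
          "topic-b-0" -> Seq(3, 4)
        )
        // Keep, per partition, only the replica ids that occur more than once.
        val duplicateEntries = partitionsToBeReassigned
          .map { case (tp, replicas) => (tp, replicas.groupBy(identity).filter(_._2.size > 1).keys) }
          .filter { case (_, duplicatedReplicas) => duplicatedReplicas.nonEmpty }
        if (duplicateEntries.nonEmpty) {
          val duplicatesMsg = duplicateEntries
            .map { case (tp, dups) => "%s contains multiple entries for %s".format(tp, dups.mkString(",")) }
            .mkString(". ")
          // The real command throws AdminCommandFailedException with this message.
          System.err.println("Partition replica lists may not contain duplicate entries: " + duplicatesMsg)
        }
      }
    }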