From 67905e34b2c8b86b8dbf22429b13cb5b89b2761d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Sat, 11 Oct 2014 16:57:48 -0700 Subject: [PATCH 1/4] KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 4 ++++ core/src/main/scala/kafka/admin/TopicCommand.scala | 3 +++ core/src/main/scala/kafka/tools/StateChangeLogMerger.scala | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..1ae5953 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -81,6 +81,8 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) + if (brokerListToReassign.size != brokerListToReassign.toSet.size) + throw new AdminCommandFailedException("Broker list should not contain duplicate entries.") val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) @@ -106,6 +108,8 @@ object ReassignPartitionsCommand extends Logging { val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) + if (partitionsToBeReassigned.valuesIterator.exists(replicas => (replicas.size != 
replicas.toSet.size))) + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 7672c5a..71c9e70 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,6 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties +import kafka.common.AdminCommandFailedException import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -225,6 +226,8 @@ object TopicCommand { val ret = new mutable.HashMap[Int, List[Int]]() for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size != brokerList.toSet.size) + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") ret.put(i, brokerList.toList) if (ret(i).size != ret(0).size) throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index d298e7e..02cf1fe 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -115,6 +115,10 @@ object StateChangeLogMerger extends Logging { } if (options.has(partitionsOpt)) { partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) + if (partitions.size != partitions.toSet.size) 
{ + System.err.println("The list of partitions should not have repeated entries.") + System.exit(1) + } } startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim) endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim) -- 2.1.2 From d1dd6d9c842288b75f67f335df3ac5220024f35f Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 16 Oct 2014 14:42:48 -0700 Subject: [PATCH 2/4] Generate error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. --- .../kafka/admin/PreferredReplicaLeaderElectionCommand.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index c791848..da82f52 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -78,12 +78,15 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { case Some(partitionsList) => - val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]] - partitions.map { p => + val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]] + val partitions = partitionsRaw.map { p => val topic = p.get("topic").get.asInstanceOf[String] val partition = p.get("partition").get.asInstanceOf[Int] TopicAndPartition(topic, partition) - }.toSet + } + val partitionsSet = partitions.toSet + if (partitionsSet.size != partitions.size) throw new AdminOperationException("Preferred replica election data should not contain duplicate partitions.") + partitionsSet case None => throw new AdminOperationException("Preferred replica election data is empty") } case None => throw new AdminOperationException("Preferred replica election data is empty") -- 2.1.2 From 
a5215235cf9e19f8043f12e55236a476fc58e1ba Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 16 Oct 2014 14:43:19 -0700 Subject: [PATCH 3/4] Report which entries are duplicated for ReassignPartitionCommand since they may be difficult to find in large reassignments. --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 1ae5953..7f8ea71 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -108,8 +108,15 @@ object ReassignPartitionsCommand extends Logging { val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) - if (partitionsToBeReassigned.valuesIterator.exists(replicas => (replicas.size != replicas.toSet.size))) - throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") + val duplicateEntries = partitionsToBeReassigned + .map{ case(tp,replicas) => (tp, replicas.groupBy(identity).filter(_._2.size > 1).keys)} + .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty } + if (duplicateEntries.nonEmpty) { + val duplicatesMsg = duplicateEntries + .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) } + .mkString(". 
") + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg)) + } val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) -- 2.1.2 From 646fe7c828b070105f80f3013cf70f523b4c19b0 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 21 Oct 2014 11:56:55 -0700 Subject: [PATCH 4/4] Report duplicate topics and duplicate topic partitions in ReassignPartitionsCommand. Make all duplication error messagse include details about what item was duplicated. --- .../PreferredReplicaLeaderElectionCommand.scala | 4 +++- .../kafka/admin/ReassignPartitionsCommand.scala | 21 ++++++++++++++------- core/src/main/scala/kafka/admin/TopicCommand.scala | 5 +++-- .../scala/kafka/tools/StateChangeLogMerger.scala | 7 ++++--- core/src/main/scala/kafka/utils/Utils.scala | 10 ++++++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 15 ++++++++++----- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index da82f52..79b5e0a 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -84,8 +84,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partition = p.get("partition").get.asInstanceOf[Int] TopicAndPartition(topic, partition) } + val duplicatePartitions = Utils.duplicates(partitions) val partitionsSet = partitions.toSet - if (partitionsSet.size != partitions.size) throw new AdminOperationException("Preferred replica election data should not contain 
duplicate partitions.") + if (duplicatePartitions.nonEmpty) + throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(","))) partitionsSet case None => throw new AdminOperationException("Preferred replica election data is empty") } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 7f8ea71..979992b 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -81,10 +81,14 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) - if (brokerListToReassign.size != brokerListToReassign.toSet.size) - throw new AdminCommandFailedException("Broker list should not contain duplicate entries.") + val duplicateReassignments = Utils.duplicates(brokerListToReassign) + if (duplicateReassignments.nonEmpty) + throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(","))) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign) + if (duplicateTopicsToReassign.nonEmpty) + throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(","))) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = 
new mutable.HashMap[TopicAndPartition, List[Int]]() @@ -105,11 +109,14 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) - val duplicateEntries = partitionsToBeReassigned - .map{ case(tp,replicas) => (tp, replicas.groupBy(identity).filter(_._2.size > 1).keys)} + val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp}) + if (duplicateReassignedPartitions.nonEmpty) + throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(","))) + val duplicateEntries= partitionsToBeReassigned + .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))} .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty } if (duplicateEntries.nonEmpty) { val duplicatesMsg = duplicateEntries @@ -117,14 +124,14 @@ object ReassignPartitionsCommand extends Logging { .mkString(". 
") throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg)) } - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap) // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) // start the reassignment if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) + println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap))) else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 71c9e70..0b2735e 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -226,8 +226,9 @@ object TopicCommand { val ret = new mutable.HashMap[Int, List[Int]]() for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size != brokerList.toSet.size) - throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries.") + val duplicateBrokers = Utils.duplicates(brokerList) + if (duplicateBrokers.nonEmpty) + throw new AdminCommandFailedException("Partition replica lists may not contain 
duplicate entries: %s".format(duplicateBrokers.mkString(","))) ret.put(i, brokerList.toList) if (ret(i).size != ret(0).size) throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 02cf1fe..b34b8c7 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -22,7 +22,7 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat -import kafka.utils.{Logging, CommandLineUtils} +import kafka.utils.{Utils, Logging, CommandLineUtils} import kafka.common.Topic import java.io.{BufferedOutputStream, OutputStream} @@ -115,8 +115,9 @@ object StateChangeLogMerger extends Logging { } if (options.has(partitionsOpt)) { partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) - if (partitions.size != partitions.toSet.size) { - System.err.println("The list of partitions should not have repeated entries.") + val duplicatePartitions = Utils.duplicates(partitions) + if (duplicatePartitions.nonEmpty) { + System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(","))) System.exit(1) } } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 29d5a17..23aefb4 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -566,4 +566,14 @@ object Utils extends Logging { case c => c }.mkString } + + /** + * Returns a list of duplicated items + */ + def duplicates[T](s: Traversable[T]): Iterable[T] = { + s.groupBy(identity) + .map{ case (k,l) => (k,l.size)} + .filter{ case (k,l) => (l > 1) } + .keys + } } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala index a7b1fdc..56e3e88 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -575,23 +575,28 @@ object ZkUtils extends Logging { } } - def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { - val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map() + // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed + def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = { Json.parseFull(jsonData) match { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { case Some(partitionsSeq) => - partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => { + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => { val topic = p.get("topic").get.asInstanceOf[String] val partition = p.get("partition").get.asInstanceOf[Int] val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] - reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas + TopicAndPartition(topic, partition) -> newReplicas }) case None => + Seq.empty } case None => + Seq.empty } - reassignedPartitions + } + + def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { + parsePartitionReassignmentDataWithoutDedup(jsonData).toMap } def parseTopicsData(jsonData: String): Seq[String] = { -- 2.1.2