Index: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala	(revision e402c7be723e5f189a8f2b4f91a28f3b26ea9112)
+++ core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala	(revision )
@@ -18,10 +18,12 @@
 
 import joptsimple.OptionParser
 import kafka.utils._
-import collection._
+import scala.collection._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import kafka.common.TopicAndPartition
+import scala.Some
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -62,6 +64,16 @@
     val jsonString = Utils.readFileAsString(jsonFile)
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
 
+    partitionsToBeReassigned.foreach(entry => {
+      if (!partitionExists(zkClient, entry._1.topic, entry._1.partition))
+        throw new AdminCommandFailedException("Topic %s or its partition %d not found".format(entry._1.topic, entry._1.partition))
+
+      entry._2.foreach(id =>
+        if (ZkUtils.getBrokerInfo(zkClient, id).isEmpty)
+          throw new AdminCommandFailedException("Broker %s not found".format(id))
+      )
+    })
+
     println("Status of partition reassignment:")
     val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
     reassignedPartitionsStatus.foreach { partition =>
@@ -77,19 +89,32 @@
   }
 
   def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
+    if (!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
       CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
+
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+    brokerListToReassign.foreach(id =>
+      if (ZkUtils.getBrokerInfo(zkClient, id).isEmpty)
+        throw new AdminCommandFailedException("Broker %s not found".format(id))
+    )
+
     val duplicateReassignments = Utils.duplicates(brokerListToReassign)
     if (duplicateReassignments.nonEmpty)
       throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
+
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
+
     val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+    topicsToReassign.foreach { topic =>
+      var topicFound = false
+      topicPartitionsToReassign.foreach(topicAndPartition => topicFound |= topicAndPartition._1.topic == topic)
+      if (!topicFound) throw new AdminCommandFailedException("Topic %s not found".format(topic))
+    }
 
     var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
     val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
@@ -98,6 +123,7 @@
         topicInfo._2.head._2.size)
       partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
     }
+
     val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
     println("Current partition replica assignment\n\n%s"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
@@ -105,30 +131,37 @@
   }
 
   def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt))
+    if (!opts.options.has(opts.reassignmentJsonFileOpt))
       CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
+
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
+
     val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
     if (duplicateReassignedPartitions.nonEmpty)
       throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
+
     val duplicateEntries= partitionsToBeReassigned
       .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))}
       .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+
     if (duplicateEntries.nonEmpty) {
       val duplicatesMsg = duplicateEntries
         .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) }
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
     }
+
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
+
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+
     // start the reassignment
     if(reassignPartitionsCommand.reassignPartitions())
       println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
@@ -136,6 +169,15 @@
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }
 
+  private def partitionExists(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    partitionsOpt match {
+      case Some(partitions) =>
+        partitions.contains(partition)
+      case None => false
+    }
+  }
+
   private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
@@ -205,9 +247,18 @@
 class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
   extends Logging {
   def reassignPartitions(): Boolean = {
+    partitions.foreach(entry => {
+      if (!ReassignPartitionsCommand.partitionExists(zkClient, entry._1.topic, entry._1.partition))
+        throw new AdminCommandFailedException("Topic %s or its partition %d not found".format(entry._1.topic, entry._1.partition))
+
+      entry._2.foreach(id =>
+        if (ZkUtils.getBrokerInfo(zkClient, id).isEmpty)
+          throw new AdminCommandFailedException("Broker %s not found".format(id))
+      )
+    })
+
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
-      val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
+      val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(partitions)
       ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
       true
     } catch {
@@ -216,24 +267,6 @@
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))
       case e: Throwable => error("Admin command failed", e); false
-    }
-  }
-
-  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
-    // check if partition exists
-    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        }else{
-          error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) +
-            "since it doesn't exist")
-          false
-        }
-      case None => error("Skipping reassignment of partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
     }
   }
 }
