diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala index 7e85f87..51e629a 100644 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -27,7 +27,8 @@ object CheckReassignmentStatus extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + - "new replicas they should be reassigned to") + "new replicas they should be reassigned to, which can be obtained from the output of running " + + "ReassignPartitionsCommand without execution") .withRequiredArg .describedAs("partition reassignment json file path") .ofType(classOf[String]) @@ -54,18 +55,7 @@ object CheckReassignmentStatus extends Logging { try { // read the json file into a string - val partitionsToBeReassigned = Json.parseFull(jsonString) match { - case Some(reassignedPartitions) => - val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] - partitions.map { m => - val topic = m.asInstanceOf[Map[String, String]].get("topic").get - val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt - val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get - val newReplicas = replicasList.split(",").map(_.toInt) - (TopicAndPartition(topic, partition), newReplicas.toSeq) - }.toMap - case None => Map.empty[TopicAndPartition, Seq[Int]] - } + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) reassignedPartitionsStatus.foreach { partition => diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index f333d29..aaec268 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -116,7 +116,7 @@ object ReassignPartitionsCommand extends Logging { println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } else { System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + - "The replica assignment is \n" + partitionsToBeReassigned.toString()) + "The following is the replica assignment. Save it for CheckReassignmentStatus.\n" + ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) } } catch { case e: Throwable =>