From 8b0c6f1fa157856f154f696dc077187cde8bf784 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava <me@ewencp.org>
Date: Thu, 25 Sep 2014 15:48:43 -0700
Subject: [PATCH] KAFKA-1631 Use target number of partitions during
 reassignment to determine if partition is under-replicated.

---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3b2166a..4f0c7c8 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -29,6 +29,7 @@ import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.OffsetManager
 import org.apache.kafka.common.utils.Utils.formatAddress
+import kafka.common.TopicAndPartition
 
 
 object TopicCommand {
@@ -157,6 +158,7 @@ object TopicCommand {
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
     val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
     val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
+    val reassigningPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
     for (topic <- topics) {
       ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
         case Some(topicPartitionAssignment) =>
@@ -175,9 +177,13 @@ object TopicCommand {
           if (describePartitions) {
             for ((partitionId, assignedReplicas) <- sortedPartitions) {
               val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
+              val numTargetReplicas = reassigningPartitions.get(TopicAndPartition(topic,partitionId)) match {
+                case Some(reassignedPartitions) => reassignedPartitions.newReplicas.size
+                case None => assignedReplicas.size
+              }
               val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
               if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
-                  (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
+                  (reportUnderReplicatedPartitions && inSyncReplicas.size < numTargetReplicas) ||
                   (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
                 print("\tTopic: " + topic)
                 print("\tPartition: " + partitionId)
-- 
1.9.1

