From 67102f034aff51afeea5ed0065913bf7cc4f4771 Mon Sep 17 00:00:00 2001
From: Onur Karaman <okaraman@linkedin.com>
Date: Thu, 6 Nov 2014 19:04:36 -0800
Subject: [PATCH] ControllerContext removeTopic does not correctly update
 state

---
 .../scala/kafka/controller/KafkaController.scala   |  4 +-
 .../kafka/controller/ControllerContextTest.scala   | 57 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)
 create mode 100644 core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 51a5bad..66df6d2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -115,8 +115,8 @@ class ControllerContext(val zkClient: ZkClient,
   }
 
   def removeTopic(topic: String) = {
-    partitionLeadershipInfo = partitionLeadershipInfo.dropWhile(p => p._1.topic.equals(topic))
-    partitionReplicaAssignment = partitionReplicaAssignment.dropWhile(p => p._1.topic.equals(topic))
+    partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
+    partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
     allTopics -= topic
   }
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
new file mode 100644
index 0000000..bc9cd73
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import org.junit.Test
+import org.junit.Assert._
+import kafka.common.TopicAndPartition
+import scala.collection.mutable
+import kafka.api.LeaderAndIsr
+import org.scalatest.junit.JUnitSuite
+
+class ControllerContextTest extends JUnitSuite {
+
+  @Test
+  def testRemoveTopic {
+    val mockZkSessionTimeout = 5
+    val controllerContext = new ControllerContext(null, mockZkSessionTimeout)
+    controllerContext.partitionLeadershipInfo = mutable.LinkedHashMap.empty
+    controllerContext.partitionReplicaAssignment = mutable.LinkedHashMap.empty
+    controllerContext.allTopics = Set.empty
+
+    val topicToRemove = "topic1"
+    val remainingTopicAndPartition = TopicAndPartition("topic2", 2)
+    val topicAndPartitions = Seq(TopicAndPartition(topicToRemove, 1), remainingTopicAndPartition, TopicAndPartition(topicToRemove, 3))
+    val mockLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(-1, -1, List.empty, -1), -1)
+
+    topicAndPartitions.foreach { topicAndPartition =>
+      controllerContext.partitionLeadershipInfo += topicAndPartition -> mockLeaderIsrAndControllerEpoch
+      controllerContext.partitionReplicaAssignment += topicAndPartition -> Seq.empty
+      controllerContext.allTopics += topicAndPartition.topic
+    }
+
+    controllerContext.removeTopic(topicToRemove)
+
+    val expectedPartitionLeadershipInfo = mutable.LinkedHashMap(remainingTopicAndPartition -> mockLeaderIsrAndControllerEpoch)
+    val expectedPartitionReplicaAssignment = mutable.LinkedHashMap(remainingTopicAndPartition -> Seq.empty)
+    val expectedAllTopics = Set(remainingTopicAndPartition.topic)
+
+    assertEquals(expectedPartitionLeadershipInfo, controllerContext.partitionLeadershipInfo)
+    assertEquals(expectedPartitionReplicaAssignment, controllerContext.partitionReplicaAssignment)
+    assertEquals(expectedAllTopics, controllerContext.allTopics)
+  }
+}
\ No newline at end of file
-- 
1.7.12.4

