From 120617ff6dda47cbc1d164bffd56647041684c75 Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 7 Jul 2015 11:14:01 -0700 Subject: [PATCH] KAFKA-2317: De-register isrChangeNotificationListener on controller resignation --- core/src/main/scala/kafka/controller/KafkaController.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 09630d0..20f1499 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -342,6 +342,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt */ def onControllerResignation() { // de-register listeners + deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() deregisterPreferredReplicaElectionListener() @@ -891,16 +892,21 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } } - private def registerReassignedPartitionsListener() = { - zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) - } - private def registerIsrChangeNotificationListener() = { debug("Registering IsrChangeNotificationListener") ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) } + private def deregisterIsrChangeNotificationListener() = { + debug("De-registering IsrChangeNotificationListener") + zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + } + + private def registerReassignedPartitionsListener() = { + zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) + } + private def deregisterReassignedPartitionsListener() = { zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } -- 2.3.2 (Apple Git-55)