From 171754d292001773e1b7f64aa99ef3b52e073fca Mon Sep 17 00:00:00 2001 From: Sam Meder Date: Tue, 27 Aug 2013 12:52:18 +0200 Subject: [PATCH] [KAFKA-1029] Use brokerId instead of leaderId when triggering ephemeral node retries --- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 50e3f79..f1f0625 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -42,19 +42,19 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: def startup { controllerContext.controllerLock synchronized { + controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) elect } } def elect: Boolean = { - controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) val timestamp = SystemTime.milliseconds.toString val electString = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false) ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true)) try { - createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId, + createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], controllerContext.zkSessionTimeout) info(brokerId + " successfully elected as leader") -- 1.8.3.1