diff --git a/core/src/main/scala/kafka/cluster/Controller.scala b/core/src/main/scala/kafka/cluster/Controller.scala
new file mode 100644
index 0000000..c3dca32
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/Controller.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import kafka.utils.Utils._
+import kafka.utils.Json
+import kafka.common.KafkaException
+
+/**
+ * Helper for parsing the controller id from its ZooKeeper registration data.
+ */
+private[kafka] object Controller {
+
+  def parseControllerId(controllerInfoString: String): Int = {
+    try {
+      Json.parseFull(controllerInfoString) match {
+        case Some(m) =>
+          val controllerInfo = m.asInstanceOf[Map[String, Any]]
+          return controllerInfo.get("brokerid").get.asInstanceOf[Int]
+        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
+      }
+    } catch {
+      case t =>
+        // It may be due to an incompatible controller register version
+        try {
+          return controllerInfoString.toInt
+        } catch {
+          case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+        }
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..9cef51a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,28 +220,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
                              ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
 
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-
-        info("end registering consumer " + consumerIdString + " in ZK")
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
-            case Some(consumerZKString) => {
-              info("I wrote this conflicted ephemeral node a while back in a different session, "
-                + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-              Thread.sleep(config.zkSessionTimeoutMs)
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
-    }
+    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+    info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 800f900..c1050ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,7 +37,7 @@ import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
-                        val zkSessionTimeout: Int,
+                        val zkSessionTimout: Int,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index d785db9..b3da232 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -21,6 +21,7 @@ import kafka.utils.{Json, Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
+import kafka.cluster.Controller
 import kafka.common.KafkaException
 
 /**
@@ -52,57 +53,25 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
         ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
 
-    var electNotDone = true
-    do {
-      electNotDone = false
-      try {
-        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
-
-        info(brokerId + " successfully elected as leader")
-        leaderId = brokerId
-        onBecomingLeader()
+    try {
+      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
+        (controllerString : String, leaderId : Any) => Controller.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
+        controllerContext.zkSessionTimout)
+      info(brokerId + " successfully elected as leader")
+      leaderId = brokerId
+      onBecomingLeader()
       } catch {
         case e: ZkNodeExistsException =>
-          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-          // If someone else has written the path, then read the broker id
-            case Some(controllerString) =>
-              try {
-                Json.parseFull(controllerString) match {
-                  case Some(m) =>
-                    val controllerInfo = m.asInstanceOf[Map[String, Any]]
-                    leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-                    if (leaderId != brokerId) {
-                      info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                    } else {
-                      info("I wrote this conflicted ephemeral node a while back in a different session, "
-                        + "hence I will retry")
-                      electNotDone = true
-                      Thread.sleep(controllerContext.zkSessionTimeout)
-                    }
-                  case None =>
-                    warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId))
-                    throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version")
-                }
-              } catch {
-                case t =>
-                  // It may be due to an incompatible controller register version
-                  info("Failed to parse the controller info as json. " +
-                    "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
-                  try {
-                    leaderId = controllerString.toInt
-                    info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                  } catch {
-                    case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
-                  }
-              }
-            case None =>
-              // The node disappears, retry immediately
+          // If someone else has written the path, read the elected controller id from it
+          leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+            case Some(controller) => Controller.parseControllerId(controller)
+            case None => throw new KafkaException("Controller doesn't exist")
           }
+          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
         case e2 =>
           error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
           resign()
-      }
-    } while (electNotDone)
+    }
 
     amILeader
   }
@@ -129,25 +98,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        try {
-          Json.parseFull(data.toString) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-              info("New leader is %d".format(leaderId))
-            case None =>
-              error("Error while reading the leader info %s".format(data.toString))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            try {
-              leaderId = data.toString.toInt
-              info("New leader is %d".format(leaderId))
-            } catch {
-              case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
-            }
-        }
+        leaderId = Controller.parseControllerId(data.toString)
+        info("New leader is %d".format(leaderId))
       }
     }
 
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9772af8..bd5fd0d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -34,6 +34,7 @@ import kafka.controller.PartitionAndReplica
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.cluster.Controller
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -54,25 +55,7 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) =>
-        try {
-          Json.parseFull(controller) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              controllerInfo.get("brokerid").get.asInstanceOf[Int]
-            case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            info("Failed to parse the controller info as json. " +
-              "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
-            try {
-              controller.toInt
-            } catch {
-              case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
-            }
-        }
+      case Some(controller) => Controller.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -206,36 +189,18 @@ object ZkUtils extends Logging {
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
                              Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
+    val selfBroker = new Broker(id, host, port)
 
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+    try {
+      createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, selfBroker,
+        (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
+        timeout)
 
-        info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
-            case Some(brokerZKString) => {
-              val broker = Broker.createBroker(id, brokerZKString)
-              if (broker.host == host && broker.port == port) {
-                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
-                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-                Thread.sleep(timeout)
-              } else {
-                // otherwise, throw the runtime exception
-                throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
-                  .format(broker.host, broker.port, host, port, brokerIdPath))
-              }
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
+    } catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
     }
+    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
@@ -318,6 +283,40 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Create an ephemeral node with the given path and data.
+   * Throw ZkNodeExistsException if the node already exists.
+   * Handles the ZK session timeout bug
+   */
+  def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, caller: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, path, data)
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
+            case Some(writtenData) => {
+              if (checker(writtenData, caller)) {
+                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(data)
+                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")
+
+                Thread.sleep(backoffTime)
+              } else {
+                throw e
+              }
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+        case e2 => throw e2
+      }
+    }
+  }
+
+  /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
