diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..d7b810f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -211,13 +211,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
+    val timestamp = SystemTime.milliseconds.toString // registration time, added to the registration JSON below
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-    info("end registering consumer " + consumerIdString + " in ZK")
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+
+    while (true) { // keep retrying until the ephemeral node is created; success leaves through the return below
+      try {
+        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+
+        info("end registering consumer " + consumerIdString + " in ZK")
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // Because of a Zookeeper bug, an ephemeral node can outlive the session that created it.
+          // When the create conflicts with such a leftover node we keep retrying until Zookeeper has
+          // deleted it, at which point the create goes through without a ZkNodeExistsException.
+          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
+            case Some(consumerZKString) => { // NOTE(review): the node content is not inspected; ownership is assumed from the path alone - confirm
+              info("I wrote this conflicted ephemeral node a while back in a different session, "
+                + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+              Thread.sleep(config.zkSessionTimeoutMs) // back off for one ZK session timeout before the next attempt
+            }
+            case None => // the node was gone by the time we read it; loop around and retry the create immediately
+          }
+        }
+      }
+    }
   }
 
   private def sendShutdownToAllQueues() = {
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 0e6c656..553640f 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
       else
         config.hostName 
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 574922b..99660de 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
@@ -46,23 +46,38 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
-    try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
-      info(brokerId + " successfully elected as leader")
-      leaderId = brokerId
-      onBecomingLeader()
-    } catch {
-      case e: ZkNodeExistsException =>
-        // If someone else has written the path, then
-        val data: String = controllerContext.zkClient.readData(electionPath, true)
-        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
-        if (data != null) {
-          leaderId = data.toInt
-        }
-      case e2 =>
-        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-        resign()
+    val timestamp = SystemTime.milliseconds.toString // written into the election node after the broker id
+
+    while (true) { // every branch below either returns or falls through to retry the create
+      try {
+        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId + ":" + timestamp) // payload: <brokerId>:<timestamp>
+        info(brokerId + " successfully elected as leader")
+        leaderId = brokerId
+        onBecomingLeader()
+        return amILeader
+      } catch {
+        case e: ZkNodeExistsException =>
+          // The election path already exists: read it back to find out which broker id it records
+          val data: String = controllerContext.zkClient.readData(electionPath, true)
+          if (data != null) {
+            leaderId = data.toString.split(":").head.toInt // the broker id is the part before the first ':'
+            if (leaderId != brokerId) {
+              debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+              return amILeader
+            }
+            else { // NOTE(review): retries with no sleep (registerBrokerInZk backs off for the session timeout) and leaderId already equals brokerId here - confirm intended
+              info("I wrote this conflicted ephemeral node a while back in a different session, "
+                + "hence I will retry")
+            }
+          }
+          // data == null: nothing could be read from the election path (presumably the node is already gone), so retry immediately
+        case e2 =>
+          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+          resign()
+          return amILeader
+      }
     }
+
     amILeader
   }
 
@@ -88,7 +103,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        leaderId = data.toString.toInt
+        leaderId = data.toString.split(":").head.toInt // keep only the text before the first ':'; elect writes <brokerId>:<timestamp>
         info("New leader is %d".format(leaderId))
       }
     }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..1666e43 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,7 +54,7 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) => controller.toInt
+      case Some(controller) => controller.toString.split(":").head.toInt // the id is the text before the first ':'; a value with no ':' parses as before
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -181,19 +181,43 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    val timestamp = "\"" + SystemTime.milliseconds.toString + "\"" // quoted by hand: it is merged with the valueInQuotes = false fields below
     val brokerInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
-                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
+                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
-    try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
+
+    while (true) { // keep retrying until the node is created (return) or a different broker is found on the path (throw)
+      try {
+        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+
+        info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // Because of a Zookeeper bug, an ephemeral node can outlive the session that created it.
+          // When the create conflicts with such a leftover node we keep retrying until Zookeeper has
+          // deleted it, at which point the create goes through without a ZkNodeExistsException.
+          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match { // same path as brokerIdPath above
+            case Some(brokerZKString) => {
+              val broker = Broker.createBroker(id, brokerZKString) // parse the existing registration so its host/port can be compared with ours
+              if (broker.host == host && broker.port == port) { // same host and port: treat the node as this broker's own leftover
+                info("I wrote this conflicted ephemeral node a while back in a different session, "
+                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+                Thread.sleep(timeout) // milliseconds; the caller shown in this patch passes config.zkSessionTimeoutMs
+              } else {
+                // a different host/port is registered under this broker id: fail the registration
+                throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
+                  .format(broker.host, broker.port, host, port, brokerIdPath))
+              }
+            }
+            case None => // the node was gone by the time we read it; loop around and retry the create immediately
+          }
+        }
+      }
     }
-    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
 
