diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..4f1b9be 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -183,17 +183,42 @@ object ZkUtils extends Logging {
 
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    val timeString = new java.text.SimpleDateFormat("dd-MM-yyyy HH:mm:ss.SSS").format(SystemTime.milliseconds);
     val brokerInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
-                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
+                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timeString),
                                                    valueInQuotes = false))
-    try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
+
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephermeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
+            case Some(brokerZKString) => {
+              val broker = Broker.createBroker(id, brokerZKString)
+              if (broker.host == host && broker.port == port) {
+                info("I wrote this conflicted ephermeral node a while back in a different session, "
+                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+                Thread.sleep(2000)
+              } else {
+                // otherwise, throw the runtime exception
+                throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+                  + ". This probably indicates that you either have configured a brokerid that is already in use, or "
+                  + "else you have shutdown this broker and restarted it faster than the zookeeper "
+                  + "timeout so it appears to be re-registering.")
+              }
+            }
+            case None => // the node disappeared; retry creating the ephemeral node
+          }
+        }
+      }
+      info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
+      return
     }
-    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
