diff --git a/core/src/main/scala/kafka/cluster/Controller.scala b/core/src/main/scala/kafka/cluster/Controller.scala
new file mode 100644
index 0000000..c3dca32
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/Controller.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import kafka.utils.Utils._
+import kafka.utils.Json
+import kafka.common.KafkaException
+
+/**
+ * A Kafka broker
+ */
+private[kafka] object Controller {
+
+  def parseControllerId(controllerInfoString: String): Int = {
+    try {
+      Json.parseFull(controllerInfoString) match {
+        case Some(m) =>
+          val controllerInfo = m.asInstanceOf[Map[String, Any]]
+          return controllerInfo.get("brokerid").get.asInstanceOf[Int]
+        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
+      }
+    } catch {
+      case t =>
+        // It may be due to an incompatible controller register version
+        try {
+          return controllerInfoString.toInt
+        } catch {
+          case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+        }
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0ca2850..9cef51a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,12 +213,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
+    val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+
+    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c87caab..c1050ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,6 +37,7 @@ import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
+                        val zkSessionTimout: Int,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -83,7 +84,7 @@ object KafkaController {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  val controllerContext = new ControllerContext(zkClient)
+  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 574922b..b3da232 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,12 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.Logging
+import kafka.utils.{Json, Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
+import kafka.cluster.Controller
+import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -46,23 +48,31 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
+    val timestamp = SystemTime.milliseconds.toString
+    val electString =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
+        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+
     try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
+      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
+        (controllerString : String, leaderId : Any) => Controller.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
+        controllerContext.zkSessionTimout)
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId
       onBecomingLeader()
-    } catch {
-      case e: ZkNodeExistsException =>
-        // If someone else has written the path, then
-        val data: String = controllerContext.zkClient.readData(electionPath, true)
-        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
-        if (data != null) {
-          leaderId = data.toInt
-        }
-      case e2 =>
-        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-        resign()
+      } catch {
+        case e: ZkNodeExistsException =>
+          // If someone else has written the path, then
+          leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+            case Some(controller) => Controller.parseControllerId(controller)
+            case None => throw new KafkaException("Controller doesn't exist")
+          }
+          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+        case e2 =>
+          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+          resign()
     }
+
     amILeader
   }
 
@@ -88,7 +98,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        leaderId = data.toString.toInt
+        leaderId = Controller.parseControllerId(data.toString)
         info("New leader is %d".format(leaderId))
       }
     }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0072a1a..bd5fd0d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -34,6 +34,7 @@ import kafka.controller.PartitionAndReplica
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.cluster.Controller
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -54,7 +55,7 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) => controller.toInt
+      case Some(controller) => Controller.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -188,36 +189,18 @@ object ZkUtils extends Logging {
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
                              Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
+    val selfBroker = new Broker(id, host, port)
 
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+    try {
+      createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, selfBroker,
+        (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
+        timeout)
 
-        info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
-            case Some(brokerZKString) => {
-              val broker = Broker.createBroker(id, brokerZKString)
-              if (broker.host == host && broker.port == port) {
-                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
-                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-                Thread.sleep(timeout)
-              } else {
-                // otherwise, throw the runtime exception
-                throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
-                  .format(broker.host, broker.port, host, port, brokerIdPath))
-              }
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
+    } catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
     }
+    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
@@ -300,6 +283,40 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Create an ephemeral node with the given path and ata.
+   * Throw NodeExistEception if node already exists.
+   * Handles the ZK session timeout bug
+   */
+  def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, caller: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, path, data)
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
+            case Some(writtenData) => {
+              if (checker(writtenData, caller)) {
+                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(data)
+                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")
+
+                Thread.sleep(backoffTime)
+              } else {
+                throw e
+              }
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+        case e2 => throw e2
+      }
+    }
+  }
+
+  /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..70e4b51 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerContext = new ControllerContext(zkClient)
+    val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()
