Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 68103)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -357,7 +357,7 @@
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(id => new Broker(id, "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, jmxPort = -1))
     brokers
   }
 
Index: core/src/main/scala/kafka/server/KafkaHealthcheck.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaHealthcheck.scala	(revision 68103)
+++ core/src/main/scala/kafka/server/KafkaHealthcheck.scala	(working copy)
@@ -17,7 +17,9 @@
 
 package kafka.server
 
+import kafka.cluster.Broker
 import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import java.net.InetAddress
@@ -38,49 +40,41 @@
                        private val zkClient: ZkClient) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+  val brokerNodeMonitor = new EphemeralNodeMonitor(zkClient, brokerIdPath, register)
   
   def startup() {
-    zkClient.subscribeStateChanges(new SessionExpireListener)
+    try {
+      ZkUtils.ensureNodeDoesNotExist(zkClient, brokerIdPath, isMyData, zkSessionTimeoutMs / 2)
+    }
+    catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+          + ". This probably indicates that you have configured a brokerid that is already in use", e)
+    }
+    
+    // Request notification if the broker node that is about to be created is ever deleted.  This should only
+    // happen if the ZooKeeper connection fails and a new session is started.
+    brokerNodeMonitor.start()
     register()
   }
+  
+  def advertisedHostName: String = {
+    if (advertisedHost == null || advertisedHost.trim.isEmpty)
+      InetAddress.getLocalHost.getCanonicalHostName
+    else
+      advertisedHost
+  }
 
+  def isMyData(storedData: String): Boolean = {
+    Broker.createBroker(brokerId, storedData).equals(new Broker(brokerId, advertisedHostName, advertisedPort))
+  }
+  
   /**
    * Register this broker as "alive" in zookeeper
    */
   def register() {
-    val advertisedHostName = 
-      if(advertisedHost == null || advertisedHost.trim.isEmpty) 
-        InetAddress.getLocalHost.getCanonicalHostName 
-      else
-        advertisedHost
+    info("Start registering broker %s in ZooKeeper".format(brokerId))  
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, jmxPort)
   }
-
-  /**
-   *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
-   *  connection for us. We need to re-register this broker in the broker registry.
-   */
-  class SessionExpireListener() extends IZkStateListener {
-    @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
-
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleNewSession() {
-      info("re-registering broker info in ZK for broker " + brokerId)
-      register()
-      info("done re-registering broker")
-      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
-    }
-  }
-
 }
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 68103)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -90,7 +90,7 @@
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
 
-  private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var consumerNodeMonitor: EphemeralNodeMonitor = null
   private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
@@ -162,6 +162,8 @@
       if (canShutdown) {
         info("ZKConsumerConnector shutting down")
 
+        consumerNodeMonitor.close()
+        
         if (wildcardTopicWatcher != null)
           wildcardTopicWatcher.shutdown()
         try {
@@ -223,8 +225,7 @@
     val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
                                                   "timestamp" -> timestamp))
 
-    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
-                                                 (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+    createEphemeralPath(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
@@ -273,41 +274,6 @@
     }
   }
 
-  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
-                                 val consumerIdString: String,
-                                 val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener)
-    extends IZkStateListener {
-    @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
-
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception
-     *             On any error.
-     */
-    @throws(classOf[Exception])
-    def handleNewSession() {
-      /**
-       *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
-       *  connection for us. We need to release the ownership of the current consumer and re-register this
-       *  consumer in the consumer registry and trigger a rebalance.
-       */
-      info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
-      loadBalancerListener.resetState()
-      registerConsumerInZK(dirs, consumerIdString, topicCount)
-      // explicitly trigger load balancing for this consumer
-      loadBalancerListener.syncedRebalance()
-      // There is no need to resubscribe to child and state changes.
-      // The child change watchers will be set inside rebalance when we read the children list.
-    }
-
-  }
-
   class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
     extends IZkDataListener {
 
@@ -651,9 +617,24 @@
     }
 
     // create listener for session expired event if not exist yet
-    if (sessionExpirationListener == null)
-      sessionExpirationListener = new ZKSessionExpireListener(
-        dirs, consumerIdString, topicCount, loadBalancerListener)
+    if (consumerNodeMonitor == null) {
+      // Define a function ot be called in the monitor detects that the node has been deleted.
+      def handleNewSession() {
+        /**
+         * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
+         * connection for us. We need to release the ownership of the current consumer and re-register this
+         * consumer in the consumer registry and trigger a rebalance.
+         */
+        info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
+        loadBalancerListener.resetState()
+        registerConsumerInZK(dirs, consumerIdString, topicCount)
+        // explicitly trigger load balancing for this consumer
+        loadBalancerListener.syncedRebalance()
+        // There is no need to resubscribe to child and state changes.
+        // The child change watchers will be set inside rebalance when we read the children list.
+      }
+      consumerNodeMonitor = new EphemeralNodeMonitor(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, handleNewSession)
+    }
 
     // create listener for topic partition change event if not exist yet
     if (topicPartitionChangeListener == null)
@@ -708,7 +689,7 @@
     })
 
     // listener to consumer and partition changes
-    zkClient.subscribeStateChanges(sessionExpirationListener)
+    consumerNodeMonitor.start()
 
     zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 68103)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -19,9 +19,10 @@
 
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, IZkDataListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.apache.zookeeper.Watcher.Event.KeeperState
 import collection._
 import kafka.api.LeaderAndIsr
 import mutable.ListBuffer
@@ -189,24 +190,13 @@
     }
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
     val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
-    val expectedBroker = new Broker(id, host, port)
 
-    try {
-      createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
-        (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
-        timeout)
-
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
-          + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
-          + "else you have shutdown this broker and restarted it faster than the zookeeper "
-          + "timeout so it appears to be re-registering.")
-    }
+    // The ephemeral should never exist when this method is called.
+    createEphemeralPath(zkClient, brokerIdPath, brokerInfo)
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
@@ -248,7 +238,7 @@
   /**
    * Create an ephemeral node with the given path and data. Create parents if necessary.
    */
-  private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+  def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
     try {
       client.createEphemeral(path, data)
     } catch {
@@ -720,6 +710,35 @@
       }.flatten.toSet
     }
   }
+
+  /**
+   * This function is used to ensure that an ephemeral path does not already exist. If it exists then it can be allowed
+   * to delete it.  Otherwise it throws a ZkNodeExistsException.  It is intended to be used at startup to detect stale
+   * ephemerals that exist due to a quick restart of the application.
+   * 
+   * @param zkClient      reference to the ZkClient
+   * @param path          the path that should not exist
+   * @param okToDelete    a function that determines if is OK to delete the node based on the node's current data
+   * @param deleteDelay   how long to delay after deleting a node before returning
+   */
+
+  @throws(classOf[ZkNodeExistsException])
+  def ensureNodeDoesNotExist(zkClient: ZkClient, path: String, okToDelete: String => Boolean = String => true, deleteDelay: Int = 0) {
+    val storedData = readDataMaybeNull(zkClient, path)._1
+    if (storedData.isDefined) {
+      if (okToDelete(storedData.get)) {
+        warn("Delete existing ephemeral node %s with data [%s])".format(path, storedData.get))
+        zkClient.delete(path)
+        if (deleteDelay > 0) {
+          info("Delay for %s ms to allow other nodes to process the delete of ephemeral node %s".format(deleteDelay, path))
+          Thread.sleep(deleteDelay)
+        }
+      }
+      else {
+        throw new ZkNodeExistsException("Node %s exists with data [%s]".format(path, storedData.get))
+      }
+    }
+  }
 }
 
 class LeaderExistsOrChangedListener(topic: String,
@@ -796,3 +815,73 @@
   /** how far a ZK follower can be behind a ZK leader */
   val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
 }
+
+/**
+ * This class provides a way to monitor an ephemeral node and notify the caller if this node is deleted.  A check for
+ * the existence of the node is performed when:
+ * 
+ * 1) An new session event is received.  When a new session is created the ZooKeeper will delete the ephemerals associated
+ *    with the old session.  This is typically when the work to recreate the node gets done.
+ * 2) When the node is deleted.  This provides an additional check in case the node is deleted after the new session
+ *    event is processed.  This is intended to avoid the need to loop in the new session event handling code waiting
+ *    for the ephemeral to get deleted.
+ *    
+ * This class should only be used if the node is owned by this applciation.
+ * 
+ * @param zkClient      reference to the ZkClient
+ * @param path          the path to be monitored
+ * @param recreateNode  the function to call when the node is not found
+ */
+
+class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {
+
+  val dataListener = new DataListener()
+  val stateListener = new StateListener()
+  
+  def start() {
+    zkClient.subscribeStateChanges(stateListener)
+    zkClient.subscribeDataChanges(path, dataListener)
+  }
+  
+  def close() {
+    zkClient.unsubscribeStateChanges(stateListener)
+    zkClient.unsubscribeDataChanges(path, dataListener)
+  }
+
+  class DataListener() extends IZkDataListener {
+    
+    var oldData: scala.Any = null
+
+    def handleDataChange(dataPath: String, newData: scala.Any) {
+      if (newData != oldData) {
+        oldData = newData
+        info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
+      }
+    }
+
+    def handleDataDeleted(dataPath: String) {
+      if (zkClient.exists(path)) {
+        info("Ephemeral node %s was deleted, but, has already been recreated".format(dataPath))
+      }
+      else {
+        info("Ephemeral node %s was deleted, recreate it".format(dataPath))
+        recreateNode()
+      }
+    }
+  }
+
+  class StateListener() extends IZkStateListener {
+
+    def handleStateChanged(state: KeeperState) {}
+
+    def handleNewSession() {
+      if (zkClient.exists(path)) {
+        info("New session started, but, ephemeral %s already/still exists".format(path))
+      }
+      else {
+        info("New session started, recreate ephemeral node %s".format(path))
+        recreateNode()
+      }
+    }
+  }
+}
