diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e42d104..5e30e6e 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -34,9 +34,10 @@ import kafka.network.BlockingChannel import kafka.serializer._ import kafka.utils.CoreUtils.inLock import kafka.utils.ZkUtils._ +import kafka.utils.ZKWatchedEphemeral import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException -import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ @@ -90,6 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None private var zkClient: ZkClient = null + private var zkConnection : ZkConnection = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] @@ -178,7 +180,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def connectZk() { info("Connecting to zookeeper instance at " + config.zkConnect) - zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) + var (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) + zkClient = client + zkConnection = connection } // Blocks until the offset manager is located and a channel is established to it. @@ -255,15 +259,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // this API is used by unit tests only def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry - + private var zkWatchedEphemeral : ZKWatchedEphemeral = null private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) { info("begin registering consumer " + consumerIdString + " in ZK") val timestamp = SystemTime.milliseconds.toString val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, "timestamp" -> timestamp)) - createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, - (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) + //createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, + // (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) + + if(zkWatchedEphemeral != null) + zkWatchedEphemeral.halt + zkWatchedEphemeral = new ZKWatchedEphemeral(dirs. + consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, + zkConnection.getZookeeper) + zkWatchedEphemeral.createAndWatch info("end registering consumer " + consumerIdString + " in ZK") } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 74b587e..cb94f8e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -20,7 +20,7 @@ package kafka.utils import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import kafka.server.ConfigType -import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.{ZkClient,ZkConnection} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer @@ -36,6 +36,15 @@ import kafka.controller.KafkaController import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition +import org.apache.zookeeper.AsyncCallback.StatCallback +import org.apache.zookeeper.AsyncCallback.StringCallback +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.WatchedEvent +import org.apache.zookeeper.Watcher +import org.apache.zookeeper.ZooDefs.Ids +import org.apache.zookeeper.ZooKeeper + object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" @@ -809,7 +818,13 @@ object ZkUtils extends Logging { def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer) - zkClient + zkClient + } + + def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = { + val zkConnection = new ZkConnection(zkUrl, sessionTimeout) + val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer) + (zkClient, zkConnection) } } @@ -892,3 +907,148 @@ object ZkPath { client.createPersistentSequential(path, data) } } + +class ZKWatchedEphemeral(path : String, + data : String, + zkHandle : ZooKeeper) extends Logging { + private val createCallback = new CreateCallback + private val ephemeralWatcher = new EphemeralWatcher + private val existsCallback = new ExistsCallback + @volatile + private var stop : Boolean = false + private class CreateCallback extends StringCallback { + def processResult(rc : Int, + path : String, + ctx : Object, + name : String) { + Code.get(rc) match { + case Code.OK => { + // check that exists and wait + checkAndWatch + } + case Code.CONNECTIONLOSS => { + // try again + createAndWatch + } + case Code.NONODE => { + error("No node for path %s (could be the parent missing)".format(path)) + } + case Code.SESSIONEXPIRED => { + + error("Session has expired while creating %s".format(path)) + } + case _ => { + info("ZooKeeper event while creating registration node %s %s".format(path, Code.get(rc))) + } + } + } + } + + private class EphemeralWatcher extends Watcher { + def process(event : WatchedEvent) { + // if node deleted, then recreate it + if(!stop && event.getType == Watcher.Event.EventType.NodeDeleted) + createAndWatch + } + } + + private class ExistsCallback extends StatCallback { + def processResult(rc : Int, + path : String, + ctx : Object, + stat : Stat) { + Code.get(rc) match { + case Code.OK => {} + case Code.CONNECTIONLOSS => { + // Backoff and try again + checkAndWatch + } + case Code.SESSIONEXPIRED => { + error("Session has expired while creating %s".format(path)) + } + case _ => { + info("ZooKeeper event while checking if registration node exists %s %s".format(path, Code.get(rc))) + } + } + } + } + + private def checkAndWatch() { + zkHandle.exists(path, + ephemeralWatcher, + existsCallback, + null) + } + + private def createRecursive(prefix : String, suffix : String) { + debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix)) + if(suffix.isEmpty()) { + zkHandle.create(prefix, + data.getBytes(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL, + createCallback, + null) + } else { + zkHandle.create(prefix, + new Array[Byte](0), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + new StringCallback() { + def processResult(rc : Int, + path : String, + ctx : Object, + name : String) { + Code.get(rc) match { + case Code.OK => { + // Nothing to do + } + case Code.NODEEXISTS => { + // Nothing to do + } + case Code.CONNECTIONLOSS => { + // try again + val suffix = ctx.asInstanceOf[String] + createRecursive(path, suffix) + } + case Code.NONODE => { + error("No node for path %s (could be the parent missing)".format(path)) + } + case Code.SESSIONEXPIRED => { + error("Session has expired while creating %s".format(path)) + } + case _ => { + info("ZooKeeper event while creating registration node %s %s".format(path, Code.get(rc))) + } + } + } + }, + suffix) + // Update prefix and suffix + val index = suffix.indexOf('/', 1) match { + case -1 => suffix.length + case x : Int => x + } + // Get new prefix + val newPrefix = prefix + suffix.substring(0, index) + // Get new suffix + val newSuffix = suffix.substring(index, suffix.length) + createRecursive(newPrefix, newSuffix) + } + } + + def createAndWatch() { + val index = path.indexOf('/', 1) match { + case -1 => path.length + case x : Int => x + } + val prefix = path.substring(0, index) + val suffix = path.substring(index, path.length) + debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix)) + createRecursive(prefix, suffix) + } + + def halt() { + stop = true + } +}