From a02b6083874b9c3edd78c66ac619f594b0dd4548 Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 14:32:08 -0400
Subject: [PATCH 1/7] ZookeeperConsumer: listen for epoch changes.

---
 .../consumer/ZookeeperConsumerConnector.scala | 25 ++++++++++++++++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala |  3 +++
 2 files changed, 28 insertions(+)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e42d104..aec621e 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -99,6 +99,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var sessionExpirationListener: ZKSessionExpireListener = null
   private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
+  private var epochChangeListener: ZKEpochChangeListener = null
 
   private var offsetsChannel: BlockingChannel = null
   private val offsetsChannelLock = new Object
@@ -534,6 +535,24 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
+  class ZKEpochChangeListener(val loadBalancerListener: ZKRebalancerListener)
+    extends IZkDataListener {
+
+    def handleDataChange(dataPath: String, data: Object): Unit = {
+      try {
+        info("Epoch at " + dataPath + " changed to " + data.toString + ", triggering rebalance")
+        loadBalancerListener.rebalanceEventTriggered();
+      } catch {
+        case e: Throwable => error("Error while handling epoch change for data path " + dataPath, e)
+      }
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      warn("Epoch node at path " + dataPath + " was deleted, which should not happen at this time")
+    }
+  }
+
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
@@ -894,6 +913,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (topicPartitionChangeListener == null)
       topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
 
+    if (epochChangeListener == null)
+      epochChangeListener = new ZKEpochChangeListener(loadBalancerListener)
+
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
     // map of {topic -> Set(thread-1, thread-2, ...)}
@@ -950,6 +972,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
+    // listen for epoch changes
+    zkClient.subscribeDataChanges(ZkUtils.getConsumerEpochPath(config.groupId), epochChangeListener)
+
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 166814c..9cc6c62 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -76,6 +76,9 @@ object ZkUtils extends Logging {
   def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
     getTopicPartitionPath(topic, partitionId) + "/" + "state"
 
+  def getConsumerEpochPath(groupId: String): String =
+    ConsumersPath + "/" + groupId + "/" + "epoch"
+
   def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
     ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
-- 
2.4.1

From ca074a4211fa23ff9f8810493b964a6f763bcedc Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 14:40:51 -0400
Subject: [PATCH 2/7] ZookeeperConsumer: Update epoch on start.

---
 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index aec621e..0b359ff 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -265,6 +265,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
                                                  (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+
+    info("updating epoch to: " + consumerIdString)
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getConsumerEpochPath(config.groupId))
+    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getConsumerEpochPath(config.groupId), consumerIdString)
+
     info("end registering consumer " + consumerIdString + " in ZK")
   }
-- 
2.4.1

From d4fe519b29ca74547a7e09597173f8934b2bc616 Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 15:46:17 -0400
Subject: [PATCH 3/7] ZookeeperConsumer: debounce rebalance events.

---
 .../consumer/ZookeeperConsumerConnector.scala | 37 +++++++++++++++++-----
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0b359ff..2432922 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -39,8 +39,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
-import scala.collection._
 import scala.collection.JavaConversions._
+import scala.collection._
 
 
 /**
@@ -95,6 +95,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
+  private var consumerIdCache: Set[String] = null
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
   private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
@@ -525,7 +526,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       try {
         info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
         // queue up the rebalance event
-        loadBalancerListener.rebalanceEventTriggered()
+        loadBalancerListener.compareAndTriggerRebalance()
         // There is no need to re-subscribe the watcher since it will be automatically
         // re-registered upon firing of this event by zkClient
       } catch {
@@ -546,7 +547,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def handleDataChange(dataPath: String, data: Object): Unit = {
       try {
         info("Epoch at " + dataPath + " changed to " + data.toString + ", triggering rebalance")
-        loadBalancerListener.rebalanceEventTriggered();
+        loadBalancerListener.compareAndTriggerRebalance();
       } catch {
         case e: Throwable => error("Error while handling epoch change for data path " + dataPath, e)
       }
@@ -559,7 +560,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
-                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]],
+                             val dirs: ZKGroupDirs)
     extends IZkChildListener {
     private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy)
@@ -604,11 +606,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     watcherExecutorThread.start()
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      rebalanceEventTriggered()
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]): Unit = {
+      compareAndTriggerRebalance()
     }
 
-    def rebalanceEventTriggered() {
+    def compareAndTriggerRebalance() {
+      val consumerIds = zkClient.getChildren(dirs.consumerRegistryDir).toSet
+
+      if (consumerIdCache == consumerIds) {
+        info("Consumer IDs match old state, not triggering rebalance.")
+      } else {
+        info("Consumer IDs changed, triggering rebalance")
+        consumerIdCache = consumerIds
+
+        if (lock.tryLock()) {
+          try {
+            rebalanceEventTriggered()
+          } finally {
+            lock.unlock()
+          }
+        }
+      }
+    }
+
+    private def rebalanceEventTriggered() {
       inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
@@ -906,7 +927,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (loadBalancerListener == null) {
       val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
       loadBalancerListener = new ZKRebalancerListener(
-        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]], dirs)
     }
 
     // create listener for session expired event if not exist yet
-- 
2.4.1

From 7dd8cee1aa8187beda6cb5591c9ed541e40650ac Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 15:58:37 -0400
Subject: [PATCH 4/7] ZookeeperConsumer: lock the consumer-ID check, not just the trigger.

---
 .../consumer/ZookeeperConsumerConnector.scala | 38 +++++++++-------------
 1 file changed, 16 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2432922..f4db6e8 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -606,36 +606,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     watcherExecutorThread.start()
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]): Unit = {
+    def handleChildChange(parentPath: String, curChilds: java.util.List[String]): Unit = {
       compareAndTriggerRebalance()
     }
 
     def compareAndTriggerRebalance() {
-      val consumerIds = zkClient.getChildren(dirs.consumerRegistryDir).toSet
-
-      if (consumerIdCache == consumerIds) {
-        info("Consumer IDs match old state, not triggering rebalance.")
-      } else {
-        info("Consumer IDs changed, triggering rebalance")
-        consumerIdCache = consumerIds
-
-        if (lock.tryLock()) {
-          try {
-            rebalanceEventTriggered()
-          } finally {
-            lock.unlock()
-          }
-        }
-      }
-    }
-
-    private def rebalanceEventTriggered() {
       inLock(lock) {
-        isWatcherTriggered = true
-        cond.signalAll()
+        val consumerIds = zkClient.getChildren(dirs.consumerRegistryDir).toSet
+
+        if (consumerIdCache == consumerIds) {
+          info("Consumer IDs match old state, not triggering rebalance.")
+        } else {
+          info("Consumer IDs changed, triggering rebalance")
+          consumerIdCache = consumerIds
+
+          rebalanceEventTriggered()
+        }
       }
     }
 
+    private def rebalanceEventTriggered() {
+      isWatcherTriggered = true
+      cond.signalAll()
+    }
+
     private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       val znode = topicDirs.consumerOwnerDir + "/" + partition
-- 
2.4.1

From 3d0fde482fbe557d1855c587fb3ce3f7122639c2 Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 16:20:55 -0400
Subject: [PATCH 5/7] ZookeeperConsumer: periodically check balance.

---
 .../main/scala/kafka/consumer/ConsumerConfig.scala | 13 ++++++++++
 .../consumer/ZookeeperConsumerConnector.scala      | 28 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 0199317..35add58 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -53,6 +53,9 @@ object ConsumerConfig extends Config {
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
+  val RebalanceCheck = true
+  val RebalanceCheckIntervalMs = 30 * 1000
+
   def validate(config: ConsumerConfig) {
     validateClientId(config.clientId)
     validateGroupId(config.groupId)
@@ -181,6 +184,16 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
   val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
 
+  /*
+   * if true, periodically check to make sure the consumer is properly balanced
+   */
+  val rebalanceCheckEnabled = props.getBoolean("rebalance.check.enabled", RebalanceCheck)
+
+  /*
+   * the interval in milliseconds at which to check that the consumer is properly balanced
+   */
+  val rebalanceCheckIntervalMs = props.getInt("rebalance.check.interval.ms")
+
   validate(this)
 }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f4db6e8..963675a 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -132,8 +132,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     createFetcher()
     ensureOffsetManagerConnected()
 
-    if (config.autoCommitEnable) {
+    if (config.autoCommitEnable || config.rebalanceCheckEnabled)
       scheduler.startup
+
+    if (config.autoCommitEnable) {
       info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
       scheduler.schedule("kafka-consumer-autocommit",
                          autoCommit,
@@ -142,6 +144,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                          delay = config.autoCommitIntervalMs,
                          period = config.autoCommitIntervalMs,
                          unit = TimeUnit.MILLISECONDS)
     }
 
+    if (config.rebalanceCheckEnabled) {
+      info("starting rebalance check every " + config.rebalanceCheckIntervalMs + " ms")
+      scheduler.schedule("kafka-consumer-rebalance-check",
+                         rebalanceCheck,
+                         delay = config.rebalanceCheckIntervalMs,
+                         period = config.rebalanceCheckIntervalMs,
+                         unit = TimeUnit.MILLISECONDS)
+    }
+
     KafkaMetricsReporter.startReporters(config.props)
     AppInfo.registerInfo()
@@ -404,6 +415,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
+  def rebalanceCheck() {
+    trace("rebalance check")
+
+    try {
+      if (loadBalancerListener != null) {
+        loadBalancerListener.compareAndTriggerRebalance()
+      } else {
+        debug("no load balancer listener available, not checking for rebalance")
+      }
+    } catch {
+      case t: Throwable =>
+        error("exception while checking for rebalance: ", t)
+    }
+  }
+
   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
     val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
-- 
2.4.1

From e4af520c7f7be09ab78b55644745ccbbcbc16fd3 Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Wed, 25 Mar 2015 16:45:09 -0400
Subject: [PATCH 6/7] ConsumerConfig: set default for rebalanceCheckIntervalMs.

---
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 35add58..1ffeef2 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -192,7 +192,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /*
    * the interval in milliseconds at which to check that the consumer is properly balanced
   */
-  val rebalanceCheckIntervalMs = props.getInt("rebalance.check.interval.ms")
+  val rebalanceCheckIntervalMs = props.getInt("rebalance.check.interval.ms", RebalanceCheckIntervalMs)
 
   validate(this)
 }
-- 
2.4.1

From 40aa2b95dfc44f5d742bf6e53a98da56430514cc Mon Sep 17 00:00:00 2001
From: Ze'ev Klapow
Date: Sun, 29 Mar 2015 00:18:42 -0400
Subject: [PATCH 7/7] ZookeeperConsumer: only compare and check if zkClient != null.

---
 .../kafka/consumer/ZookeeperConsumerConnector.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 963675a..b2aa62e 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -638,15 +638,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def compareAndTriggerRebalance() {
       inLock(lock) {
-        val consumerIds = zkClient.getChildren(dirs.consumerRegistryDir).toSet
+        if (zkClient != null) {
+          val consumerIds = zkClient.getChildren(dirs.consumerRegistryDir).toSet
 
-        if (consumerIdCache == consumerIds) {
-          info("Consumer IDs match old state, not triggering rebalance.")
-        } else {
-          info("Consumer IDs changed, triggering rebalance")
-          consumerIdCache = consumerIds
+          if (consumerIdCache == consumerIds) {
+            info("Consumer IDs match old state, not triggering rebalance.")
+          } else {
+            info("Consumer IDs changed, triggering rebalance")
+            consumerIdCache = consumerIds
 
-          rebalanceEventTriggered()
+            rebalanceEventTriggered()
+          }
         }
       }
     }
-- 
2.4.1
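
Reviewer note: below is a minimal, hypothetical sketch of how the two properties introduced by this series ("rebalance.check.enabled" and "rebalance.check.interval.ms", defaults true and 30000 ms per PATCH 5/7 and 6/7) would be set on a high-level consumer. The ZooKeeper address, group id, and example object name are placeholders, and Consumer.create / ConsumerConfig are the existing high-level consumer API, not something added by this series.

    import java.util.Properties

    import kafka.consumer.{Consumer, ConsumerConfig}

    object RebalanceCheckExample {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181") // placeholder ZooKeeper address
        props.put("group.id", "example-group")           // placeholder group id

        // New in this series: periodically compare the registered consumer IDs against
        // the cached set and trigger a rebalance only if they have drifted.
        props.put("rebalance.check.enabled", "true")
        // Check interval; after PATCH 6/7 this falls back to 30000 ms when unset.
        props.put("rebalance.check.interval.ms", "30000")

        val connector = Consumer.create(new ConsumerConfig(props))
        // ... createMessageStreams(...) and consume as usual ...
        connector.shutdown()
      }
    }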