From 08e294a86c2744c47feca2f30cdbcfc206bf0e73 Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Thu, 16 Jul 2015 09:21:07 -0500 Subject: [PATCH] KAFKA-2336: Changing offsets.topic.num.partitions after the offset topic is created breaks consumer group partition assignment --- core/src/main/scala/kafka/server/OffsetManager.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 47b6ce9..197e957 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -96,6 +96,7 @@ class OffsetManager(val config: OffsetManagerConfig, private val loadingPartitions: mutable.Set[Int] = mutable.Set() private val cleanupOrLoadMutex = new Object private val shuttingDown = new AtomicBoolean(false) + private val offsetsTopicPartitionCount = getOffsetsTopicPartitionCount this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: " @@ -170,7 +171,7 @@ class OffsetManager(val config: OffsetManagerConfig, } - def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions + def partitionFor(group: String): Int = Utils.abs(group.hashCode) % offsetsTopicPartitionCount /** * Fetch the current offset for the given group/topic/partition from the underlying offsets storage. @@ -436,13 +437,25 @@ class OffsetManager(val config: OffsetManagerConfig, if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition." .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition))) - } def shutdown() { shuttingDown.set(true) } + /** + * Gets the partition count off the offsets topic from ZooKeeper. + * If the topic does not exist, the configured partition count is returned. + */ + private def getOffsetsTopicPartitionCount = { + val topic = ConsumerCoordinator.OffsetsTopicName + val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + if(topicData(topic).nonEmpty) { + topicData(topic).size + } else { + config.offsetsTopicNumPartitions + } + } } object OffsetManager { -- 2.4.5