From 6d25d9491f1b6af5c282078b3aba7a0492345ec7 Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Tue, 11 Aug 2015 10:36:19 -0500 Subject: [PATCH] KAFKA-2336: Changing offsets.topic.num.partitions after the offset topic is created breaks consumer group partition assignment --- core/src/main/scala/kafka/server/OffsetManager.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 47b6ce9..0e613e7 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -96,6 +96,7 @@ class OffsetManager(val config: OffsetManagerConfig, private val loadingPartitions: mutable.Set[Int] = mutable.Set() private val cleanupOrLoadMutex = new Object private val shuttingDown = new AtomicBoolean(false) + private val offsetsTopicPartitionCount = getOffsetsTopicPartitionCount this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: " @@ -170,7 +171,7 @@ class OffsetManager(val config: OffsetManagerConfig, } - def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions + def partitionFor(group: String): Int = Utils.abs(group.hashCode) % offsetsTopicPartitionCount /** * Fetch the current offset for the given group/topic/partition from the underlying offsets storage. @@ -436,13 +437,24 @@ class OffsetManager(val config: OffsetManagerConfig, if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition." .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition))) - } def shutdown() { shuttingDown.set(true) } + /** + * Gets the partition count of the offsets topic from ZooKeeper. + * If the topic does not exist, the configured partition count is returned. + */ + private def getOffsetsTopicPartitionCount = { + val topic = ConsumerCoordinator.OffsetsTopicName + val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + if (topicData(topic).nonEmpty) + topicData(topic).size + else + config.offsetsTopicNumPartitions + } } object OffsetManager { -- 2.4.6