From 79499ad6d26193fac2a19e6abac3dc35fc9c77da Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Thu, 9 Jul 2015 02:13:53 -0700 Subject: [PATCH] fix heartbeat to allow offset commits during PreparingRebalance --- .../src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 476973b..5121aa5 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -244,7 +244,8 @@ class ConsumerCoordinator(val brokerId: Int, responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) } else if (!group.has(consumerId)) { responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) - } else if (generationId != group.generationId) { + } else if (!(group.is(Stable) && generationId == group.generationId) + && !(group.is(PreparingRebalance) && generationId == group.generationId - 1)) { responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) { responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code)) @@ -341,7 +342,8 @@ class ConsumerCoordinator(val brokerId: Int, private def prepareRebalance(group: ConsumerGroupMetadata) { group.transitionTo(PreparingRebalance) - info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId)) + group.generationId += 1 + info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId)) val rebalanceTimeout = group.rebalanceTimeout val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout) @@ -353,9 +355,7 @@ class ConsumerCoordinator(val brokerId: Int, assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata]) group.transitionTo(Rebalancing) - group.generationId += 1 - - info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId)) + info("Rebalancing group %s generation %s".format(group.groupId, group.generationId)) val assignedPartitionsPerConsumer = reassignPartitions(group) trace("Rebalance for group %s generation %s has assigned partitions: %s" -- 1.9.3 (Apple Git-50)