From 7b61aab7fde48667884036585cec77ad6c5ee14d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 9 Jul 2015 13:21:50 -0700 Subject: [PATCH 1/2] KAFKA-1740; heartbeat should return illegal generation if rebalancing --- core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 476973b..6c2df4c 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -210,7 +210,7 @@ class ConsumerCoordinator(val brokerId: Int, responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else if (!group.has(consumerId)) { responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else if (generationId != group.generationId) { + } else if (generationId != group.generationId || !group.is(Stable)) { responseCallback(Errors.ILLEGAL_GENERATION.code) } else { val consumer = group.get(consumerId) -- 2.3.2 (Apple Git-55) From 054424a279494e2fd9b1714c6d95679a1e8097ec Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 9 Jul 2015 14:39:39 -0700 Subject: [PATCH 2/2] KAFKA-1740; add unit test for heartbeat during rebalance --- .../ConsumerCoordinatorResponseTest.scala | 45 +++++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala index 3cd726d..87a5330 100644 --- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala @@ -43,7 +43,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { type HeartbeatCallback = Short => Unit val ConsumerMinSessionTimeout = 10 - val ConsumerMaxSessionTimeout = 30 + val ConsumerMaxSessionTimeout = 100 val DefaultSessionTimeout = 20 var consumerCoordinator: ConsumerCoordinator = null var offsetManager : OffsetManager = null @@ -232,6 +232,30 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { } @Test + def testHeartbeatDuringRebalanceCausesIllegalGeneration() { + val groupId = "groupId" + val partitionAssignmentStrategy = "range" + + // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, + 100, isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult._2 + val initialGenerationId = joinGroupResult._3 + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + // Then join with a new consumer to trigger a rebalance + EasyMock.reset(offsetManager) + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, + DefaultSessionTimeout, isCoordinatorForGroup = true) + + // We should be in the middle of a rebalance, so the heartbeat should return illegal generation + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true) + assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult) + } + + @Test def testGenerationIdIncrementsOnRebalance() { val groupId = "groupId" val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID @@ -267,16 +291,25 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { (responseFuture, responseCallback) } - private def joinGroup(groupId: String, - consumerId: String, - partitionAssignmentStrategy: String, - sessionTimeout: Int, - isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { + private def sendJoinGroup(groupId: String, + consumerId: String, + partitionAssignmentStrategy: String, + sessionTimeout: Int, + isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = { val (responseFuture, responseCallback) = setupJoinGroupCallback EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) EasyMock.replay(offsetManager) consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback) + responseFuture + } + + private def joinGroup(groupId: String, + consumerId: String, + partitionAssignmentStrategy: String, + sessionTimeout: Int, + isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { + val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) } -- 2.3.2 (Apple Git-55)