From 7a79bc1deca27220b1b2bb510729d66577e54e28 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Wed, 3 Jun 2015 20:12:00 -0700 Subject: [PATCH] add response tests for ConsumerCoordinator --- .../kafka/coordinator/ConsumerCoordinator.scala | 4 +- .../kafka/coordinator/CoordinatorMetadata.scala | 4 +- .../ConsumerCoordinatorResponseTest.scala | 288 +++++++++++++++++++++ 3 files changed, 292 insertions(+), 4 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 51e89c8..a385adb 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -46,8 +46,8 @@ class ConsumerCoordinator(val config: KafkaConfig, private var coordinatorMetadata: CoordinatorMetadata = null /** - * NOTE: If a group lock and coordinatorLock are simultaneously needed, - * be sure to acquire the group lock before coordinatorLock to prevent deadlock + * NOTE: If a group lock and metadataLock are simultaneously needed, + * be sure to acquire the group lock before metadataLock to prevent deadlock */ /** diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index c39e6de..0cd5605 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -37,8 +37,8 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, maybePrepareRebalance: ConsumerGroupMetadata => Unit) { /** - * NOTE: If a group lock and coordinatorLock are simultaneously needed, - * be sure to acquire the group lock before coordinatorLock to prevent deadlock + * NOTE: If a group lock and metadataLock are simultaneously needed, + * be sure to acquire the group lock before metadataLock to prevent deadlock */ private val metadataLock = new ReentrantReadWriteLock() diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala new file mode 100644 index 0000000..c3705de --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + + +import java.util.concurrent.TimeUnit + +import junit.framework.Assert._ +import kafka.common.TopicAndPartition +import kafka.server.{KafkaConfig, OffsetManager} +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.JoinGroupRequest +import org.easymock.EasyMock +import org.junit.{Before, Test} +import org.scalatest.junit.JUnitSuite + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future, Promise} + +/** + * Test ConsumerCoordinator responses + */ +class ConsumerCoordinatorResponseTest extends JUnitSuite { + type JoinGroupCallbackParams = (Set[TopicAndPartition], String, Int, Short) + type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit + type HeartbeatCallbackParams = Short + type HeartbeatCallback = Short => Unit + + val ConsumerMinSessionTimeout = 10 + val ConsumerMaxSessionTimeout = 30 + val DefaultSessionTimeout = 20 + var offsetManager: OffsetManager = null + var consumerCoordinator: ConsumerCoordinator = null + + @Before + def setUp() { + val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) + props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) + offsetManager = EasyMock.createStrictMock(classOf[OffsetManager]) + consumerCoordinator = new ConsumerCoordinator(KafkaConfig.fromProps(props), null, offsetManager) + consumerCoordinator.startup() + } + + @Test + def testJoinGroupWrongCoordinator() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupUnknownPartitionAssignmentStrategy() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "foo" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupSessionTimeoutTooSmall() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupSessionTimeoutTooLarge() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupUnknownConsumerNewGroup() { + val groupId = "groupId" + val consumerId = "consumerId" + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, joinGroupErrorCode) + } + + @Test + def testValidJoinGroup() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupInconsistentPartitionAssignmentStrategy() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + val otherPartitionAssignmentStrategy = "roundrobin" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, otherPartitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val otherJoinGroupErrorCode = otherJoinGroupResult._4 + assertEquals(Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code, otherJoinGroupErrorCode) + } + + @Test + def testJoinGroupUnknownConsumerExistingGroup() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val otherConsumerId = "consumerId" + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val otherJoinGroupErrorCode = otherJoinGroupResult._4 + assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, otherJoinGroupErrorCode) + } + + @Test + def testHeartbeatWrongCoordinator() { + val groupId = "groupId" + val consumerId = "consumerId" + + val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false) + assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, heartbeatResult) + } + + @Test + def testHeartbeatUnknownGroup() { + val groupId = "groupId" + val consumerId = "consumerId" + + val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult) + } + + @Test + def testHeartbeatUnknownConsumerExistingGroup() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val otherConsumerId = "consumerId" + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, otherConsumerId, 1, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult) + } + + @Test + def testHeartbeatIllegalGeneration() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult._2 + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 2, isCoordinatorForGroup = true) + assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult) + } + + @Test + def testValidHeartbeat() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult._2 + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, heartbeatResult) + } + + @Test + def testGenerationIdIncrementsOnRebalance() { + val groupId = "groupId" + val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID + val partitionAssignmentStrategy = "range" + + val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val initialGenerationId = joinGroupResult._3 + val joinGroupErrorCode = joinGroupResult._4 + assertEquals(1, initialGenerationId) + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) + val nextGenerationId = otherJoinGroupResult._3 + val otherJoinGroupErrorCode = otherJoinGroupResult._4 + assertEquals(2, nextGenerationId) + assertEquals(Errors.NONE.code, otherJoinGroupErrorCode) + } + + private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = { + val responsePromise = Promise[JoinGroupCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: JoinGroupCallback = (partitions, consumerId, generationId, errorCode) => + responsePromise.success((partitions, consumerId, generationId, errorCode)) + (responseFuture, responseCallback) + } + + private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { + val responsePromise = Promise[HeartbeatCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode) + (responseFuture, responseCallback) + } + + private def joinGroup(groupId: String, + consumerId: String, + partitionAssignmentStrategy: String, + sessionTimeout: Int, + isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { + val (responseFuture, responseCallback) = setupJoinGroupCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback) + Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) + } + + private def heartbeat(groupId: String, + consumerId: String, + generationId: Int, + isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = { + val (responseFuture, responseCallback) = setupHeartbeatCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback) + Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) + } +} -- 1.9.3 (Apple Git-50)