From 547e1e86008386bf267d181f302b3c3c004b484e Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Fri, 10 Apr 2015 10:18:06 -0700 Subject: [PATCH] add heartbeat to coordinator --- .../clients/consumer/internals/Coordinator.java | 1 + .../org/apache/kafka/common/protocol/Errors.java | 8 +- core/src/main/scala/kafka/api/RequestKeys.scala | 13 +- .../kafka/coordinator/ConsumerCoordinator.scala | 564 ++++++++++++--------- .../scala/kafka/coordinator/ConsumerRegistry.scala | 29 +- .../scala/kafka/coordinator/DelayedHeartbeat.scala | 36 +- .../scala/kafka/coordinator/DelayedJoinGroup.scala | 21 +- .../scala/kafka/coordinator/DelayedRebalance.scala | 42 +- .../scala/kafka/coordinator/GroupRegistry.scala | 55 +- .../scala/kafka/coordinator/HeartbeatBucket.scala | 36 -- .../kafka/coordinator/PartitionAssignor.scala | 52 ++ .../main/scala/kafka/network/RequestChannel.scala | 7 +- .../scala/kafka/server/DelayedOperationKey.scala | 6 - core/src/main/scala/kafka/server/KafkaApis.scala | 16 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../main/scala/kafka/server/OffsetManager.scala | 2 +- 16 files changed, 508 insertions(+), 382 deletions(-) delete mode 100644 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala create mode 100644 core/src/main/scala/kafka/coordinator/PartitionAssignor.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index e55ab11..125ab35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -132,6 +132,7 @@ public final class Coordinator { // TODO: needs to handle disconnects and errors, should not just throw exceptions Errors.forCode(response.errorCode()).maybeThrow(); this.consumerId = response.consumerId(); + this.generation = response.generationId(); // set the flag to 
refresh last committed offsets this.subscriptions.needRefreshCommits(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 36aa412..2285fc7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -69,7 +69,13 @@ public enum Errors { INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")), ILLEGAL_GENERATION(22, - new ApiException("Specified consumer generation id is not valid.")); + new ApiException("Specified consumer generation id is not valid.")), + INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23, + new ApiException("The request partition assignment strategy does not match that of the group.")), + UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24, + new ApiException("The request partition assignment strategy is unknown to the broker.")), + UNKNOWN_CONSUMER_ID(25, + new ApiException("The coordinator is not aware of this consumer.")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index ef7a86e..20acbcf 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -20,6 +20,8 @@ package kafka.api import kafka.common.KafkaException import java.nio.ByteBuffer +import org.apache.kafka.common.protocol.ApiKeys + object RequestKeys { val ProduceKey: Short = 0 val FetchKey: Short = 1 @@ -50,9 +52,14 @@ object RequestKeys { ) def nameForKey(key: Short): String = { - keyToNameAndDeserializerMap.get(key) match { - case Some(nameAndSerializer) => nameAndSerializer._1 - case None => throw new KafkaException("Wrong request type %d".format(key)) + // HACK HACK HACK + if (key == 
JoinGroupKey) ApiKeys.JOIN_GROUP.name + else if (key == HeartbeatKey) ApiKeys.HEARTBEAT.name + else { + keyToNameAndDeserializerMap.get(key) match { + case Some(nameAndSerializer) => nameAndSerializer._1 + case None => throw new KafkaException("Wrong request type %d".format(key)) + } } } diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 456b602..69f37f4 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -16,79 +16,77 @@ */ package kafka.coordinator -import org.apache.kafka.common.protocol.Errors +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.common.TopicAndPartition import kafka.server._ import kafka.utils._ - -import scala.collection.mutable.HashMap - -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.JoinGroupRequest +import collection.mutable +import kafka.utils.CoreUtils.{inReadLock,inWriteLock} /** - * Kafka coordinator handles consumer group and consumer offset management. + * ConsumerCoordinator handles consumer group and consumer offset management. * - * Each Kafka server instantiates a coordinator, which is responsible for a set of + * Each Kafka server instantiates a coordinator which is responsible for a set of * consumer groups; the consumer groups are assigned to coordinators based on their * group names. 
*/ class ConsumerCoordinator(val config: KafkaConfig, - val zkClient: ZkClient) extends Logging { - - this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: " - - /* zookeeper listener for topic-partition changes */ - private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener] - - /* the consumer group registry cache */ - // TODO: access to this map needs to be synchronized - private val consumerGroupRegistries = new HashMap[String, GroupRegistry] + val zkClient: ZkClient, + val offsetManager: OffsetManager) extends Logging { - /* the list of subscribed groups per topic */ - // TODO: access to this map needs to be synchronized - private val consumerGroupsPerTopic = new HashMap[String, List[String]] + this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: " - /* the delayed operation purgatory for heartbeat-based failure detection */ private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null - - /* the delayed operation purgatory for handling join-group requests */ private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null - - /* the delayed operation purgatory for preparing rebalance process */ private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null - /* latest consumer heartbeat bucket's end timestamp in milliseconds */ - private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds + private val isLoadingGroupMetadata = new AtomicBoolean(false) /** - * Start-up logic executed at the same time when the server starts up. 
+ * NOTE: If a groupRegistry lock and coordinatorLock are simultaneously needed, + * be sure to acquire the groupRegistry lock before coordinatorLock to prevent deadlock */ - def startup() { + private val coordinatorLock = new ReentrantReadWriteLock() - // Initialize consumer group registries and heartbeat bucket metadata - latestHeartbeatBucketEndMs = SystemTime.milliseconds + /** + * These should be guarded by coordinatorLock + */ + private val groupRegistries = new mutable.HashMap[String, GroupRegistry] + private val consumerGroupsPerTopic = new mutable.HashMap[String, Set[String]] + private val topicPartitionCounts = new mutable.HashMap[String, Int] + private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener] - // Initialize purgatories for delayed heartbeat, join-group and rebalance operations - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](purgatoryName = "Heartbeat", brokerId = config.brokerId) - joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](purgatoryName = "JoinGroup", brokerId = config.brokerId) - rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](purgatoryName = "Rebalance", brokerId = config.brokerId) + /** + * Startup logic executed at the same time when the server starts up. 
+ */ + def startup() { + info("Starting up.") + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) + joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup]("JoinGroup", config.brokerId) + rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId) + loadGroupMetadata() + } + private def loadGroupMetadata() { + info("Loading group metadata.") + isLoadingGroupMetadata.set(false) } /** - * Shut-down logic executed at the same time when server shuts down, + * Shutdown logic executed at the same time when server shuts down, * ordering of actions should be reversed from the start-up process * */ def shutdown() { - + info("Shutting down.") // De-register all Zookeeper listeners for topic-partition changes - for (topic <- topicPartitionChangeListeners.keys) { - deregisterTopicChangeListener(topic) - } + topicPartitionChangeListeners.keys.foreach(deregisterTopicPartitionChangeListener) topicPartitionChangeListeners.clear() // Shutdown purgatories for delayed heartbeat, join-group and rebalance operations @@ -97,251 +95,335 @@ class ConsumerCoordinator(val config: KafkaConfig, rebalancePurgatory.shutdown() // Clean up consumer group registries metadata - consumerGroupRegistries.clear() + groupRegistries.clear() consumerGroupsPerTopic.clear() + topicPartitionCounts.clear() } - /** - * Process a join-group request from a consumer to join as a new group member - */ - def consumerJoinGroup(groupId: String, - consumerId: String, - topics: List[String], - sessionTimeoutMs: Int, - partitionAssignmentStrategy: String, - responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) { - - // if the group does not exist yet, create one - if (!consumerGroupRegistries.contains(groupId)) - createNewGroup(groupId, partitionAssignmentStrategy) - - val groupRegistry = consumerGroupRegistries(groupId) - - // if the consumer id is unknown or it does exists in - // the group yet, register 
this consumer to the group - if (consumerId.equals(JoinGroupRequest.UNKNOWN_CONSUMER_ID)) { - createNewConsumer(groupId, groupRegistry.generateNextConsumerId, topics, sessionTimeoutMs) - } else if (!groupRegistry.memberRegistries.contains(consumerId)) { - createNewConsumer(groupId, consumerId, topics, sessionTimeoutMs) + def joinGroup(groupId: String, + consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + partitionAssignmentStrategy: String, + responseCallback:(List[TopicAndPartition], String, Int, Short) => Unit) { + if (isLoadingGroupMetadata.get) { + responseCallback(Nil, consumerId, 0, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) + } + else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) { + responseCallback(Nil, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code) + } + else { + val groupRegistry = inReadLock(coordinatorLock) { + groupRegistries.get(groupId).orNull + } + if (groupRegistry == null) { + if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) { + responseCallback(Nil, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) + } + else { + val groupRegistry = addGroup(groupId, partitionAssignmentStrategy) + doJoinGroup(groupRegistry, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) + } + } + else { + doJoinGroup(groupRegistry, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) + } } - - // add a delayed join-group operation to the purgatory - // TODO - - // if the current group is under rebalance process, - // check if the delayed rebalance operation can be finished - // TODO - - // TODO -------------------------------------------------------------- - // TODO: this is just a stub for new consumer testing, - // TODO: needs to be replaced with the logic above - // TODO -------------------------------------------------------------- - // just return all the partitions of the subscribed topics - val partitionIdsPerTopic = 
ZkUtils.getPartitionsForTopics(zkClient, topics) - val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) => - partitionIds.map(partition => { - TopicAndPartition(topic, partition) - }) - }.toList - - responseCallback(partitions, 1 /* generation id */, Errors.NONE.code) - - info("Handled join-group from consumer " + consumerId + " to group " + groupId) } - /** - * Process a heartbeat request from a consumer - */ - def consumerHeartbeat(groupId: String, - consumerId: String, - generationId: Int, - responseCallback: Short => Unit) { - - // check that the group already exists - // TODO - - // check that the consumer has already registered for the group - // TODO - - // check if the consumer generation id is correct - // TODO - - // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket - // TODO - - // create the heartbeat response, if partition rebalance is triggered set the corresponding error code - // TODO - - info("Handled heartbeat of consumer " + consumerId + " from group " + groupId) - - // TODO -------------------------------------------------------------- - // TODO: this is just a stub for new consumer testing, - // TODO: needs to be replaced with the logic above - // TODO -------------------------------------------------------------- - // check if the consumer already exist, if yes return OK, - // otherwise return illegal generation error - if (consumerGroupRegistries.contains(groupId) - && consumerGroupRegistries(groupId).memberRegistries.contains(consumerId)) - responseCallback(Errors.NONE.code) - else - responseCallback(Errors.ILLEGAL_GENERATION.code) + private def doJoinGroup(groupRegistry: GroupRegistry, + consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + partitionAssignmentStrategy: String, + responseCallback:(List[TopicAndPartition], String, Int, Short) => Unit) { + groupRegistry synchronized { + if (partitionAssignmentStrategy != 
groupRegistry.partitionAssignmentStrategy) { + responseCallback(Nil, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code) + } + else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !groupRegistry.hasConsumer(consumerId)) { + responseCallback(Nil, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) + } + else { + // if the consumer id is unknown, register this consumer to the group + val consumerRegistry = if (consumerId == JoinGroupRequest.UNKNOWN_CONSUMER_ID) { + val generatedConsumerId = groupRegistry.generateNextConsumerId + val registry = addConsumer(generatedConsumerId, topics, sessionTimeoutMs, groupRegistry, partitionAssignmentStrategy) + if (groupRegistry.canRebalance) + prepareRebalance(groupRegistry) + registry + } + else groupRegistry.consumerRegistries(consumerId) + + consumerRegistry.awaitingRebalance = true + + def delayedJoinGroupCallback() { + consumerRegistry.awaitingRebalance = false + + makeHeartbeat(groupRegistry, consumerRegistry) + responseCallback(consumerRegistry.assignedTopicPartitions, consumerRegistry.consumerId, groupRegistry.generationId, Errors.NONE.code) + } + + val delayedJoinGroup = new DelayedJoinGroup(sessionTimeoutMs, groupRegistry, consumerRegistry, delayedJoinGroupCallback) + val consumerKey = ConsumerKey(groupRegistry.groupId, consumerRegistry.consumerId) + val consumerGroupKey = ConsumerGroupKey(groupRegistry.groupId) + joinGroupPurgatory.tryCompleteElseWatch(delayedJoinGroup, Seq(consumerKey)) + + if (groupRegistry.state == PrepareRebalance) + rebalancePurgatory.checkAndComplete(consumerGroupKey) + } + } } - /** - * Create a new consumer - */ - private def createNewConsumer(groupId: String, - consumerId: String, - topics: List[String], - sessionTimeoutMs: Int) { - debug("Registering consumer " + consumerId + " for group " + groupId) - - // create the new consumer registry entry - val consumerRegistry = new ConsumerRegistry(groupId, consumerId, topics, sessionTimeoutMs) - - 
consumerGroupRegistries(groupId).memberRegistries.put(consumerId, consumerRegistry) - - // check if the partition assignment strategy is consistent with the group - // TODO - - // add the group to the subscribed topics - // TODO - - // schedule heartbeat tasks for the consumer - // TODO - - // add the member registry entry to the group - // TODO - - // start preparing group partition rebalance - // TODO - - info("Registered consumer " + consumerId + " for group " + groupId) + def heartbeat(groupId: String, + consumerId: String, + generationId: Int, + responseCallback: Short => Unit) { + if (isLoadingGroupMetadata.get) { + responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) + } + else if (!isCoordinatorForGroup(groupId)) { + responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + } + else { + val groupRegistry = inReadLock(coordinatorLock) { + groupRegistries.get(groupId).orNull + } + if (groupRegistry == null) { + responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + } + else { + groupRegistry synchronized { + if (!groupRegistry.hasConsumer(consumerId)) { + responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + } + else if (generationId != groupRegistry.generationId) { + responseCallback(Errors.ILLEGAL_GENERATION.code) + } + else { + val consumerRegistry = groupRegistry.consumerRegistries(consumerId) + makeHeartbeat(groupRegistry, consumerRegistry) + responseCallback(Errors.NONE.code) + } + } + } + } } - /** - * Create a new consumer group in the registry - */ - private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) { - debug("Creating new group " + groupId) - - val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy) - - consumerGroupRegistries.put(groupId, groupRegistry) - - info("Created new group registry " + groupId) + private def makeHeartbeat(groupRegistry: GroupRegistry, consumerRegistry: ConsumerRegistry) { + consumerRegistry.latestHeartbeat = SystemTime.milliseconds + val consumerKey = 
ConsumerKey(consumerRegistry.groupId, consumerRegistry.consumerId) + heartbeatPurgatory.checkAndComplete(consumerKey) + val heartbeatDeadline = consumerRegistry.latestHeartbeat + consumerRegistry.sessionTimeoutMs + val delayedHeartbeat = new DelayedHeartbeat(groupRegistry, consumerRegistry, heartbeatDeadline, consumerRegistry.sessionTimeoutMs, onConsumerHeartbeatExpired) + heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey)) } - /** - * Callback invoked when a consumer's heartbeat has expired - */ - private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) { + private def addGroup(groupId: String, partitionAssignmentStrategy: String) = { + inWriteLock(coordinatorLock) { + groupRegistries.getOrElseUpdate(groupId, new GroupRegistry(groupId, partitionAssignmentStrategy)) + } + } - // if the consumer does not exist in group registry anymore, do nothing - // TODO + private def removeGroup(groupRegistry: GroupRegistry) { + groupRegistry.state = Dead + info("Group %s generation %s is dead".format(groupRegistry.groupId, groupRegistry.generationId)) + inWriteLock(coordinatorLock) { + groupRegistries.remove(groupRegistry.groupId) + } + } - // record heartbeat failure - // TODO + private def addConsumer(consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + groupRegistry: GroupRegistry, + partitionAssignmentStrategy: String) = { + val consumerRegistry = new ConsumerRegistry(consumerId, groupRegistry.groupId, topics, sessionTimeoutMs) + inWriteLock(coordinatorLock) { + topics.foreach { topic => + val groups = consumerGroupsPerTopic.getOrElse(topic, Set.empty) + consumerGroupsPerTopic.put(topic, groups + groupRegistry.groupId) + registerTopicPartitionChangeListener(topic) + } + } + groupRegistry.consumerRegistries.put(consumerId, consumerRegistry) + consumerRegistry + } - // if the maximum failures has been reached, mark consumer as failed - // TODO + private def removeConsumer(groupRegistry: GroupRegistry, 
consumerRegistry: ConsumerRegistry) { + trace("Consumer %s in group %s has failed".format(consumerRegistry.consumerId, groupRegistry.groupId)) + groupRegistry.consumerRegistries.remove(consumerRegistry.consumerId) + val remainingTopicsForGroup = groupRegistry.consumerRegistries.values.flatMap(_.topics).toSet + inWriteLock(coordinatorLock) { + consumerRegistry.topics.foreach { topic => + // nobody else in the group was interested in the topic, so dissociate the group from the topic + if (!remainingTopicsForGroup.contains(topic)) { + val remainingGroupsPerTopic = consumerGroupsPerTopic(topic) - groupRegistry.groupId + // no other group cares about the topic, so erase all state associated with the topic + if (remainingGroupsPerTopic.isEmpty) { + consumerGroupsPerTopic.remove(topic) + topicPartitionCounts.remove(topic) + deregisterTopicPartitionChangeListener(topic) + } + else { + consumerGroupsPerTopic.put(topic, remainingGroupsPerTopic) + } + } + } + } } - /** - * Callback invoked when a consumer is marked as failed - */ - private def onConsumerFailure(groupId: String, consumerId: String) { + private def prepareRebalance(groupRegistry: GroupRegistry) { + groupRegistry.state = PrepareRebalance + groupRegistry.generationId += 1 + info("Preparing to rebalance group %s generation %s".format(groupRegistry.groupId, groupRegistry.generationId)) - // remove the consumer from its group registry metadata - // TODO + val rebalanceTimeout = groupRegistry.consumerRegistries.values.foldLeft(0) {(timeout, consumerRegistry) => + timeout.max(consumerRegistry.sessionTimeoutMs) + } + val delayedRebalance = new DelayedRebalance(rebalanceTimeout, groupRegistry, rebalance, onRebalanceFailure) + val consumerGroupKey = ConsumerGroupKey(groupRegistry.groupId) + rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey)) + } - // cut the socket connection to the consumer - // TODO: howto ?? 
+ private def rebalance(groupRegistry: GroupRegistry) { + groupRegistry synchronized { + groupRegistry.state = Rebalancing + info("Rebalancing group %s generation %s".format(groupRegistry.groupId, groupRegistry.generationId)) - // if the group has no consumer members any more, remove the group - // otherwise start preparing group partition rebalance - // TODO + val assignedPartitionsPerConsumer = reassignPartitions(groupRegistry) + trace("Rebalance for group %s generation %s has assigned partitions: %s" + .format(groupRegistry.groupId, groupRegistry.generationId, assignedPartitionsPerConsumer)) + groupRegistry.state = Stable + info("Stabilized group %s generation %s".format(groupRegistry.groupId, groupRegistry.generationId)) + groupRegistry.consumerRegistries.values.foreach { consumerRegistry => + val consumerKey = ConsumerKey(groupRegistry.groupId, consumerRegistry.consumerId) + joinGroupPurgatory.checkAndComplete(consumerKey) + } + } } - /** - * Prepare partition rebalance for the group - */ - private def prepareRebalance(groupId: String) { - - // try to change the group state to PrepareRebalance + private def onRebalanceFailure(groupRegistry: GroupRegistry, failedConsumers: List[ConsumerRegistry]) { + groupRegistry synchronized { + failedConsumers.foreach { failedConsumer => + removeConsumer(groupRegistry, failedConsumer) + // TODO: cut the socket connection to the consumer + } - // add a task to the delayed rebalance purgatory + if (groupRegistry.consumerRegistries.isEmpty) + removeGroup(groupRegistry) + } + } - // TODO + private def onConsumerHeartbeatExpired(groupRegistry: GroupRegistry, consumerRegistry: ConsumerRegistry) { + groupRegistry synchronized { + consumerRegistry.numExpiredHeartbeat += 1 + // if the maximum failures has been reached, mark consumer as failed + // TODO: figure out a value for maximum failures + if (consumerRegistry.numExpiredHeartbeat > 0) + onConsumerFailure(groupRegistry, consumerRegistry) + } } - /** - * Start partition rebalance 
for the group - */ - private def startRebalance(groupId: String) { + private def onConsumerFailure(groupRegistry: GroupRegistry, consumerRegistry: ConsumerRegistry) { + groupRegistry synchronized { + removeConsumer(groupRegistry, consumerRegistry) - // try to change the group state to UnderRebalance + // TODO: cut the socket connection to the consumer - // compute new assignment based on the strategy + if (groupRegistry.canRebalance) + prepareRebalance(groupRegistry) + } + } - // send back the join-group response + private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) - // TODO + private def reassignPartitions(groupRegistry: GroupRegistry) = { + val assignor = PartitionAssignor.createInstance(groupRegistry.partitionAssignmentStrategy) + val topicsPerConsumer = groupRegistry.consumerRegistries.mapValues { consumerRegistry => + consumerRegistry.topics + }.toMap + val partitionsPerTopic = inReadLock(coordinatorLock) { + topicPartitionCounts.toMap + } + val assignedPartitionsPerConsumer = assignor.assign(topicsPerConsumer, partitionsPerTopic) + assignedPartitionsPerConsumer.foreach { case (consumerId, partitions) => + groupRegistry.consumerRegistries(consumerId).assignedTopicPartitions = partitions + } + assignedPartitionsPerConsumer } - /** - * Fail current partition rebalance for the group - */ - - /** - * Register ZK listeners for topic-partition changes - */ - private def registerTopicChangeListener(topic: String) = { - if (!topicPartitionChangeListeners.contains(topic)) { - val listener = new TopicPartitionChangeListener(config) - topicPartitionChangeListeners.put(topic, listener) - ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic)) - zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener) + private def registerTopicPartitionChangeListener(topic: String) { + inWriteLock(coordinatorLock) { + if (!topicPartitionChangeListeners.contains(topic)) { + val listener = 
new TopicPartitionChangeListener(config) + topicPartitionChangeListeners.put(topic, listener) + zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + } } } - /** - * De-register ZK listeners for topic-partition changes - */ - private def deregisterTopicChangeListener(topic: String) = { - val listener = topicPartitionChangeListeners.get(topic).get - zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener) - topicPartitionChangeListeners.remove(topic) + private def deregisterTopicPartitionChangeListener(topic: String) { + inWriteLock(coordinatorLock) { + val listener = topicPartitionChangeListeners(topic) + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + topicPartitionChangeListeners.remove(topic) + } } /** * Zookeeper listener that catch topic-partition changes */ - class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging { - - this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: " - - /** - * Try to trigger a rebalance for each group subscribed in the changed topic - * - * @throws Exception - * On any error. 
- */ - def handleChildChange(parentPath: String , curChilds: java.util.List[String]) { - debug("Fired for path %s with children %s".format(parentPath, curChilds)) + class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkDataListener with Logging { + this.logIdent = "[TopicPartitionChangeListener on Coordinator " + config.brokerId + "]: " + + override def handleDataChange(dataPath: String, data: Object) { + info("Handling data change for path: %s data: %s".format(dataPath, data)) + val topic = topicFromDataPath(dataPath) + val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + val numPartitions = topicData(topic).size + + val groupsToRebalance = inWriteLock(coordinatorLock) { + topicPartitionCounts.put(topic, numPartitions) + consumerGroupsPerTopic(topic).map(groupId => groupRegistries(groupId)) + } - // get the topic - val topic = parentPath.split("/").last + groupsToRebalance.foreach { groupRegistry => + groupRegistry synchronized { + /** + * This condition exists because a consumer can leave between the coordinatorLock above and the prepareRebalance below. + * We can't wrap this loop in the coordinatorLock because we must preserve the ordering of nested locks to prevent deadlock. 
+ */ + if (groupRegistry.canRebalance) + prepareRebalance(groupRegistry) + } + } + } - // get groups that subscribed to this topic - val groups = consumerGroupsPerTopic.get(topic).get + override def handleDataDeleted(dataPath: String) { + info("Handling data delete for path: %s".format(dataPath)) + val topic = topicFromDataPath(dataPath) + val groupsToRebalance = inWriteLock(coordinatorLock) { + topicPartitionCounts.put(topic, 0) + consumerGroupsPerTopic(topic).map(groupId => groupRegistries(groupId)) + } - for (groupId <- groups) { - prepareRebalance(groupId) + groupsToRebalance.foreach { groupRegistry => + groupRegistry synchronized { + /** + * This condition exists because a consumer can leave between the coordinatorLock above and the prepareRebalance below. + * We can't wrap this loop in the coordinatorLock because we must preserve the ordering of nested locks to prevent deadlock. + */ + if (groupRegistry.canRebalance) + prepareRebalance(groupRegistry) + } } } - } -} - + private def topicFromDataPath(dataPath: String) = { + val nodes = dataPath.split("/") + nodes.last + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala index 2f57970..4a9d070 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala @@ -17,36 +17,29 @@ package kafka.coordinator -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import java.util.HashMap +import kafka.common.TopicAndPartition +import kafka.utils.nonthreadsafe /** - * Consumer registry metadata contains the following metadata: + * ConsumerRegistry contains the following metadata: * * Heartbeat metadata: * 1. negotiated heartbeat session timeout. * 2. recorded number of timed-out heartbeats. - * 3. associated heartbeat bucket in the purgatory. + * 3. 
timestamp of the latest heartbeat * * Subscription metadata: * 1. subscribed topic list * 2. assigned partitions for the subscribed topics. */ -class ConsumerRegistry(val groupId: String, - val consumerId: String, +@nonthreadsafe +class ConsumerRegistry(val consumerId: String, + val groupId: String, val topics: List[String], val sessionTimeoutMs: Int) { - /* number of expired heartbeat recorded */ - val numExpiredHeartbeat = new AtomicInteger(0) - - /* flag indicating if join group request is received */ - val joinGroupReceived = new AtomicBoolean(false) - - /* assigned partitions per subscribed topic */ - val assignedPartitions = new HashMap[String, List[Int]] - - /* associated heartbeat bucket */ - var currentHeartbeatBucket = null - + var numExpiredHeartbeat = 0 + var awaitingRebalance = false + var assignedTopicPartitions = List[TopicAndPartition]() + var latestHeartbeat: Long = -1 } diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala index 6a6bc7b..2b1b07c 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -20,29 +20,33 @@ package kafka.coordinator import kafka.server.DelayedOperation /** - * Delayed heartbeat operations that are added to the purgatory for session-timeout checking - * - * These operations will always be expired. Once it has expired, all its - * currently contained consumers are marked as heartbeat timed out. + * Delayed heartbeat operations that are added to the purgatory for session-timeout checking. + * Heartbeats are paused during rebalance. 
/**
 * Delayed heartbeat operation added to the purgatory for session-timeout checking.
 *
 * A consumer is considered alive when either it is awaiting a rebalance
 * (heartbeats are paused during rebalance) or its latest heartbeat arrived
 * within the current session window. If neither holds when this operation
 * expires, `expireCallback` is invoked for the consumer.
 *
 * All checks run while holding the group registry's monitor, matching the
 * locking convention of the other delayed group operations.
 * NOTE(review): assumes `latestHeartbeat` and `heartbeatDeadline` come from
 * the same clock source — confirm at the call site.
 */
class DelayedHeartbeat(groupRegistry: GroupRegistry,
                       consumerRegistry: ConsumerRegistry,
                       heartbeatDeadline: Long,
                       sessionTimeout: Long,
                       expireCallback: (GroupRegistry, ConsumerRegistry) => Unit)
  extends DelayedOperation(sessionTimeout) {

  /** Complete early once the consumer has proven to be alive. */
  override def tryComplete(): Boolean = {
    groupRegistry synchronized {
      if (keepAlive)
        forceComplete()
      else
        false
    }
  }

  /** On timeout, expire the consumer unless it is alive after all. */
  override def onExpiration() {
    groupRegistry synchronized {
      if (!keepAlive)
        expireCallback(groupRegistry, consumerRegistry)
    }
  }

  /** Nothing to do on completion; all handling is in tryComplete/onExpiration. */
  override def onComplete() {}

  // Pure 0-arity check, so declared without parentheses per Scala convention
  // (the original declared `keepAlive()` yet called it paren-less).
  private def keepAlive: Boolean =
    consumerRegistry.awaitingRebalance || consumerRegistry.latestHeartbeat > heartbeatDeadline - sessionTimeout
}
/**
 * Delayed join-group operation: the response to a joining consumer is held
 * back until the group settles into the Stable state, at which point the
 * supplied callback sends the join-group response with the computed
 * partition assignment.
 */
class DelayedJoinGroup(sessionTimeout: Long,
                       groupRegistry: GroupRegistry,
                       consumerRegistry: ConsumerRegistry,
                       responseCallback: () => Unit)
  extends DelayedOperation(sessionTimeout) {

  /** Completes only once the group has reached the Stable state. */
  override def tryComplete(): Boolean =
    groupRegistry synchronized {
      if (groupRegistry.state == Stable) forceComplete() else false
    }

  /** No special expiration handling. */
  override def onExpiration() {}

  /** Deliver the join-group response while holding the group lock. */
  override def onComplete() {
    groupRegistry synchronized {
      responseCallback()
    }
  }
}
/**
 * Delayed rebalance operation for a consumer group.
 *
 * Completes once every registered consumer has re-sent its join-group request
 * (i.e. is awaiting rebalance). On expiration, consumers that never re-joined
 * (or an entirely empty group) are reported through `failureCallback`; the
 * group is then rebalanced via `rebalanceCallback` unless failure handling
 * left it Dead.
 */
class DelayedRebalance(sessionTimeout: Long,
                       groupRegistry: GroupRegistry,
                       rebalanceCallback: GroupRegistry => Unit,
                       failureCallback: (GroupRegistry, List[ConsumerRegistry]) => Unit)
  extends DelayedOperation(sessionTimeout) {

  /** Under the group lock, check whether all known consumers have re-joined. */
  override def tryComplete(): Boolean =
    groupRegistry synchronized {
      if (groupRegistry.consumerRegistries.values.forall(_.awaitingRebalance))
        forceComplete()
      else
        false
    }

  override def onExpiration() {}

  /**
   * Report stragglers as failed, then rebalance the remaining members.
   * The Dead check deliberately runs after the failure callback, since that
   * callback may transition the group to Dead.
   */
  override def onComplete() {
    groupRegistry synchronized {
      val failedConsumers = groupRegistry.consumerRegistries.values.filterNot(_.awaitingRebalance).toList
      if (groupRegistry.consumerRegistries.isEmpty || failedConsumers.nonEmpty)
        failureCallback(groupRegistry, failedConsumers)
      if (groupRegistry.state != Dead)
        rebalanceCallback(groupRegistry)
    }
  }
}
GroupStates { val state: Byte = 1 } +case object PrepareRebalance extends GroupState { val state: Byte = 1 } /** - * Consumer group is under rebalance + * Consumer group is rebalancing * * action: send the join-group response with new assignment - * transition: all consumers has heartbeat with the new generation id => Fetching + * transition: all consumers has heartbeat with the new generation id => Stable * new consumer join-group received => PrepareRebalance */ -case object UnderRebalance extends GroupStates { val state: Byte = 2 } +case object Rebalancing extends GroupState { val state: Byte = 2 } /** - * Consumer group is fetching data + * Consumer group is stable * * action: respond consumer heartbeat normally * transition: consumer failure detected via heartbeat => PrepareRebalance * consumer join-group received => PrepareRebalance * zookeeper watcher fired => PrepareRebalance */ -case object Fetching extends GroupStates { val state: Byte = 3 } +case object Stable extends GroupState { val state: Byte = 3 } + +/** + * Consumer group has no more members + * action: none + * transition: none + */ +case object Dead extends GroupState { val state: Byte = 4 } -case class GroupState() { - @volatile var currentState: Byte = PrepareRebalance.state -} -/* Group registry contains the following metadata of a registered group in the coordinator: +/** + * GroupRegistry contains the following metadata: * * Membership metadata: - * 1. List of consumers registered in this group + * 1. Consumers registered in this group * 2. Partition assignment strategy for this group * * State metadata: * 1. Current group state * 2. 
/**
 * Per-group metadata tracked by the coordinator.
 *
 * Membership metadata:
 *  1. the consumers registered in this group
 *  2. the partition assignment strategy this group uses
 *
 * State metadata:
 *  1. the current group state
 *  2. the current group generation id
 *
 * Marked @nonthreadsafe: callers synchronize on this registry externally.
 */
@nonthreadsafe
class GroupRegistry(val groupId: String,
                    val partitionAssignmentStrategy: String) {

  /** Registered consumers, keyed by consumer id. */
  val consumerRegistries = mutable.HashMap.empty[String, ConsumerRegistry]

  /** Current group state; a fresh group starts out Stable. */
  var state: GroupState = Stable

  /** Current group generation id. */
  var generationId: Int = 0

  // TODO: decide if ids should be predictable or random
  def generateNextConsumerId: String = UUID.randomUUID().toString

  /** True if a consumer with the given id is registered in this group. */
  def hasConsumer(consumerId: String): Boolean = consumerRegistries.contains(consumerId)

  /** A rebalance may start unless one is already preparing or the group is dead. */
  def canRebalance: Boolean = state != Dead && state != PrepareRebalance
}
- */ - -package kafka.coordinator - -import scala.collection.mutable - -/** - * A bucket of consumers that are scheduled for heartbeat expiration. - * - * The motivation behind this is to avoid expensive fine-grained per-consumer - * heartbeat expiration but use coarsen-grained methods that group consumers - * with similar deadline together. This will result in some consumers not - * being expired for heartbeats in time but is tolerable. - */ -class HeartbeatBucket(val startMs: Long, endMs: Long) { - - /* The list of consumers that are contained in this bucket */ - val consumerRegistryList = new mutable.HashSet[ConsumerRegistry] - - // TODO -} diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala new file mode 100644 index 0000000..ec1c4c1 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.common.TopicAndPartition + +trait PartitionAssignor { + /** + * Assigns partitions to consumers in a group. + * @return A mapping from consumer to assigned partitions. 
object PartitionAssignor {
  /** Names of the supported assignment strategies. */
  val strategies = Set("range", "roundrobin")

  // NOTE(review): any unrecognized strategy silently falls back to range;
  // `strategies` is not used for validation here — confirm that is intended.
  def createInstance(strategy: String) = strategy match {
    case "roundrobin" => new RoundRobinAssignor()
    case _ => new RangeAssignor()
  }
}

/**
 * Round-robin assignment: all partitions of all subscribed topics are laid out
 * in (topic, partition) order and dealt to consumers (sorted by id) in turn.
 *
 * Previously a stub returning Map.empty. Like Kafka's classic roundrobin
 * strategy, this assumes every consumer subscribes to the same topic set;
 * heterogeneous subscriptions are a TODO.
 */
class RoundRobinAssignor extends PartitionAssignor {
  override def assign(topicsPerConsumer: Map[String, List[String]],
                      partitionsPerTopic: Map[String, Int]): Map[String, List[TopicAndPartition]] = {
    val consumers = topicsPerConsumer.keys.toList.sorted
    if (consumers.isEmpty)
      Map.empty
    else {
      // Deterministic layout: topics sorted, partitions ascending within a topic.
      val topicPartitions = topicsPerConsumer.values.flatten.toList.distinct.sorted.flatMap { topic =>
        (0 until partitionsPerTopic.getOrElse(topic, 0)).map(p => TopicAndPartition(topic, p))
      }
      val dealt = topicPartitions.zipWithIndex.groupBy { case (_, i) => consumers(i % consumers.size) }
      // Every consumer appears in the result, possibly with an empty list.
      consumers.map(c => c -> dealt.getOrElse(c, Nil).map(_._1)).toMap
    }
  }
}

/**
 * Range assignment, computed per topic: the consumers subscribed to a topic are
 * sorted by consumer id and each receives a contiguous block of its partitions;
 * when partitions do not divide evenly, the first (numPartitions % numConsumers)
 * consumers receive one extra partition.
 *
 * Previously a stub returning Map.empty. Every consumer appears in the result,
 * possibly with an empty partition list.
 */
class RangeAssignor extends PartitionAssignor {
  override def assign(topicsPerConsumer: Map[String, List[String]],
                      partitionsPerTopic: Map[String, Int]): Map[String, List[TopicAndPartition]] = {
    val blocks: List[(String, List[TopicAndPartition])] =
      topicsPerConsumer.values.flatten.toList.distinct.sorted.flatMap { topic =>
        val subscribers = topicsPerConsumer.collect { case (c, ts) if ts.contains(topic) => c }.toList.sorted
        val numPartitions = partitionsPerTopic.getOrElse(topic, 0)
        if (subscribers.isEmpty || numPartitions <= 0)
          Nil
        else {
          val base = numPartitions / subscribers.size
          val extra = numPartitions % subscribers.size
          subscribers.zipWithIndex.map { case (consumer, i) =>
            // Contiguous block: earlier consumers absorb the remainder partitions.
            val count = base + (if (i < extra) 1 else 0)
            val start = i * base + math.min(i, extra)
            consumer -> (start until start + count).map(p => TopicAndPartition(topic, p)).toList
          }
        }
      }
    val assigned = blocks.groupBy(_._1).map { case (c, perTopic) => c -> perTopic.flatMap(_._2) }
    topicsPerConsumer.keys.map(c => c -> assigned.getOrElse(c, Nil)).toMap
  }
}
(RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) - ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) + ++ List(consumerFetchMetricName, followFetchMetricName, joinGroupMetricName, heartbeatMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) } class RequestMetrics(name: String) extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala index b673e43..c122bde 100644 --- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -38,12 +38,6 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del override def keyLabel = "%s-%d".format(topic, partition) } -/* used by bucketized delayed-heartbeat operations */ -case class TTimeMsKey(time: Long) extends DelayedOperationKey { - - override def keyLabel = "%d".format(time) -} - /* used by delayed-join-group operations */ case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b4004aa..f30b559 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -499,14 +499,16 @@ class KafkaApis(val requestChannel: RequestChannel, val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a join-group response - def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { + def sendResponseCallback(partitions: List[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, 
// Callback for sending a heartbeat response back to the client.
// Nested inside handleHeartbeatRequest; `request`, `respHeader`,
// `requestChannel` and `trace` come from the enclosing scope.
def sendResponseCallback(errorCode: Short) {
  val responseBody = new HeartbeatResponse(errorCode)
  // Fixed copy-paste bug: this trace previously said "Sending join group
  // response" although it logs the heartbeat response.
  trace("Sending heartbeat response %s for correlation id %d to client %s."
    .format(responseBody, request.header.correlationId, request.header.clientId))
  requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody)))
}