From 70dd7e0e70ddb7a89da444ba124926d592c52e6f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Jan 2015 10:39:40 -0800 Subject: [PATCH 1/4] dummy --- .../main/scala/kafka/server/DelayedOperationKey.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala index fb7e9ed..93e1b87 100644 --- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -30,9 +30,28 @@ object DelayedOperationKey { val globalLabel = "All" } +/* used by delayed-produce and delayed-fetch operations */ case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey { def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) override def keyLabel = "%s-%d".format(topic, partition) } + +/* used by bucketized delayed-heartbeat operations */ +case class TimeMsKey(time: Long) extends DelayedOperationKey { + + override def keyLabel = "%d".format(time) +} + +/* used by delayed-join-group operations */ +case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey { + + override def keyLabel = "%s-%s".format(groupId, consumerId) +} + +/* used by delayed-rebalance operations */ +case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey { + + override def keyLabel = "%s".format(groupId) +} -- 1.7.12.4 From a0baf199b7b830c1d6397163a36e90a2b5e4a32d Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Jan 2015 12:01:45 -0800 Subject: [PATCH 2/4] dummy 2 --- core/src/main/scala/kafka/server/KafkaApis.scala | 67 ++++++++++++++-------- core/src/main/scala/kafka/server/KafkaServer.scala | 46 ++++++++++----- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala index f2b027b..8426bbd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -20,21 +20,18 @@ package kafka.server import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.JoinGroupResponse import org.apache.kafka.common.requests.HeartbeatResponse -import org.apache.kafka.common.requests.ResponseHeader -import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.TopicPartition import kafka.api._ +import kafka.admin.AdminUtils import kafka.common._ +import kafka.controller.KafkaController +import kafka.coordinator.ConsumerCoordinator import kafka.log._ import kafka.network._ -import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response -import kafka.controller.KafkaController import kafka.utils.{SystemTime, Logging} -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import scala.collection._ import org.I0Itec.zkclient.ZkClient @@ -45,14 +42,14 @@ import org.I0Itec.zkclient.ZkClient class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val offsetManager: OffsetManager, + val coordinator: ConsumerCoordinator, + val controller: KafkaController, val zkClient: ZkClient, val brokerId: Int, - val config: KafkaConfig, - val controller: KafkaController) extends Logging { + val config: KafkaConfig) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) val metadataCache = new MetadataCache - private var consumerGroupGenerationId = 0 /** * Top-level method that handles all requests and multiplexes to the right api @@ -137,7 +134,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - // the callback for sending the response + // the callback for 
sending offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { commitStatus.foreach { case (topicAndPartition, errorCode) => // we only print warnings for known errors here; only replica manager could see an unknown @@ -169,7 +166,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] - // the callback for sending the response + // the callback for sending produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { var errorInResponse = false responseStatus.foreach { case (topicAndPartition, status) => @@ -224,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - // the callback for sending the response + // the callback for sending fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { responsePartitionData.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause @@ -456,20 +453,42 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] - val topics = joinGroupReq.body.topics().toSet - val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic)) - val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer - this.consumerGroupGenerationId += 1 - val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, partitionList) 
- val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response)) - requestChannel.sendResponse(new RequestChannel.Response(request, send)) + + val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + + // the callback for sending join-group response + def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { + val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) + val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + // let the coordinator handle join-group + coordinator.handleJoinGroup( + joinGroupRequest.body.groupId(), + joinGroupRequest.body.consumerId(), + joinGroupRequest.body.topics(), + joinGroupRequest.body.sessionTimeout(), + joinGroupRequest.body.strategy(), + sendResponseCallback) } def handleHeartbeatRequest(request: RequestChannel.Request) { - val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] - val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code))) - requestChannel.sendResponse(new RequestChannel.Response(request, send)) + val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + + // the callback for sending heartbeat response + def sendResponseCallback(errorCode: Short) { + val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + // let the coordinator handle heartbeat + coordinator.handleHeartbeat( + heartbeatRequest.body.groupId(), +
heartbeatRequest.body.consumerId(), + heartbeatRequest.body.groupGenerationId(), + sendResponseCallback) } def close() { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 89200da..046f5d3 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -34,30 +34,41 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker import kafka.network.{Receive, BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +import kafka.coordinator.ConsumerCoordinator /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { - private var isShuttingDown = new AtomicBoolean(false) - private var shutdownLatch = new CountDownLatch(1) - private var startupComplete = new AtomicBoolean(false) - private var brokerId: Int = -1 + private val startupComplete = new AtomicBoolean(false) + private val isShuttingDown = new AtomicBoolean(false) + private val shutdownLatch = new CountDownLatch(1) val brokerState: BrokerState = new BrokerState - val correlationId: AtomicInteger = new AtomicInteger(0) + + var apis: KafkaApis = null var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null + var logManager: LogManager = null + var offsetManager: OffsetManager = null - var kafkaHealthcheck: KafkaHealthcheck = null - var topicConfigManager: TopicConfigManager = null + var replicaManager: ReplicaManager = null - var apis: KafkaApis = null + + var topicConfigManager: TopicConfigManager = null + + var consumerCoordinator: ConsumerCoordinator = null + var kafkaController: KafkaController = null + val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) + + var 
kafkaHealthcheck: KafkaHealthcheck = null + var zkClient: ZkClient = null + val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap @@ -76,8 +87,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg try { info("starting") brokerState.newState(Starting) - isShuttingDown = new AtomicBoolean(false) - shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() @@ -106,24 +115,29 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.maxConnectionsPerIpOverrides) socketServer.startup() + /* start replica manager */ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) + replicaManager.startup() /* start offset manager */ offsetManager = createOffsetManager() + /* start kafka controller */ kafkaController = new KafkaController(config, zkClient, brokerState) + kafkaController.startup() + + /* start kafka coordinator */ + consumerCoordinator = new ConsumerCoordinator(config, zkClient) + consumerCoordinator.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() - replicaManager.startup() - - kafkaController.startup() - + /* start topic config manager */ topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() @@ -290,6 +304,8 @@ class 
KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(replicaManager.shutdown()) if(logManager != null) Utils.swallow(logManager.shutdown()) + if(consumerCoordinator != null) + Utils.swallow(consumerCoordinator.shutdown()) if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) if(zkClient != null) -- 1.7.12.4 From 5b6fa361d9e7b2146fb9b91ba0580b8d777281b8 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 31 Jan 2015 17:35:32 -0800 Subject: [PATCH 3/4] minor --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8426bbd..0b78838 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -468,7 +468,7 @@ class KafkaApis(val requestChannel: RequestChannel, coordinator.handleJoinGroup( joinGroupRequest.body.groupId(), joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics(), + joinGroupRequest.body.topics().toList, joinGroupRequest.body.sessionTimeout(), joinGroupRequest.body.strategy(), sendResponseCallback) -- 1.7.12.4 From e6aadeb724091c585f2476adcd10ba66c9fcb294 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 31 Jan 2015 17:37:41 -0800 Subject: [PATCH 4/4] three new file --- .../kafka/coordinator/ConsumerCoordinator.scala | 395 +++++++++++++++++++++ .../scala/kafka/coordinator/GroupRegistry.scala | 117 ++++++ .../scala/kafka/coordinator/HeartbeatBucket.scala | 36 ++ 3 files changed, 548 insertions(+) create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala create mode 100644 core/src/main/scala/kafka/coordinator/GroupRegistry.scala create mode 100644 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala new file mode 100644 index 0000000..cdf1ae4 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator + +import org.apache.kafka.common.protocol.Errors + +import kafka.server._ +import kafka.utils._ + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.HashMap +import java.util.Set + +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import kafka.common.TopicAndPartition + + +/** + * Kafka coordinator handles consumer group and consumer offset management. + * + * Each Kafka server instantiates a coordinator, which is responsible for a set of + * consumer groups; the consumer groups are assigned to coordinators based on their + * group names. 
+ */ +class ConsumerCoordinator(val config: KafkaConfig, + val zkClient: ZkClient) extends Logging { + + this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: " + + /* the consumer registry synchronization lock */ + private val consumerRegistryLock = new Object + + /* zookeeper listener for topic-partition changes */ + private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener] + + /* the consumer group registry cache */ + private val consumerGroupRegistries = new HashMap[String, GroupRegistry] + + /* the list of subscribed groups per topic */ + private val consumerGroupsPerTopic = new HashMap[String, List[String]] + + /* the delayed operation purgatory for heartbeat-based failure detection */ + private val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId) + + /* the delayed operation purgatory for preparing rebalance process */ + private val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId) + + /* the delayed operation purgatory for handling join-group requests */ + private val joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId) + + /* latest consumer heartbeat bucket's end timestamp in milliseconds */ + private var latestHeartbeatBucketEndMs = SystemTime.milliseconds + + /** + * Upon starting up, do the following: + * + * 1. Initialize consumer heartbeat scheduler + * 2. Initialize group partition rebalance processor + * 3. Register Zookeeper listener for topic-partition changes + */ + def startup() { + + // TODO + } + + /** + * Upon shutting down, do the following (ordering of actions should be reverse of the start-up process): + * + * 1. De-register Zookeeper listeners for topic-partition changes + * 2. If group partition rebalance processor exists, shut it down + * 3. 
If consumer heartbeat scheduler exists, shut it down + */ + def shutdown() { + + // TODO + } + + /** + * Handle JoinGroupRequest + */ + def handleJoinGroup(groupId: String, + consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + partitionAssignmentStrategy: String, + responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) { + + // if the group does not exist yet, create one + if (!consumerGroupRegistries.containsKey(groupId)) + handleNewGroup(groupId, partitionAssignmentStrategy) + + // if the consumer id is unknown or it does exists in + // the group yet, register this consumer to the group + // TODO + + // if the current group is under rebalance process, + // indicate the rebalance processor of the received join request + // TODO + + // just return all the partitions of the subscribed topics + val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics) + val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) => + partitionIds.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toList + + responseCallback(partitions, 1 /* generation id */, Errors.NONE.code) + + info("Handled join-group from consumer " + consumerId + " to group " + groupId) + } + + /** + * Handle HeartbeatRequest + */ + def handleHeartbeat(groupId: String, + consumerId: String, + generationId: Int, + responseCallback: Short => Unit) { + + // check that the group already exists + // TODO + + // check that the consumer has already registered for the group + // TODO + + // check if the consumer generation id is correct + // TODO + + // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket + // TODO + + // create the heartbeat response, if partition rebalance is triggered set the corresponding error code + // TODO + + info("Handled heartbeat of consumer " + consumerId + " from group " + groupId) + + responseCallback(Errors.NONE.code) + } + + /** + * Handle new consumer registration to 
the group + */ + private def handleNewConsumer(consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + groupRegistry: GroupRegistry) { + debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId) + + // create the new consumer registry entry + // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase + + // Check if the partition assignment strategy is consistent with the group + // TODO + + // Add the group to the subscribed topics + // TODO + + // Schedule heartbeat tasks for the consumer + // TODO + + // Add the member registry entry to the group + // TODO + + // Trigger the partition rebalance process + // TODO + + info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId) + } + + /** + * Handle new group creation + */ + private def handleNewGroup(groupId: String, partitionAssignmentStrategy: String) { + debug("Creating new group " + groupId) + + consumerRegistryLock synchronized { + consumerGroupRegistries.put(groupId, new GroupRegistry(groupId, partitionAssignmentStrategy)) + } + + info("Created new group registry " + groupId) + } + + /** + * Handle consumer heartbeat expiration + */ + private def handleConsumerHeartbeatExpired(groupId: String, consumerId: String) { + + // If the consumer does not exist in group registry anymore, do nothing + // TODO + + // Record heartbeat failure + // TODO + + // If the maximum failures has been reached, mark consumer as failed + // TODO + } + + /** + * Handle consumer failure + */ + private def handleConsumerFailure(groupId: String, consumerId: String) { + + // Stop scheduling heartbeat tasks for this consumer + // TODO + + // Remove the consumer from its group registry info + // TODO + + // Cut the socket connection tothe consumer + // TODO: howto ?? 
+ + // If the group has no consumer members any more, remove the group + // TODO + // Otherwise trigger a rebalance process if it is not under rebalance yet + // TODO + + } + + /** + * Prepare partition rebalance for the group + */ + private def handlePrepareRebalance(groupId: String) { + + // try to change the group state to PrepareRebalance + + // TODO + } + + /** + * Start partition rebalance for the group + */ + private def handleStartRebalance(groupId: String) { + + // try to change the group state to UnderRebalance + + // compute new assignment based on the strategy + + // send back the join-group response + + // TODO + } + + /** + * Fail current partition rebalance for the group + */ + + /** + * Register ZK listeners for topic-partition changes + */ + private def registerTopicChangeListener(topic: String) = { + if (!topicPartitionChangeListeners.containsKey(topic)) { + val listener = new TopicPartitionChangeListener(config) + topicPartitionChangeListeners.put(topic, listener) + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic)) + zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener) + } + } + + /** + * De-register ZK listeners for topic-partition changes + */ + private def deregisterTopicChangeListener(topic: String) = { + val listener = Utils.notNull(topicPartitionChangeListeners.get(topic)) + zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener) + topicPartitionChangeListeners.remove(topic) + } + + /** + * Delayed heartbeat operations that are added to the purgatory for session-timeout checking + * + * These operations will always be expired. Once it has expired, all its + * currently contained consumers are marked as heartbeat timed out. 
+ */ + class DelayedHeartbeat(val bucket: HeartbeatBucket, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + /* this function should never be called */ + override def tryComplete(): Boolean = { + + throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket") + } + + /* mark all consumers within the heartbeat as heartbeat timed out */ + override def onComplete() { + for (registry <- bucket.consumerRegistryList) + handleConsumerHeartbeatExpired(registry.groupRegistry.groupId, registry.consumerId) + } + } + + /** + * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance + * + * Whenever a join-group request is received, check if all known consumers have requested + * to re-join the group; if yes, complete this operation to proceed rebalance. + * + * When the operation has expired, any known consumers that have not requested to re-join + * the group are marked as failed, and complete this operation to proceed rebalance with + * the rest of the group. 
+ */ + class DelayedRebalance(val groupRegistry: GroupRegistry, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + val allConsumersJoinedGroup = new AtomicBoolean(false) + + /* check if all known consumers have requested to re-join group */ + override def tryComplete(): Boolean = { + allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft + (true) ((agg, cur) => agg && cur.joinGroupReceived.get())) + + if (allConsumersJoinedGroup.get()) + forceComplete() + else + false + } + + /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */ + override def onComplete() { + groupRegistry.memberRegistries.values.foreach(consumerRegistry => + if (!consumerRegistry.joinGroupReceived.get()) + handleConsumerFailure(groupRegistry.groupId, consumerRegistry.consumerId) + ) + + handleStartRebalance(groupRegistry.groupId) + } + } + + /** + * Delayed join-group operations that are kept in the purgatory before the partition assignment completed + * + * These operation should never expire; when the rebalance has completed, all consumer's + * join-group operations will be completed by sending back the response with the + * calculated partition assignment. 
+ */ + class DelayedJoinGroup(val consumerRegistry: ConsumerRegistry, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + /* always successfully complete the operation once called */ + override def tryComplete(): Boolean = { + forceComplete() + } + + /* always assume the partition is already assigned as this delayed operation should never time-out */ + override def onComplete() { + + // TODO + } + } + + /** + * Zookeeper listener that catch topic-partition changes + */ + class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging { + + this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: " + + /** + * Try to trigger a rebalance for each group subscribed in the changed topic + * + * @throws Exception + * On any error. + */ + def handleChildChange(parentPath: String , curChilds: java.util.List[String]) { + consumerRegistryLock synchronized { + debug("Fired for path %s with children %s".format(parentPath, curChilds)) + + // get the topic + val topic = parentPath.split("/").last + + // get groups that subscribed to this topic + val groups = Utils.notNull(consumerGroupsPerTopic.get(topic)) + + for (groupId <- groups) { + handlePrepareRebalance(groupId) + } + } + } + } +} + + diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala new file mode 100644 index 0000000..a3becc8 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import java.util.concurrent.atomic._ +import java.util.HashMap + +import scala.collection.mutable + +object ConsumerRegistry { + val UnknownConsumerId = "unknown-consumer-id" +} + +/** + * Consumer registry metadata contains the following metadata: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout. + * 2. recorded number of timed-out heartbeats. + * 3. associated heartbeat bucket in the purgatory. + * + * Subscription metadata: + * 1. subscribed topic list + * 2. assigned partitions for the subscribed topics. + */ +class ConsumerRegistry(val consumerId: String, + val subscribedTopics: List[String], + val sessionTimeoutMs: Int, + val groupRegistry: GroupRegistry) { + + /* number of expired heartbeat recorded */ + val numExpiredHeartbeat = new AtomicInteger(0) + + /* flag indicating if join group request is received */ + val joinGroupReceived = new AtomicBoolean(false) + + /* assigned partitions per subscribed topic */ + val assignedPartitions = new HashMap[String, List[Int]] + + /* associated heartbeat bucket */ + var currentHeartbeatBucket: HeartbeatBucket = null + +} + +sealed trait GroupStates { def state: Byte } + +/** + * Consumer group is preparing start rebalance + * + * action: respond consumer heartbeat with error code, + * transition: all known consumers has re-joined group => UnderRebalance + */ +case object PrepareRebalance extends GroupStates { val state: Byte = 1 } + +/** + * Consumer group is under rebalance + * + * action: send the join-group response with new assignment + * transition: all
consumers has heartbeat with the new generation id => Fetching + * new consumer join-group received => PrepareRebalance + */ +case object UnderRebalance extends GroupStates { val state: Byte = 2 } + +/** + * Consumer group is fetching data + * + * action: respond consumer heartbeat normally + * transition: consumer failure detected via heartbeat => PrepareRebalance + * consumer join-group received => PrepareRebalance + * zookeeper watcher fired => PrepareRebalance + */ +case object Fetching extends GroupStates { val state: Byte = 3 } + +case class GroupState() { + @volatile var currentState: Byte = PrepareRebalance.state +} + +object GroupRegistry { + val RangeAssignmentStrategy = "range-assignment-strategy" + val RoundRobinAssignmentStrategy = "round-robin-assignment-strategy" +} + +/* Group registry contains the following metadata of a registered group in the coordinator: + * + * Membership metadata: + * 1. List of consumers registered in this group + * 2. Partition assignment strategy for this group + * + * State metadata: + * 1. Current group state + * 2. Current group generation id + */ +class GroupRegistry(val groupId: String, + val partitionAssignmentStrategy: String) { + + val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]() + + val state = new GroupState() + + val generationId = 1 +} + diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala new file mode 100644 index 0000000..821e26e --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import scala.collection.mutable + +/** + * A bucket of consumers that are scheduled for heartbeat expiration. + * + * The motivation behind this is to avoid expensive fine-grained per-consumer + * heartbeat expiration but use coarse-grained methods that group consumers + * with similar deadlines together. This will result in some consumers not + * being expired for heartbeats in time but is tolerable. + */ +class HeartbeatBucket(val startMs: Long, endMs: Long) { + + /* The list of consumers that are contained in this bucket */ + val consumerRegistryList = new mutable.HashSet[ConsumerRegistry] + + // TODO +} -- 1.7.12.4