From a61b277a0439f049f494c00fa65d98452db816c2 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Jan 2015 10:39:40 -0800 Subject: [PATCH 1/5] dummy --- .../main/scala/kafka/server/DelayedOperationKey.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala index fb7e9ed..93e1b87 100644 --- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -30,9 +30,28 @@ object DelayedOperationKey { val globalLabel = "All" } +/* used by delayed-produce and delayed-fetch operations */ case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey { def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) override def keyLabel = "%s-%d".format(topic, partition) } + +/* used by bucketized delayed-heartbeat operations */ +case class TimeMsKey(time: Long) extends DelayedOperationKey { + + override def keyLabel = "%ld".format(time) +} + +/* used by delayed-join-group operations */ +case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey { + + override def keyLabel = "%s-%d".format(groupId, consumerId) +} + +/* used by delayed-rebalance operations */ +case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey { + + override def keyLabel = "%s".format(groupId) +} -- 1.7.12.4 From e879d545090d98353ffa6d79aa4685985527fb12 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Jan 2015 12:01:45 -0800 Subject: [PATCH 2/5] dummy 2 --- core/src/main/scala/kafka/server/KafkaApis.scala | 67 ++++++++++++++-------- core/src/main/scala/kafka/server/KafkaServer.scala | 46 ++++++++++----- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f2b027b..8426bbd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -20,21 +20,18 @@ package kafka.server import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.JoinGroupResponse import org.apache.kafka.common.requests.HeartbeatResponse -import org.apache.kafka.common.requests.ResponseHeader -import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.TopicPartition import kafka.api._ +import kafka.admin.AdminUtils import kafka.common._ +import kafka.controller.KafkaController +import kafka.coordinator.ConsumerCoordinator import kafka.log._ import kafka.network._ -import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response -import kafka.controller.KafkaController import kafka.utils.{SystemTime, Logging} -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import scala.collection._ import org.I0Itec.zkclient.ZkClient @@ -45,14 +42,14 @@ import org.I0Itec.zkclient.ZkClient class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val offsetManager: OffsetManager, + val coordinator: ConsumerCoordinator, + val controller: KafkaController, val zkClient: ZkClient, val brokerId: Int, - val config: KafkaConfig, - val controller: KafkaController) extends Logging { + val config: KafkaConfig) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) val metadataCache = new MetadataCache - private var 
consumerGroupGenerationId = 0 /** * Top-level method that handles all requests and multiplexes to the right api @@ -137,7 +134,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - // the callback for sending the response + // the callback for sending offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { commitStatus.foreach { case (topicAndPartition, errorCode) => // we only print warnings for known errors here; only replica manager could see an unknown @@ -169,7 +166,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] - // the callback for sending the response + // the callback for sending produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { var errorInResponse = false responseStatus.foreach { case (topicAndPartition, status) => @@ -224,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - // the callback for sending the response + // the callback for sending fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { responsePartitionData.foreach { case (topicAndPartition, data) => // we only print warnings for known errors here; if it is unknown, it will cause @@ -456,20 +453,42 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupReq = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] - val topics = joinGroupReq.body.topics().toSet - val partitions = this.replicaManager.logManager.allLogs.filter(log => topics.contains(log.topicAndPartition.topic)) - val partitionList = partitions.map(_.topicAndPartition).map(tp => new org.apache.kafka.common.TopicPartition(tp.topic, tp.partition)).toBuffer - this.consumerGroupGenerationId += 1 - val response = new JoinGroupResponse(ErrorMapping.NoError, this.consumerGroupGenerationId, joinGroupReq.body.consumerId, partitionList) - val send = new BoundedByteBufferSend(new JoinGroupResponseAndHeader(joinGroupReq.correlationId, response)) - requestChannel.sendResponse(new RequestChannel.Response(request, send)) + + val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + + // the callback for sending join-group reponse + def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { + val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) + val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + // let the coordinator to handle join-group + coordinator.handleJoinGroup( + joinGroupRequest.body.groupId(), + joinGroupRequest.body.consumerId(), + joinGroupRequest.body.topics(), + joinGroupRequest.body.sessionTimeout(), + joinGroupRequest.body.strategy(), + sendResponseCallback) } def 
handleHeartbeatRequest(request: RequestChannel.Request) { - val hbReq = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] - val send = new BoundedByteBufferSend(new HeartbeatResponseAndHeader(hbReq.correlationId, new HeartbeatResponse(Errors.NONE.code))) - requestChannel.sendResponse(new RequestChannel.Response(request, send)) + val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + + // the callback for sending heartbeat response + def sendResponseCallback(errorCode: Short) { + val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + // let the coordinator to handle heartbeat + coordinator.handleHeartbeat( + heartbeatRequest.body.groupId(), + heartbeatRequest.body.consumerId(), + heartbeatRequest.body.groupGenerationId(), + sendResponseCallback) } def close() { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 89200da..046f5d3 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -34,30 +34,41 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker import kafka.network.{Receive, BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +import kafka.coordinator.ConsumerCoordinator /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { - private var isShuttingDown = new AtomicBoolean(false) - private var shutdownLatch = new CountDownLatch(1) - private var startupComplete = new AtomicBoolean(false) - private var brokerId: Int = -1 + private val startupComplete = new AtomicBoolean(false) + private val isShuttingDown = new AtomicBoolean(false) + private val shutdownLatch = new CountDownLatch(1) val brokerState: BrokerState = new BrokerState - val correlationId: AtomicInteger = new AtomicInteger(0) + + var apis: KafkaApis = null var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null + var logManager: LogManager = null + var offsetManager: OffsetManager = null - var kafkaHealthcheck: KafkaHealthcheck = null - var topicConfigManager: TopicConfigManager = null + var replicaManager: ReplicaManager = null - var apis: KafkaApis = null + + var topicConfigManager: TopicConfigManager = null + + var consumerCoordinator: ConsumerCoordinator = null + var kafkaController: KafkaController = null + val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) + + var kafkaHealthcheck: KafkaHealthcheck = null + var zkClient: ZkClient = null + val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap @@ -76,8 +87,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg try { info("starting") brokerState.newState(Starting) - isShuttingDown = new AtomicBoolean(false) - shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() @@ -106,24 +115,29 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg config.maxConnectionsPerIpOverrides) socketServer.startup() + /* start replica manager */ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) + replicaManager.startup() /* start offset manager */ offsetManager = createOffsetManager() + /* start kafka controller */ kafkaController = new KafkaController(config, zkClient, brokerState) + kafkaController.startup() + + /* start kafka coordinator */ + consumerCoordinator = new ConsumerCoordinator(config, zkClient) + consumerCoordinator.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() - replicaManager.startup() - - kafkaController.startup() - + /* start topic config manager */ topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() @@ -290,6 +304,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(replicaManager.shutdown()) if(logManager != null) Utils.swallow(logManager.shutdown()) + if(consumerCoordinator != null) + Utils.swallow(consumerCoordinator.shutdown()) if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) if(zkClient != null) -- 1.7.12.4 From df254348dc0e07f3e6d0257b2a4766606002771e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 31 Jan 2015 17:35:32 -0800 Subject: [PATCH 3/5] minor --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8426bbd..0b78838 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -468,7 +468,7 @@ class KafkaApis(val requestChannel: RequestChannel, coordinator.handleJoinGroup( joinGroupRequest.body.groupId(), joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics(), + joinGroupRequest.body.topics().toList, joinGroupRequest.body.sessionTimeout(), joinGroupRequest.body.strategy(), sendResponseCallback) -- 1.7.12.4 From e0e0fc173cf7e3347794decb4cda1f411c33dba0 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 31 Jan 2015 17:37:41 -0800 Subject: [PATCH 4/5] three new file --- .../kafka/coordinator/ConsumerCoordinator.scala | 395 +++++++++++++++++++++ .../scala/kafka/coordinator/GroupRegistry.scala | 117 ++++++ .../scala/kafka/coordinator/HeartbeatBucket.scala | 36 ++ 3 files changed, 548 insertions(+) create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala create mode 100644 core/src/main/scala/kafka/coordinator/GroupRegistry.scala create mode 100644 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala new file mode 100644 index 0000000..cdf1ae4 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator + +import org.apache.kafka.common.protocol.Errors + +import kafka.server._ +import kafka.utils._ + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.HashMap +import java.util.Set + +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import kafka.common.TopicAndPartition + + +/** + * Kafka coordinator handles consumer group and consumer offset management. + * + * Each Kafka server instantiates a coordinator, which is responsible for a set of + * consumer groups; the consumer groups are assigned to coordinators based on their + * group names. + */ +class ConsumerCoordinator(val config: KafkaConfig, + val zkClient: ZkClient) extends Logging { + + this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: " + + /* the consumer registry synchronization lock */ + private val consumerRegistryLock = new Object + + /* zookeeper listener for topic-partition changes */ + private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener] + + /* the consumer group registry cache */ + private val consumerGroupRegistries = new HashMap[String, GroupRegistry] + + /* the list of subscribed groups per topic */ + private val consumerGroupsPerTopic = new HashMap[String, List[String]] + + /* the delayed operation purgatory for heartbeat-based failure detection */ + private val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId) + + /* the delayed operation purgatory for preparing rebalance process */ + private val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId) + + /* the delayed operation purgatory for handling join-group requests */ + private val joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId) + + /* latest consumer heartbeat bucket's end timestamp in milliseconds */ + private var latestHeartbeatBucketEndMs = SystemTime.milliseconds + + /** + * Upon starting up, do the following: + * + * 1. Initialize consumer heartbeat scheduler + * 2. Initialize group partition rebalance processor + * 3. Register Zookeeper listener for topic-partition changes + */ + def startup() { + + // TODO + } + + /** + * Upon shutting down, do the following (ordering of actions should be reverse of the start-up process): + * + * 1. De-register Zookeeper listeners for topic-partition changes + * 2. If group partition rebalance processor exists, shut it down + * 3. 
If consumer heartbeat scheduler exists, shut it down + */ + def shutdown() { + + // TODO + } + + /** + * Handle JoinGroupRequest + */ + def handleJoinGroup(groupId: String, + consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + partitionAssignmentStrategy: String, + responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) { + + // if the group does not exist yet, create one + if (!consumerGroupRegistries.containsKey(groupId)) + handleNewGroup(groupId, partitionAssignmentStrategy) + + // if the consumer id is unknown or does not exist in + // the group yet, register this consumer to the group + // TODO + + // if the current group is under rebalance process, + // indicate the rebalance processor of the received join request + // TODO + + // just return all the partitions of the subscribed topics + val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics) + val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) => + partitionIds.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toList + + responseCallback(partitions, 1 /* generation id */, Errors.NONE.code) + + info("Handled join-group from consumer " + consumerId + " to group " + groupId) + } + + /** + * Handle HeartbeatRequest + */ + def handleHeartbeat(groupId: String, + consumerId: String, + generationId: Int, + responseCallback: Short => Unit) { + + // check that the group already exists + // TODO + + // check that the consumer has already registered for the group + // TODO + + // check if the consumer generation id is correct + // TODO + + // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket + // TODO + + // create the heartbeat response, if partition rebalance is triggered set the corresponding error code + // TODO + + info("Handled heartbeat of consumer " + consumerId + " from group " + groupId) + + responseCallback(Errors.NONE.code) + } + + /** + * Handle new consumer registration to the group + */ + private def handleNewConsumer(consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + groupRegistry: GroupRegistry) { + debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId) + + // create the new consumer registry entry + // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase + + // Check if the partition assignment strategy is consistent with the group + // TODO + + // Add the group to the subscribed topics + // TODO + + // Schedule heartbeat tasks for the consumer + // TODO + + // Add the member registry entry to the group + // TODO + + // Trigger the partition rebalance process + // TODO + + info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId) + } + + /** + * Handle new group creation + */ + private def handleNewGroup(groupId: String, partitionAssignmentStrategy: String) { + debug("Creating new group " + groupId) + + consumerRegistryLock synchronized { + consumerGroupRegistries.put(groupId, new GroupRegistry(groupId, partitionAssignmentStrategy)) + } + + info("Created new group registry " + groupId) + } + + /** + * Handle consumer heartbeat expiration + */ + private def handleConsumerHeartbeatExpired(groupId: String, consumerId: String) { + + // If the consumer does not exist in group registry anymore, do nothing + // TODO + + // Record heartbeat failure + // TODO + + // If the maximum failures has been reached, mark consumer as failed + // TODO + } + + /** + * Handle consumer failure + */ + private 
def handleConsumerFailure(groupId: String, consumerId: String) { + + // Stop scheduling heartbeat tasks for this consumer + // TODO + + // Remove the consumer from its group registry info + // TODO + + // Cut the socket connection tothe consumer + // TODO: howto ?? + + // If the group has no consumer members any more, remove the group + // TODO + // Otherwise trigger a rebalance process if it is not under rebalance yet + // TODO + + } + + /** + * Prepare partition rebalance for the group + */ + private def handlePrepareRebalance(groupId: String) { + + // try to change the group state to PrepareRebalance + + // TODO + } + + /** + * Start partition rebalance for the group + */ + private def handleStartRebalance(groupId: String) { + + // try to change the group state to UnderRebalance + + // compute new assignment based on the strategy + + // send back the join-group response + + // TODO + } + + /** + * Fail current partition rebalance for the group + */ + + /** + * Register ZK listeners for topic-partition changes + */ + private def registerTopicChangeListener(topic: String) = { + if (!topicPartitionChangeListeners.containsKey(topic)) { + val listener = new TopicPartitionChangeListener(config) + topicPartitionChangeListeners.put(topic, listener) + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic)) + zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener) + } + } + + /** + * De-register ZK listeners for topic-partition changes + */ + private def deregisterTopicChangeListener(topic: String) = { + val listener = Utils.notNull(topicPartitionChangeListeners.get(topic)) + zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener) + topicPartitionChangeListeners.remove(topic) + } + + /** + * Delayed heartbeat operations that are added to the purgatory for session-timeout checking + * + * These operations will always be expired. Once it has expired, all its + * currently contained consumers are marked as heartbeat timed out. + */ + class DelayedHeartbeat(val bucket: HeartbeatBucket, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + /* this function should never be called */ + override def tryComplete(): Boolean = { + + throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket") + } + + /* mark all consumers within the heartbeat as heartbeat timed out */ + override def onComplete() { + for (registry <- bucket.consumerRegistryList) + handleConsumerHeartbeatExpired(registry.groupRegistry.groupId, registry.consumerId) + } + } + + /** + * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance + * + * Whenever a join-group request is received, check if all known consumers have requested + * to re-join the group; if yes, complete this operation to proceed rebalance. + * + * When the operation has expired, any known consumers that have not requested to re-join + * the group are marked as failed, and complete this operation to proceed rebalance with + * the rest of the group. 
+ */ + class DelayedRebalance(val groupRegistry: GroupRegistry, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + val allConsumersJoinedGroup = new AtomicBoolean(false) + + /* check if all known consumers have requested to re-join group */ + override def tryComplete(): Boolean = { + allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft + (true) ((agg, cur) => agg && cur.joinGroupReceived.get())) + + if (allConsumersJoinedGroup.get()) + forceComplete() + else + false + } + + /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */ + override def onComplete() { + groupRegistry.memberRegistries.values.foreach(consumerRegistry => + if (!consumerRegistry.joinGroupReceived.get()) + handleConsumerFailure(groupRegistry.groupId, consumerRegistry.consumerId) + ) + + handleStartRebalance(groupRegistry.groupId) + } + } + + /** + * Delayed join-group operations that are kept in the purgatory before the partition assignment completed + * + * These operation should never expire; when the rebalance has completed, all consumer's + * join-group operations will be completed by sending back the response with the + * calculated partition assignment. + */ + class DelayedJoinGroup(val consumerRegistry: ConsumerRegistry, + val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + /* always successfully complete the operation once called */ + override def tryComplete(): Boolean = { + forceComplete() + } + + /* always assume the partition is already assigned as this delayed operation should never time-out */ + override def onComplete() { + + // TODO + } + } + + /** + * Zookeeper listener that catch topic-partition changes + */ + class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging { + + this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: " + + /** + * Try to trigger a rebalance for each group subscribed in the changed topic + * + * @throws Exception + * On any error. + */ + def handleChildChange(parentPath: String , curChilds: java.util.List[String]) { + consumerRegistryLock synchronized { + debug("Fired for path %s with children %s".format(parentPath, curChilds)) + + // get the topic + val topic = parentPath.split("/").last + + // get groups that subscribed to this topic + val groups = Utils.notNull(consumerGroupsPerTopic.get(topic)) + + for (groupId <- groups) { + handlePrepareRebalance(groupId) + } + } + } + } +} + + diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala new file mode 100644 index 0000000..a3becc8 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import java.util.concurrent.atomic._ +import java.util.HashMap + +import scala.collection.mutable + +object ConsumerRegistry { + val UnknownConsumerId = "unknown-consumer-id" +} + +/** + * Consumer registry metadata contains the following metadata: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout. + * 2. recorded number of timed-out heartbeats. + * 3. associated heartbeat bucket in the purgatory. + * + * Subscription metadata: + * 1. subscribed topic list + * 2. assigned partitions for the subscribed topics. + */ +class ConsumerRegistry(val consumerId: String, + val subscribedTopics: List[String], + val sessionTimeoutMs: Int, + val groupRegistry: GroupRegistry) { + + /* number of expired heartbeat recorded */ + val numExpiredHeartbeat = new AtomicInteger(0) + + /* flag indicating if join group request is received */ + val joinGroupReceived = new AtomicBoolean(false) + + /* assigned partitions per subscribed topic */ + val assignedPartitions = new HashMap[String, List[Int]] + + /* associated heartbeat bucket */ + var currentHeartbeatBucket = null + +} + +sealed trait GroupStates { def state: Byte } + +/** + * Consumer group is preparing start rebalance + * + * action: respond consumer heartbeat with error code, + * transition: all known consumers has re-joined group => UnderRebalance + */ +case object PrepareRebalance extends GroupStates { val state: Byte = 1 } + +/** + * Consumer group is under rebalance + * + * action: send the join-group response with new assignment + * transition: all consumers has heartbeat with the new generation id => Fetching + * new consumer join-group received => PrepareRebalance + */ +case object UnderRebalance extends GroupStates { val state: Byte = 2 } + +/** + * Consumer group is fetching data + * + * action: respond consumer heartbeat normally + * transition: consumer failure detected via heartbeat => PrepareRebalance + * consumer join-group received => PrepareRebalance + * zookeeper watcher fired => PrepareRebalance + */ +case object Fetching extends GroupStates { val state: Byte = 3 } + +case class GroupState() { + @volatile var currentState: Byte = PrepareRebalance.state +} + +object GroupRegistry { + val RangeAssignmentStrategy = "range-assignment-strategy" + val RoundRobinAssignmentStrategy = "round-robin-assignment-strategy" +} + +/* Group registry contains the following metadata of a registered group in the coordinator: + * + * Membership metadata: + * 1. List of consumers registered in this group + * 2. Partition assignment strategy for this group + * + * State metadata: + * 1. Current group state + * 2. Current group generation id + */ +class GroupRegistry(val groupId: String, + val partitionAssignmentStrategy: String) { + + val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]() + + val state = new GroupState() + + val generationId = 1 +} + diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala new file mode 100644 index 0000000..821e26e --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import scala.collection.mutable + +/** + * A bucket of consumers that are scheduled for heartbeat expiration. + * + * The motivation behind this is to avoid expensive fine-grained per-consumer + * heartbeat expiration and instead use coarser-grained methods that group consumers + * with similar deadlines together. This will result in some consumers not + * being expired for heartbeats exactly in time, but that is tolerable. + */ +class HeartbeatBucket(val startMs: Long, endMs: Long) { + + /* The list of consumers that are contained in this bucket */ + val consumerRegistryList = new mutable.HashSet[ConsumerRegistry] + + // TODO +}
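+
+// An illustrative sketch only (not part of this patch): one way a consumer's
+// heartbeat deadline could be mapped onto such a bucket, assuming some fixed
+// bucket span in milliseconds:
+//
+//   def bucketRangeFor(deadlineMs: Long, bucketSpanMs: Long): (Long, Long) = {
+//     val endMs = ((deadlineMs + bucketSpanMs - 1) / bucketSpanMs) * bucketSpanMs
+//     (endMs - bucketSpanMs, endMs) // all deadlines in this span expire together
+//   }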
-- 1.7.12.4 From 01de221cc13d4c7f1a620df5f5b0ad90fc75760e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 6 Feb 2015 15:02:30 -0800 Subject: [PATCH 5/5] Address Jay and Onur's comments --- .../kafka/coordinator/ConsumerCoordinator.scala | 247 ++++++++------------- .../scala/kafka/coordinator/ConsumerRegistry.scala | 52 +++++ .../scala/kafka/coordinator/DelayedHeartbeat.scala | 44 ++++ .../scala/kafka/coordinator/DelayedJoinGroup.scala | 44 ++++ .../scala/kafka/coordinator/DelayedRebalance.scala | 62 ++++++ .../scala/kafka/coordinator/GroupRegistry.scala | 47 +--- .../main/scala/kafka/network/SocketServer.scala | 2 +- .../scala/kafka/server/DelayedOperationKey.scala | 8 +- core/src/main/scala/kafka/server/KafkaApis.scala | 16 +- core/src/main/scala/kafka/server/KafkaServer.scala | 158 +++++++------ .../main/scala/kafka/server/MetadataCache.scala | 17 +- .../main/scala/kafka/server/OffsetManager.scala | 2 + core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 +- core/src/test/resources/log4j.properties | 6 +- .../kafka/api/ProducerFailureHandlingTest.scala | 2 +- .../unit/kafka/server/ServerShutdownTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 6 + 17 files changed, 427 insertions(+), 294 deletions(-) create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala create mode 100644 core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala create mode 100644 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala create mode 100644 core/src/main/scala/kafka/coordinator/DelayedRebalance.scala diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index cdf1ae4..ecef0d2 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -18,15 +18,13 @@ package kafka.coordinator import org.apache.kafka.common.protocol.Errors +import kafka.common.TopicAndPartition import kafka.server._ import kafka.utils._ -import java.util.concurrent.atomic.AtomicBoolean -import java.util.HashMap -import java.util.Set +import scala.collection.mutable.HashMap import org.I0Itec.zkclient.{IZkChildListener, ZkClient} -import kafka.common.TopicAndPartition /** @@ -41,76 +39,96 @@ class ConsumerCoordinator(val config: KafkaConfig, this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: " - /* the consumer registry synchronization lock */ - private val consumerRegistryLock = new Object - /* zookeeper listener for topic-partition changes */ private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener] /* the consumer group registry cache */ + // TODO: access to this map needs to be synchronized private val consumerGroupRegistries = new HashMap[String, GroupRegistry] /* the list of subscribed groups per topic */ + // TODO: access to this map needs to be synchronized private val consumerGroupsPerTopic = new HashMap[String, List[String]] /* the delayed operation purgatory for heartbeat-based failure detection */ - private val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId) - - /* the delayed operation purgatory for preparing rebalance process */ - private val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId) + private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null /* the delayed operation purgatory for handling join-group requests */ - private val joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId) + private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null + + /* the delayed operation purgatory for preparing rebalance process */ + private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null /* latest consumer heartbeat bucket's end timestamp in milliseconds */ - private var latestHeartbeatBucketEndMs = SystemTime.milliseconds + private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds /** - * Upon starting up, do the following: - * - * 1. Initialize consumer heartbeat scheduler - * 2. Initialize group partition rebalance processor - * 3. Register Zookeeper listener for topic-partition changes + * Start-up logic, executed when the server starts up. */ def startup() { - // TODO + // Initialize consumer group registries and heartbeat bucket metadata + latestHeartbeatBucketEndMs = SystemTime.milliseconds + + // Initialize purgatories for delayed heartbeat, join-group and rebalance operations + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId) + joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId) + rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId) + }
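+
+  // Illustrative sketch only (not part of this patch): the rebalance purgatory
+  // is presumably driven by the keys added in DelayedOperationKey.scala, e.g.
+  //   rebalancePurgatory.tryCompleteElseWatch(
+  //     new DelayedRebalance(sessionTimeout, group, startRebalance, onConsumerFailure),
+  //     Seq(ConsumerGroupKey(group.groupId)))
+  // and each incoming join-group request would then call
+  //   rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))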
/** - * Upon shutting down, do the following (ordering of actions should be reverse of the start-up process): + * Shut-down logic, executed when the server shuts down; the ordering of + * actions should be the reverse of the start-up process * - * 1. De-register Zookeeper listeners for topic-partition changes - * 2. If group partition rebalance processor exists, shut it down - * 3. If consumer heartbeat scheduler exists, shut it down */ def shutdown() { - // TODO + // De-register all Zookeeper listeners for topic-partition changes + for (topic <- topicPartitionChangeListeners.keys) { + deregisterTopicChangeListener(topic) + } + topicPartitionChangeListeners.clear() + + // Shutdown purgatories for delayed heartbeat, join-group and rebalance operations + heartbeatPurgatory.shutdown() + joinGroupPurgatory.shutdown() + rebalancePurgatory.shutdown() + + // Clean up consumer group registries metadata + consumerGroupRegistries.clear() + consumerGroupsPerTopic.clear() } /** - * Handle JoinGroupRequest + * Process a join-group request from a consumer to join as a new group member */ - def handleJoinGroup(groupId: String, - consumerId: String, - topics: List[String], - sessionTimeoutMs: Int, - partitionAssignmentStrategy: String, - responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) { + def consumerJoinGroup(groupId: String, + consumerId: String, + topics: List[String], + sessionTimeoutMs: Int, + partitionAssignmentStrategy: String, + responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) { // if the group does not exist yet, create one - if (!consumerGroupRegistries.containsKey(groupId)) - handleNewGroup(groupId, partitionAssignmentStrategy) + if (!consumerGroupRegistries.contains(groupId)) + createNewGroup(groupId, partitionAssignmentStrategy) // if the consumer id is unknown or does not exist in // the group yet, register this consumer to the group // TODO + // add a delayed join-group operation to the purgatory + // TODO + // if the current group is under rebalance process, - // indicate the rebalance processor of the received join request + // check if the delayed rebalance operation can be finished // TODO + // TODO -------------------------------------------------------------- + // TODO: this is just a stub for new consumer testing, + // TODO: needs to be replaced with the logic above + // TODO -------------------------------------------------------------- // just return all the partitions of the subscribed topics val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics) val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) => @@ -125,9 +143,9 @@ class ConsumerCoordinator(val config: KafkaConfig, } /** - * Handle HeartbeatRequest + * Process a heartbeat request from a consumer */ - def handleHeartbeat(groupId: String, + def consumerHeartbeat(groupId: String, consumerId: String, generationId: Int, responseCallback: Short => Unit) { @@ -153,9 +171,9 @@ class ConsumerCoordinator(val config: KafkaConfig, } /** - * Handle new consumer registration to the group + * Create a new consumer */ - private def handleNewConsumer(consumerId: String, + private def createNewConsumer(consumerId: String, topics: List[String], sessionTimeoutMs: Int, groupRegistry: GroupRegistry) { @@ -164,69 +182,65 @@ class ConsumerCoordinator(val config: KafkaConfig, // create the new consumer registry entry // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase - // Check if the partition assignment strategy is consistent with the group + // check if the partition assignment strategy is consistent with the group // TODO - // Add the group to the subscribed topics + // add the group to the subscribed topics // TODO - // Schedule heartbeat tasks for the consumer + // schedule heartbeat tasks for the consumer // TODO - // Add the member registry entry to the group + // add the member registry entry to the 
group // TODO - // Trigger the partition rebalance process + // start preparing group partition rebalance // TODO info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId) } /** - * Handle new group creation + * Create a new consumer group in the registry */ - private def handleNewGroup(groupId: String, partitionAssignmentStrategy: String) { + private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) { debug("Creating new group " + groupId) - consumerRegistryLock synchronized { - consumerGroupRegistries.put(groupId, new GroupRegistry(groupId, partitionAssignmentStrategy)) - } + val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy) + + consumerGroupRegistries.put(groupId, groupRegistry) info("Created new group registry " + groupId) } /** - * Handle consumer heartbeat expiration + * Callback invoked when a consumer's heartbeat has expired */ - private def handleConsumerHeartbeatExpired(groupId: String, consumerId: String) { + private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) { - // If the consumer does not exist in group registry anymore, do nothing + // if the consumer does not exist in group registry anymore, do nothing // TODO - // Record heartbeat failure + // record heartbeat failure // TODO - // If the maximum failures has been reached, mark consumer as failed + // if the maximum failures has been reached, mark consumer as failed // TODO } /** - * Handle consumer failure + * Callback invoked when a consumer is marked as failed */ - private def handleConsumerFailure(groupId: String, consumerId: String) { - - // Stop scheduling heartbeat tasks for this consumer - // TODO + private def onConsumerFailure(groupId: String, consumerId: String) { - // Remove the consumer from its group registry info + // remove the consumer from its group registry metadata // TODO - // Cut the socket connection tothe consumer + // cut the socket connection to the consumer // TODO: howto ?? 
- // If the group has no consumer members any more, remove the group - // TODO - // Otherwise trigger a rebalance process if it is not under rebalance yet + // if the group has no consumer members any more, remove the group + // otherwise start preparing group partition rebalance // TODO } @@ -234,17 +248,19 @@ class ConsumerCoordinator(val config: KafkaConfig, /** * Prepare partition rebalance for the group */ - private def handlePrepareRebalance(groupId: String) { + private def prepareRebalance(groupId: String) { // try to change the group state to PrepareRebalance + // add a task to the delayed rebalance purgatory + // TODO } /** * Start partition rebalance for the group */ - private def handleStartRebalance(groupId: String) { + private def startRebalance(groupId: String) { // try to change the group state to UnderRebalance @@ -263,7 +279,7 @@ class ConsumerCoordinator(val config: KafkaConfig, * Register ZK listeners for topic-partition changes */ private def registerTopicChangeListener(topic: String) = { - if (!topicPartitionChangeListeners.containsKey(topic)) { + if (!topicPartitionChangeListeners.contains(topic)) { val listener = new TopicPartitionChangeListener(config) topicPartitionChangeListeners.put(topic, listener) ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic)) @@ -275,93 +291,12 @@ class ConsumerCoordinator(val config: KafkaConfig, * De-register ZK listeners for topic-partition changes */ private def deregisterTopicChangeListener(topic: String) = { - val listener = Utils.notNull(topicPartitionChangeListeners.get(topic)) + val listener = topicPartitionChangeListeners.get(topic).get zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener) topicPartitionChangeListeners.remove(topic) } /** - * Delayed heartbeat operations that are added to the purgatory for session-timeout checking - * - * These operations will always be expired. Once it has expired, all its - * currently contained consumers are marked as heartbeat timed out. - */ - class DelayedHeartbeat(val bucket: HeartbeatBucket, - val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - - /* this function should never be called */ - override def tryComplete(): Boolean = { - - throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket") - } - - /* mark all consumers within the heartbeat as heartbeat timed out */ - override def onComplete() { - for (registry <- bucket.consumerRegistryList) - handleConsumerHeartbeatExpired(registry.groupRegistry.groupId, registry.consumerId) - } - } - - /** - * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance - * - * Whenever a join-group request is received, check if all known consumers have requested - * to re-join the group; if yes, complete this operation to proceed rebalance. - * - * When the operation has expired, any known consumers that have not requested to re-join - * the group are marked as failed, and complete this operation to proceed rebalance with - * the rest of the group. 
- */ - class DelayedRebalance(val groupRegistry: GroupRegistry, - val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - - val allConsumersJoinedGroup = new AtomicBoolean(false) - - /* check if all known consumers have requested to re-join group */ - override def tryComplete(): Boolean = { - allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft - (true) ((agg, cur) => agg && cur.joinGroupReceived.get())) - - if (allConsumersJoinedGroup.get()) - forceComplete() - else - false - } - - /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */ - override def onComplete() { - groupRegistry.memberRegistries.values.foreach(consumerRegistry => - if (!consumerRegistry.joinGroupReceived.get()) - handleConsumerFailure(groupRegistry.groupId, consumerRegistry.consumerId) - ) - - handleStartRebalance(groupRegistry.groupId) - } - } - - /** - * Delayed join-group operations that are kept in the purgatory before the partition assignment completed - * - * These operation should never expire; when the rebalance has completed, all consumer's - * join-group operations will be completed by sending back the response with the - * calculated partition assignment. - */ - class DelayedJoinGroup(val consumerRegistry: ConsumerRegistry, - val sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - - /* always successfully complete the operation once called */ - override def tryComplete(): Boolean = { - forceComplete() - } - - /* always assume the partition is already assigned as this delayed operation should never time-out */ - override def onComplete() { - - // TODO - } - } - - /** * Zookeeper listener that catch topic-partition changes */ class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging { @@ -375,18 +310,16 @@ class ConsumerCoordinator(val config: KafkaConfig, * On any error. */ def handleChildChange(parentPath: String , curChilds: java.util.List[String]) { - consumerRegistryLock synchronized { - debug("Fired for path %s with children %s".format(parentPath, curChilds)) + debug("Fired for path %s with children %s".format(parentPath, curChilds)) - // get the topic - val topic = parentPath.split("/").last + // get the topic + val topic = parentPath.split("/").last - // get groups that subscribed to this topic - val groups = Utils.notNull(consumerGroupsPerTopic.get(topic)) + // get groups that subscribed to this topic + val groups = consumerGroupsPerTopic.get(topic).get - for (groupId <- groups) { - handlePrepareRebalance(groupId) - } + for (groupId <- groups) { + prepareRebalance(groupId) } } } diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala new file mode 100644 index 0000000..b65c04d --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.HashMap + +/** + * A consumer registry contains the following metadata: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout. + * 2. recorded number of timed-out heartbeats. + * 3. associated heartbeat bucket in the purgatory. + * + * Subscription metadata: + * 1. subscribed topic list + * 2. assigned partitions for the subscribed topics. + */ +class ConsumerRegistry(val consumerId: String, + val subscribedTopics: List[String], + val sessionTimeoutMs: Int, + val groupRegistry: GroupRegistry) { + + /* number of expired heartbeats recorded */ + val numExpiredHeartbeat = new AtomicInteger(0) + + /* flag indicating if join group request is received */ + val joinGroupReceived = new AtomicBoolean(false) + + /* assigned partitions per subscribed topic */ + val assignedPartitions = new HashMap[String, List[Int]] + + /* associated heartbeat bucket */ + var currentHeartbeatBucket: HeartbeatBucket = null + +} diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala new file mode 100644 index 0000000..894d6ed --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.server.DelayedOperation + +/** + * Delayed heartbeat operations that are added to the purgatory for session-timeout checking + * + * These operations will always be expired. Once one has expired, all the + * consumers it currently contains are marked as heartbeat timed out. 
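+ *
+ * A sketch of the intended wiring, for illustration only (the call below is
+ * assumed from this patch's signatures, not prescribed by it):
+ * {{{
+ * val op = new DelayedHeartbeat(sessionTimeout, bucket, onConsumerHeartbeatExpired)
+ * // on expiration, onComplete() invokes the callback with
+ * // (groupId, consumerId) for each consumer left in the bucket
+ * }}}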
+ */ +class DelayedHeartbeat(sessionTimeout: Long, + bucket: HeartbeatBucket, + expireCallback: (String, String) => Unit) + extends DelayedOperation(sessionTimeout) { + + /* this function should never be called */ + override def tryComplete(): Boolean = { + + throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket") + } + + /* mark all consumers within the heartbeat bucket as heartbeat timed out */ + override def onComplete() { + for (registry <- bucket.consumerRegistryList) + expireCallback(registry.groupRegistry.groupId, registry.consumerId) + } +} diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala new file mode 100644 index 0000000..445bfa1 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.server.DelayedOperation + +/** + * Delayed join-group operations that are kept in the purgatory before the partition assignment has completed + * + * These operations should never expire; when the rebalance has completed, all consumers' + * join-group operations will be completed by sending back the response with the + * calculated partition assignment. + */ +class DelayedJoinGroup(sessionTimeout: Long, + consumerRegistry: ConsumerRegistry, + responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) { + + /* always successfully complete the operation once called */ + override def tryComplete(): Boolean = { + forceComplete() + } + + /* always assume the partition is already assigned as this delayed operation should never time-out */ + override def onComplete() { + + // TODO + responseCallback() + } +} \ No newline at end of file
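(A likely completion path, sketched for illustration; the purgatory call is assumed, not shown in this patch: once the assignment is computed, the coordinator would call joinGroupPurgatory.checkAndComplete(ConsumerKey(groupId, consumerId)), which drives tryComplete(), forceComplete()s the operation, and runs onComplete() to send back the join-group response.)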
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala new file mode 100644 index 0000000..b3b3749 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.server.DelayedOperation +import java.util.concurrent.atomic.AtomicBoolean + + +/** + * Delayed rebalance operations that are added to the purgatory when a group is preparing for rebalance + * + * Whenever a join-group request is received, check if all known consumers have requested + * to re-join the group; if yes, complete this operation to proceed with the rebalance. + * + * When the operation has expired, any known consumers that have not requested to re-join + * the group are marked as failed, and the operation completes to proceed with the rebalance for + * the rest of the group. + */ +class DelayedRebalance(sessionTimeout: Long, + groupRegistry: GroupRegistry, + rebalanceCallback: String => Unit, + failureCallback: (String, String) => Unit) + extends DelayedOperation(sessionTimeout) { + + val allConsumersJoinedGroup = new AtomicBoolean(false) + + /* check if all known consumers have requested to re-join group */ + override def tryComplete(): Boolean = { + allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft + (true) ((agg, cur) => agg && cur.joinGroupReceived.get())) + + if (allConsumersJoinedGroup.get()) + forceComplete() + else + false + } + + /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */ + override def onComplete() { + groupRegistry.memberRegistries.values.foreach(consumerRegistry => + if (!consumerRegistry.joinGroupReceived.get()) + failureCallback(groupRegistry.groupId, consumerRegistry.consumerId) + ) + + rebalanceCallback(groupRegistry.groupId) + } +}
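(For reference, the foldLeft in tryComplete above is an all-of check; it is equivalent in result to groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()).)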
diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
index a3becc8..7d17e10 100644
--- a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
@@ -17,46 +17,8 @@
 package kafka.coordinator
 
-import java.util.concurrent.atomic._
-import java.util.HashMap
-
 import scala.collection.mutable
 
-object ConsumerRegistry {
-  val UnknownConsumerId = "unknown-consumer-id"
-}
-
-/**
- * Consumer registry metadata contains the following metadata:
- *
- * Heartbeat metadata:
- * 1. negotiated heartbeat session timeout.
- * 2. recorded number of timed-out heartbeats.
- * 3. associated heartbeat bucket in the purgatory.
- *
- * Subscription metadata:
- * 1. subscribed topic list
- * 2. assigned partitions for the subscribed topics.
- */
-class ConsumerRegistry(val consumerId: String,
-                       val subscribedTopics: List[String],
-                       val sessionTimeoutMs: Int,
-                       val groupRegistry: GroupRegistry) {
-
-  /* number of expired heartbeat recorded */
-  val numExpiredHeartbeat = new AtomicInteger(0)
-
-  /* flag indicating if join group request is received */
-  val joinGroupReceived = new AtomicBoolean(false)
-
-  /* assigned partitions per subscribed topic */
-  val assignedPartitions = new HashMap[String, List[Int]]
-
-  /* associated heartbeat bucket */
-  var currentHeartbeatBucket = null
-
-}
-
 sealed trait GroupStates { def state: Byte }
 
 /**
@@ -90,11 +52,6 @@ case class GroupState() {
   @volatile var currentState: Byte = PrepareRebalance.state
 }
 
-object GroupRegistry {
-  val RangeAssignmentStrategy = "range-assignment-strategy"
-  val RoundRobinAssignmentStrategy = "round-robin-assignment-strategy"
-}
-
 /* Group registry contains the following metadata of a registered group in the coordinator:
  *
  * Membership metadata:
@@ -110,8 +67,8 @@ class GroupRegistry(val groupId: String,
 
   val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]()
 
-  val state = new GroupState()
+  val state: GroupState = new GroupState()
 
-  val generationId = 1
+  var generationId: Int = 1
 
 }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 39b1651..76ce41a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -107,7 +107,7 @@ class SocketServer(val brokerId: Int,
  */
 private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
 
-  protected val selector = Selector.open();
+  protected val selector = Selector.open()
   private val startupLatch = new CountDownLatch(1)
   private val shutdownLatch = new CountDownLatch(1)
   private val alive = new AtomicBoolean(true)
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 93e1b87..b673e43 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -39,19 +39,19 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del
 }
 
 /* used by bucketized delayed-heartbeat operations */
 case class TimeMsKey(time: Long) extends DelayedOperationKey {
 
-  override def keyLabel = "%ld".format(time)
+  override def keyLabel = "%d".format(time)
 }
 
 /* used by delayed-join-group operations */
 case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
 
-  override def keyLabel = "%s-%d".format(groupId, consumerId)
+  override def keyLabel = "%s-%s".format(groupId, consumerId)
 }
 
 /* used by delayed-rebalance operations */
 case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
 
-  override def keyLabel = "%s".format(groupId)
+  override def keyLabel = groupId
 }
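The keyLabel fixes in the DelayedOperationKey hunk deserve a note: java.util.Formatter, which backs Scala's String.format, has no "%ld" conversion (that is C printf syntax), so the old label would throw UnknownFormatConversionException at runtime; plain "%d" already accepts a Long. Likewise "%s-%d" applied to a String consumerId would fail with IllegalFormatConversionException. A quick standalone check:

    object KeyLabelFormatCheck extends App {
      val time: Long = 1422576000000L
      println("%d".format(time))                // fine: %d formats a Long
      try { println("%ld".format(time)) }       // C-style %ld is not a Java conversion
      catch { case e: java.util.UnknownFormatConversionException => println("bad: " + e) }
      try { println("%s-%d".format("group", "consumer")) }  // %d cannot take a String
      catch { case e: java.util.IllegalFormatConversionException => println("bad: " + e) }
    }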
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b78838..9241c0c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,7 +49,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val metadataCache = new MetadataCache
+  val metadataCache = new MetadataCache(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -134,7 +134,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
     val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
 
-    // the callback for sending offset commit response
+    // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
       commitStatus.foreach { case (topicAndPartition, errorCode) =>
         // we only print warnings for known errors here; only replica manager could see an unknown
@@ -166,7 +166,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleProducerRequest(request: RequestChannel.Request) {
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
 
-    // the callback for sending produce response
+    // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       var errorInResponse = false
       responseStatus.foreach { case (topicAndPartition, status) =>
@@ -221,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
-    // the callback for sending fetch response
+    // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
       responsePartitionData.foreach { case (topicAndPartition, data) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
@@ -456,7 +456,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
 
-    // the callback for sending join-group reponse
+    // the callback for sending a join-group response
     def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
       val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
       val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList)
@@ -465,7 +465,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     // let the coordinator to handle join-group
-    coordinator.handleJoinGroup(
+    coordinator.consumerJoinGroup(
      joinGroupRequest.body.groupId(),
      joinGroupRequest.body.consumerId(),
      joinGroupRequest.body.topics().toList,
@@ -477,14 +477,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
 
-    // the callback for sending heartbeat response
+    // the callback for sending a heartbeat response
     def sendResponseCallback(errorCode: Short) {
       val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode))
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
 
     // let the coordinator to handle heartbeat
-    coordinator.handleHeartbeat(
+    coordinator.consumerHeartbeat(
      heartbeatRequest.body.groupId(),
      heartbeatRequest.body.consumerId(),
      heartbeatRequest.body.groupGenerationId(),
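The join-group and heartbeat paths above share one shape: KafkaApis builds a response-sending closure and hands it to the coordinator, so the request thread returns immediately and the response is sent whenever the delayed operation completes. A schematic of that style, with a hypothetical stand-in for the coordinator side (names and signatures simplified; only the callback-passing shape is taken from the diff):

    object CallbackStyleSketch extends App {
      // stand-in: the real coordinator would park the request in a purgatory
      // and invoke the stored callback once the rebalance completes
      def consumerJoinGroup(groupId: String,
                            responseCallback: (Int, Short) => Unit): Unit = {
        responseCallback(1 /* generationId */, 0 /* no error */)
      }

      // KafkaApis-style caller: build the closure, hand it off, return
      consumerJoinGroup("group-1", (generationId, errorCode) =>
        println(s"sending JoinGroupResponse: generation=$generationId error=$errorCode"))
    }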
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 046f5d3..159266e 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -42,8 +42,11 @@ import kafka.coordinator.ConsumerCoordinator
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   private val startupComplete = new AtomicBoolean(false)
+  private val shutdownComplete = new AtomicBoolean(true)
   private val isShuttingDown = new AtomicBoolean(false)
-  private val shutdownLatch = new CountDownLatch(1)
+  private val isStartingUp = new AtomicBoolean(false)
+
+  private var shutdownLatch: CountDownLatch = null
 
   val brokerState: BrokerState = new BrokerState
 
@@ -86,72 +89,88 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def startup() {
     try {
       info("starting")
-      brokerState.newState(Starting)
-
-      /* start scheduler */
-      kafkaScheduler.startup()
-
-      /* setup zookeeper */
-      zkClient = initZk()
-
-      /* start log manager */
-      logManager = createLogManager(zkClient, brokerState)
-      logManager.startup()
-
-      /* generate brokerId */
-      config.brokerId = getBrokerId
-      this.logIdent = "[Kafka Server " + config.brokerId + "], "
-
-      socketServer = new SocketServer(config.brokerId,
-                                      config.hostName,
-                                      config.port,
-                                      config.numNetworkThreads,
-                                      config.queuedMaxRequests,
-                                      config.socketSendBufferBytes,
-                                      config.socketReceiveBufferBytes,
-                                      config.socketRequestMaxBytes,
-                                      config.maxConnectionsPerIp,
-                                      config.connectionsMaxIdleMs,
-                                      config.maxConnectionsPerIpOverrides)
-      socketServer.startup()
-
-      /* start replica manager */
-      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
-      replicaManager.startup()
-
-      /* start offset manager */
-      offsetManager = createOffsetManager()
-
-      /* start kafka controller */
-      kafkaController = new KafkaController(config, zkClient, brokerState)
-      kafkaController.startup()
-
-      /* start kafka coordinator */
-      consumerCoordinator = new ConsumerCoordinator(config, zkClient)
-      consumerCoordinator.startup()
-
-      /* start processing requests */
-      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
-      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-      brokerState.newState(RunningAsBroker)
-
-      Mx4jLoader.maybeLoad()
-
-      /* start topic config manager */
-      topicConfigManager = new TopicConfigManager(zkClient, logManager)
-      topicConfigManager.startup()
-
-      /* tell everyone we are alive */
-      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-      kafkaHealthcheck.startup()
-
-      registerStats()
-      startupComplete.set(true)
-      info("started")
+
+      if (isShuttingDown.get)
+        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
+
+      if (startupComplete.get)
+        return
+
+      val canStartup = isStartingUp.compareAndSet(false, true)
+      if (canStartup) {
+        brokerState.newState(Starting)
+
+        /* start scheduler */
+        kafkaScheduler.startup()
+
+        /* setup zookeeper */
+        zkClient = initZk()
+
+        /* start log manager */
+        logManager = createLogManager(zkClient, brokerState)
+        logManager.startup()
+
+        /* generate brokerId */
+        config.brokerId = getBrokerId
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
+        socketServer = new SocketServer(config.brokerId,
+                                        config.hostName,
+                                        config.port,
+                                        config.numNetworkThreads,
+                                        config.queuedMaxRequests,
+                                        config.socketSendBufferBytes,
+                                        config.socketReceiveBufferBytes,
+                                        config.socketRequestMaxBytes,
+                                        config.maxConnectionsPerIp,
+                                        config.connectionsMaxIdleMs,
+                                        config.maxConnectionsPerIpOverrides)
+        socketServer.startup()
+
+        /* start replica manager */
+        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+        replicaManager.startup()
+
+        /* start offset manager */
+        offsetManager = createOffsetManager()
+
+        /* start kafka controller */
+        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController.startup()
+
+        /* start kafka coordinator */
+        consumerCoordinator = new ConsumerCoordinator(config, zkClient)
+        consumerCoordinator.startup()
+
+        /* start processing requests */
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+        brokerState.newState(RunningAsBroker)
+
+        Mx4jLoader.maybeLoad()
+
+        /* start topic config manager */
+        topicConfigManager = new TopicConfigManager(zkClient, logManager)
+        topicConfigManager.startup()
+
+        /* tell everyone we are alive */
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+        kafkaHealthcheck.startup()
+
+        /* register broker metrics */
+        registerStats()
+
+        startupComplete.set(true)
+        isStartingUp.set(false)
+        shutdownComplete.set(false)
+        shutdownLatch = new CountDownLatch(1)
+        info("started")
+      }
     }
     catch {
       case e: Throwable =>
         fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+        isStartingUp.set(false)
         shutdown()
         throw e
     }
@@ -285,6 +304,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def shutdown() {
     try {
       info("shutting down")
+
+      if (isStartingUp.get)
+        throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
+
+      if (shutdownComplete.get)
+        return
+
       val canShutdown = isShuttingDown.compareAndSet(false, true)
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
@@ -312,13 +338,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(zkClient.close())
 
         brokerState.newState(NotRunning)
-        shutdownLatch.countDown()
+
+        shutdownComplete.set(true)
+        isShuttingDown.set(false)
         startupComplete.set(false)
+        shutdownLatch.countDown()
         info("shut down completed")
       }
     }
     catch {
       case e: Throwable =>
+        isShuttingDown.set(false)
         fatal("Fatal error during KafkaServer shutdown.", e)
         throw e
     }
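The startup/shutdown rework above follows a common lifecycle pattern: compareAndSet on an AtomicBoolean elects exactly one winner for each transition, the complete-flags make repeated calls idempotent, and the latch is re-armed on every startup so waiters block until the matching shutdown finishes. A stripped-down sketch of the same pattern (hypothetical class, not the actual KafkaServer):

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicBoolean

    class LifecycleSketch {
      private val isStartingUp = new AtomicBoolean(false)
      private val isShuttingDown = new AtomicBoolean(false)
      private val startupComplete = new AtomicBoolean(false)
      private val shutdownComplete = new AtomicBoolean(true)
      @volatile private var shutdownLatch: CountDownLatch = null

      def startup(): Unit = {
        if (isShuttingDown.get)
          throw new IllegalStateException("still shutting down, cannot re-start")
        if (startupComplete.get) return                 // idempotent re-entry
        if (isStartingUp.compareAndSet(false, true)) {  // exactly one starter wins
          // ... bring up components here ...
          startupComplete.set(true)
          isStartingUp.set(false)
          shutdownComplete.set(false)
          shutdownLatch = new CountDownLatch(1)         // re-armed on every startup
        }
      }

      def shutdown(): Unit = {
        if (shutdownComplete.get) return
        if (isShuttingDown.compareAndSet(false, true)) {
          // ... tear down components here ...
          shutdownComplete.set(true)
          isShuttingDown.set(false)
          startupComplete.set(false)
          shutdownLatch.countDown()                     // release awaitShutdown() callers
        }
      }

      /* callers are expected to have started the server first, as in the patch */
      def awaitShutdown(): Unit = shutdownLatch.await()
    }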
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..4c70aa7 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,25 +17,28 @@
 package kafka.server
 
-import scala.collection.{Seq, Set, mutable}
 import kafka.api._
+import kafka.common._
 import kafka.cluster.Broker
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.Utils._
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
-import kafka.common.TopicAndPartition
 import kafka.controller.KafkaController.StateChangeLogger
+import scala.collection.{Seq, Set, mutable}
+import kafka.utils.Logging
+import kafka.utils.Utils._
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 /**
  * A cache for the state (e.g., current leader) of each partition. This cache is updated through
  * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-private[server] class MetadataCache {
+private[server] class MetadataCache(brokerId: Int) extends Logging {
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
   private var aliveBrokers: Map[Int, Broker] = Map()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
+  this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
+
   def getTopicMetadata(topics: Set[String]) = {
     val isAllTopics = topics.isEmpty
     val topicsRequested = if(isAllTopics) cache.keySet else topics
@@ -68,7 +71,7 @@ private[server] class MetadataCache {
       new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
     } catch {
       case e: Throwable =>
-        debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+        debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
         new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
           ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0bdd42f..83d5264 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -96,6 +96,8 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   private val shuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
+
   scheduler.schedule(name = "offsets-cache-compactor",
                      fun = compact,
                      period = config.offsetsRetentionCheckIntervalMs,
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 81ae205..5374280 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -442,7 +442,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                                  val producer: MirrorMakerBaseProducer,
                                  val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
-    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
     this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
@@ -466,7 +466,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         case t: Throwable =>
           fatal("Producer thread failure due to ", t)
       } finally {
-        shutdownComplete.countDown()
+        shutdownLatch.countDown()
         info("Producer thread stopped")
         // if it exits accidentally, stop the entire mirror maker
         if (!isShuttingdown.get()) {
@@ -490,7 +490,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     def awaitShutdown() {
       try {
-        shutdownComplete.await()
+        shutdownLatch.await()
         producer.close()
         info("Producer thread shutdown complete")
       } catch {
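The MirrorMaker rename is cosmetic but worth illustrating: a CountDownLatch(1) is the idiomatic one-shot signal for "this thread has fully finished", counted down in a finally block so waiters are released even when the run loop throws. A generic sketch of the pattern (hypothetical worker, not MirrorMaker itself):

    import java.util.concurrent.CountDownLatch

    class WorkerThread extends Thread {
      private val shutdownLatch = new CountDownLatch(1)

      override def run(): Unit = {
        try {
          // ... do the actual work ...
        } finally {
          shutdownLatch.countDown()  // always released, even if run() threw
        }
      }

      /* blocks until run() has finished, however it ended */
      def awaitShutdown(): Unit = shutdownLatch.await()
    }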
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 1b7d5d8..8bcc927 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=TRACE, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
-log4j.logger.org.apache.kafka=ERROR
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=DEBUG
 
 # zkclient can be verbose, during debugging it is common to adjust it separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 90c0b7a..11d6a97 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -272,7 +272,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     for (server <- servers) {
       server.shutdown()
       server.awaitShutdown()
-      server.startup
+      server.startup()
       Thread.sleep(2000)
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ba1e48e..78e37d5 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -119,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, port)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = new KafkaConfig(newProps)
-    var server = new KafkaServer(newConfig)
+    val server = new KafkaServer(newConfig)
     try {
       server.startup()
       fail("Expected KafkaServer setup to fail, throw exception")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 54755e8..7c7a073 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -702,6 +702,12 @@ object TestUtils extends Logging {
   }
 
   def verifyNonDaemonThreadsStatus() {
+
+    error("Remaining non-daemon Kafka threads: " + Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .filter(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))
+      .map(t => t.getClass.getCanonicalName).toList)
+
     assertEquals(0, Thread.getAllStackTraces.keySet().toArray
       .map(_.asInstanceOf[Thread])
       .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
-- 
1.7.12.4