From 79761d3a77a89784746d432b195296c3950e10b4 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 18 May 2015 16:27:09 -0700
Subject: [PATCH 1/4] dummy

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 39 ++++++++++++++++++++++
 core/src/main/scala/kafka/coordinator/Group.scala  |  4 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 21 +++++++++---
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 6f05488..3549555 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -359,4 +360,42 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   private def shouldKeepConsumerAlive(consumer: Consumer, heartbeatDeadline: Long) =
     consumer.awaitingRebalance || consumer.latestHeartbeat > heartbeatDeadline - consumer.sessionTimeoutMs
+
+
+  private def getRegisteredConsumerGroup(zkClient: ZkClient, groupId: String): Option[Group] = {
+    val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
+    jsonPartitionMapOpt match {
+      case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {
+        case Some(m) => {
+          val groupRegistryMap = m.asInstanceOf[Map[String, Any]]
+          // we can ignore the version number since we are at version 0 now
+
+          val generationId: Int = groupRegistryMap.get("generationId") match {
+            case Some(id) => id.asInstanceOf[Int]
+            case None => 0  // set to default generation id
+          }
+
+          val partitionStrategy: String = groupRegistryMap.get("partitionStrategy") match {
+            case Some(strategy) => strategy.asInstanceOf[String]
+            case None => PartitionAssignor.createInstance("roundrobin")  // set to default partition strategy
+          }
+
+          val members: mutable.HashMap[String, Consumer] = groupRegistryMap.get("members") match {
+            case Some(repl) =>
+              val consumerList = repl.asInstanceOf[Seq[Any]]
+              consumerList.map { case consumer =>
+                val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
+                val consumerId = consumer
+              }
+            case None => Map.empty[String, Consumer]
+          }
+
+          Some(new Group(groupId, generationId, partitionStrategy))
+        }
+        case None => None
+      }
+      case None => None
+    }
+  }
+
 }
diff --git a/core/src/main/scala/kafka/coordinator/Group.scala b/core/src/main/scala/kafka/coordinator/Group.scala
index 048eeee..60fafbe 100644
--- a/core/src/main/scala/kafka/coordinator/Group.scala
+++ b/core/src/main/scala/kafka/coordinator/Group.scala
@@ -75,8 +75,11 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
  */
 @nonthreadsafe
 private[coordinator] class Group(val groupId: String,
+                                 var generationId: Int,
                                  val partitionAssignmentStrategy: String) {
 
+  def this(groupId: String, partitionAssignmentStrategy: String) = this(groupId, 0, partitionAssignmentStrategy)
+
   private val validPreviousStates: Map[GroupState, Set[GroupState]] =
     Map(Dead -> Set(PreparingRebalance),
         Stable -> Set(Rebalancing),
@@ -85,7 +88,6 @@ private[coordinator] class Group(val groupId: String,
   private val consumers = new mutable.HashMap[String, Consumer]
   private var state: GroupState = Stable
-  var generationId = 0
 
   def is(groupState: GroupState) = state == groupState
 
   def has(consumerId: String) = consumers.contains(consumerId)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 2618dd3..4132b1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -36,17 +36,25 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 
 object ZkUtils extends Logging {
-  val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerSequenceIdPath = "/brokers/seqid"
+
+  // TODO: this path is going to be replaced by consumer groups path
+  val ConsumersPath = "/consumers"
+
   val TopicConfigPath = "/config/topics"
   val TopicConfigChangesPath = "/config/changes"
+
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
-  val ReassignPartitionsPath = "/admin/reassign_partitions"
+
+  val CoordinatorPath = "/coordinator"
+  val ConsumerGroupsPath = "/coordinator/consumers"
+
   val DeleteTopicsPath = "/admin/delete_topics"
+  val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
-  val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -69,6 +77,9 @@ object ZkUtils extends Logging {
     }
   }
 
+  def getConsumerGroupPath(group: String): String =
+    ConsumerGroupsPath + "/" + group
+
   def getTopicPartitionPath(topic: String, partitionId: Int): String =
     getTopicPartitionsPath(topic) + "/" + partitionId
 
@@ -93,7 +104,7 @@ object ZkUtils extends Logging {
 
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
-                    DeleteTopicsPath, BrokerSequenceIdPath))
+                    DeleteTopicsPath, BrokerSequenceIdPath, ControllerPath, CoordinatorPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -599,7 +610,7 @@ object ZkUtils extends Logging {
     }
   }
 
-  // Parses without deduplicating keys so the the data can be checked before allowing reassignment to proceed
+  // Parses without deduplicating keys so that the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
     Json.parseFull(jsonData) match {
       case Some(m) =>
-- 
1.7.12.4
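
Note on the registry format read above: getRegisteredConsumerGroup expects the znode
at /coordinator/consumers/<groupId> to hold a JSON object with version, generationId,
partitionStrategy and members fields. A minimal standalone sketch of that decoding
pattern, with plain Scala collections standing in for the kafka.utils.Json output
and purely illustrative values (this sketch is not part of the patch):

    object GroupRegistryDecodeSketch extends App {
      // shape that Json.parseFull would produce for the registry payload
      val registry: Map[String, Any] = Map(
        "version" -> 0,
        "generationId" -> 3,
        "partitionStrategy" -> "roundrobin",
        "members" -> Seq(
          Map("consumerId" -> "c1", "subscriptions" -> "topicA,topicB", "sessionTimeoutMs" -> 30000))
      )

      // absent keys fall back to defaults, mirroring the patch
      val generationId = registry.get("generationId").map(_.asInstanceOf[Int]).getOrElse(0)
      val strategy = registry.get("partitionStrategy").map(_.asInstanceOf[String]).getOrElse("roundrobin")
      println(s"generation $generationId, strategy $strategy")
    }

Here every field is treated as optional; patch 2 below tightens the member fields
into required ones.
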
From fb948f0890b7bad022dd5812d45c1a009621e965 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 09:58:13 -0700
Subject: [PATCH 2/4] dummy 2

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 24 ++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 3549555..7665c53 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
+import org.apache.kafka.common.KafkaException
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -380,17 +381,32 @@ class ConsumerCoordinator(val config: KafkaConfig,
             case None => PartitionAssignor.createInstance("roundrobin")  // set to default partition strategy
           }
 
-          val members: mutable.HashMap[String, Consumer] = groupRegistryMap.get("members") match {
+          val members: Seq[Consumer] = groupRegistryMap.get("members") match {
             case Some(repl) =>
               val consumerList = repl.asInstanceOf[Seq[Any]]
               consumerList.map { case consumer =>
                 val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
-                val consumerId = consumer
+                val consumerId: String = consumerRegistryMap.get("consumerId") match {
+                  case Some(id) => id.asInstanceOf[String]
+                  case None => throw KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
+                }
+                val subscriptions: Set[String] = consumerRegistryMap.get("subscriptions") match {
+                  case Some(topics) => topics.asInstanceOf[String].split(",").toSet
+                  case None => throw KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
+                }
+                val sessionTimeoutMs: Int = consumerRegistryMap.get("sessionTimeoutMs") match {
+                  case Some(timeout) => timeout.asInstanceOf[Int]
+                  case None => throw KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
+                }
+                new Consumer(consumerId, groupId, subscriptions, sessionTimeoutMs)
               }
             case None => Map.empty[String, Consumer]
           }
-
-          Some(new Group(groupId, generationId, partitionStrategy))
+          val group = new Group(groupId, generationId, partitionStrategy)
+          for (member <- members) {
+            group.add(member.consumerId, member)
+          }
+          Some(group)
         }
         case None => None
       }
-- 
1.7.12.4
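
The member decoding added above repeats the same Some/None match for each required
field. A hypothetical helper showing how that pattern can be factored; required,
IllegalArgumentException (standing in for KafkaException to stay self-contained)
and the sample map are illustrative, not part of the patch or of Kafka:

    object RequiredFieldSketch extends App {
      // one place for the repeated "Some(x) => cast, None => throw" matches
      def required[T](registry: Map[String, Any], key: String, groupId: String): T =
        registry.get(key) match {
          case Some(value) => value.asInstanceOf[T]
          case None => throw new IllegalArgumentException(
            "Field %s is not specified for group %s while reading from ZK".format(key, groupId))
        }

      // usage mirroring the member loop above
      val consumerRegistryMap = Map[String, Any]("consumerId" -> "c1", "sessionTimeoutMs" -> 30000)
      val consumerId = required[String](consumerRegistryMap, "consumerId", "my-group")
      val sessionTimeoutMs = required[Int](consumerRegistryMap, "sessionTimeoutMs", "my-group")
      println(s"$consumerId expires after ${sessionTimeoutMs}ms")
    }
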
From a2c1f7e67da188795466c5476be578361e68d7e9 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 13:58:33 -0700
Subject: [PATCH 3/4] dummy 3

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 52 ++++++++++++++++++++--
 core/src/main/scala/kafka/coordinator/Group.scala  |  2 +
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 7665c53..7d1faa5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -16,16 +16,15 @@
  */
 package kafka.coordinator
 
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 
 import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable
-import org.apache.kafka.common.KafkaException
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -362,8 +361,53 @@ class ConsumerCoordinator(val config: KafkaConfig,
   private def shouldKeepConsumerAlive(consumer: Consumer, heartbeatDeadline: Long) =
     consumer.awaitingRebalance || consumer.latestHeartbeat > heartbeatDeadline - consumer.sessionTimeoutMs
 
+  private def updateConsumerGroupToZK(zkClient: ZkClient, group: Group) {
+    val zkPath = ZkUtils.getConsumerGroupPath(group.groupId)
+
+    // first verify the generation id
+    val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1
+    val oldGenerationId: Int = jsonPartitionMapOpt match {
+      case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {
+        case Some(m) => {
+          m.asInstanceOf[Map[String, Any]].get("generationId") match {
+            case Some(id) => id.asInstanceOf[Int]
+            case None => 0
+          }
+        }
+        case None => 0
+      }
+      case None => 0
+    }
+
+    if (group.generationId != oldGenerationId + 1) {
+      info("Failed to update consumer group registry in ZK due to invalid generation id: old value %d, new value %d"
+        .format(oldGenerationId, group.generationId))
+    }
+
+    // write the group registry to ZK
+    val jsonData = Json.encode(
+      Map("version" -> 1,
+          "generationId" -> group.generationId,
+          "partitionStrategy" -> group.partitionAssignmentStrategy,
+          "members" -> group.allConsumers.map(consumer =>
+            Map("consumerId" -> consumer.consumerId,
+                "subscriptions" -> consumer.topics.mkString(","),
+                "sessionTimeoutMs" -> consumer.sessionTimeoutMs)
+          )
+      )
+    )
+
+    try {
+      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonData)
+      debug("Updated consumer group registry path %s as %s".format(zkPath, jsonData))
+    } catch {
+      case nne: ZkNoNodeException =>
+        ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+        debug("Created consumer group registry path %s as %s".format(zkPath, jsonData))
+      case e2: Throwable => throw new KafkaException(e2.toString)
+    }
+  }
 
-  private def getRegisteredConsumerGroup(zkClient: ZkClient, groupId: String): Option[Group] = {
+  private def getConsumerGroupFromZK(zkClient: ZkClient, groupId: String): Option[Group] = {
     val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {
diff --git a/core/src/main/scala/kafka/coordinator/Group.scala b/core/src/main/scala/kafka/coordinator/Group.scala
index 60fafbe..4111b70 100644
--- a/core/src/main/scala/kafka/coordinator/Group.scala
+++ b/core/src/main/scala/kafka/coordinator/Group.scala
@@ -111,6 +111,8 @@ private[coordinator] class Group(val groupId: String,
 
   def notYetRejoinedConsumers = consumers.values.filter(!_.awaitingRebalance).toList
 
+  def allConsumers = consumers.values.toList
+
   def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
     timeout.max(consumer.sessionTimeoutMs)
   }
-- 
1.7.12.4
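
updateConsumerGroupToZK above relies on the try-update, create-on-ZkNoNodeException
idiom. Reduced to raw zkclient calls it looks roughly like the sketch below
(writeRegistry is a made-up name, and parent-path creation is skipped here; in the
patch the ZkUtils helpers are assumed to handle it):

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNoNodeException

    object RegistryWriteSketch {
      def writeRegistry(zkClient: ZkClient, path: String, jsonData: String) {
        try {
          zkClient.writeData(path, jsonData)           // fails if the znode does not exist yet
        } catch {
          case _: ZkNoNodeException =>
            zkClient.createPersistent(path, jsonData)  // first write for this group
        }
      }
    }

Note that the generation check before the write only logs on a mismatch and still
proceeds with the update; a stricter variant could abort the write instead.
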
From f91155e7a28eb27b590775b51917d8501e19e356 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 17:21:51 -0700
Subject: [PATCH 4/4] dummy 5

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 44 +++++++++++++++++-----
 .../kafka/coordinator/CoordinatorMetadata.scala    | 15 ++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  2 +-
 3 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 7d1faa5..77c3357 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -67,6 +67,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup]("JoinGroup", config.brokerId)
     rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
     coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance)
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.ConsumerGroupsPath)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -100,12 +101,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
     } else if (sessionTimeoutMs < MinSessionTimeoutMs || sessionTimeoutMs > MaxSessionTimeoutMs) {
       responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      var group = coordinatorMetadata.getGroup(groupId)
+
+      // if the group is not in the metadata cache, try to get it from ZK;
+      // if it is not in ZK either, try to create a new group in the cache, which
+      // will only be registered in ZK after its first rebalance is done
       if (group == null) {
-        if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
-          responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
+        group = getConsumerGroupFromZK(zkClient, groupId).orNull
+
+        if (group == null) {
+          if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
+            responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
+          } else {
+            group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
+            doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
+          }
         } else {
-          val group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
+          group = coordinatorMetadata.tryAddGroup(group)
           doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
         }
       } else {
@@ -323,6 +334,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   def onCompleteRebalance(group: Group) {
     group synchronized {
+      // remove any not-yet-rejoined consumers from the group
       val failedConsumers = group.notYetRejoinedConsumers
       if (group.isEmpty || !failedConsumers.isEmpty) {
         failedConsumers.foreach { failedConsumer =>
@@ -330,12 +342,18 @@ class ConsumerCoordinator(val config: KafkaConfig,
           // TODO: cut the socket connection to the consumer
         }
 
+        // update the group registry accordingly
         if (group.isEmpty) {
           group.transitionTo(Dead)
-          info("Group %s generation %s is dead".format(group.groupId, group.generationId))
           coordinatorMetadata.removeGroup(group.groupId, group.topics)
+          removeConsumerGroupFromZK(zkClient, group.groupId)
+          info("Group %s generation %d is dead".format(group.groupId, group.generationId))
+        } else {
+          updateConsumerGroupToZK(zkClient, group)
+          info("Group %s has been updated to generation %d".format(group.groupId, group.generationId))
         }
       }
+
       if (!group.is(Dead))
         rebalance(group)
     }
@@ -407,6 +425,12 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
+  private def removeConsumerGroupFromZK(zkClient: ZkClient, groupId: String) {
+    // note that it is OK if deletePath returns false, which means either someone
+    // else has already removed it or it has not been registered in ZK yet
+    ZkUtils.deletePath(zkClient, ZkUtils.getConsumerGroupPath(groupId))
+  }
+
   private def getConsumerGroupFromZK(zkClient: ZkClient, groupId: String): Option[Group] = {
     val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
     jsonPartitionMapOpt match {
@@ -422,7 +446,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
           val partitionStrategy: String = groupRegistryMap.get("partitionStrategy") match {
             case Some(strategy) => strategy.asInstanceOf[String]
-            case None => PartitionAssignor.createInstance("roundrobin")  // set to default partition strategy
+            case None => "roundrobin"  // set to default partition strategy
           }
 
           val members: Seq[Consumer] = groupRegistryMap.get("members") match {
@@ -432,19 +456,19 @@ class ConsumerCoordinator(val config: KafkaConfig,
               val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
               val consumerId: String = consumerRegistryMap.get("consumerId") match {
                 case Some(id) => id.asInstanceOf[String]
-                case None => throw KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
+                case None => throw new KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
               }
               val subscriptions: Set[String] = consumerRegistryMap.get("subscriptions") match {
                 case Some(topics) => topics.asInstanceOf[String].split(",").toSet
-                case None => throw KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
+                case None => throw new KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
               }
               val sessionTimeoutMs: Int = consumerRegistryMap.get("sessionTimeoutMs") match {
                 case Some(timeout) => timeout.asInstanceOf[Int]
-                case None => throw KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
+                case None => throw new KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
               }
               new Consumer(consumerId, groupId, subscriptions, sessionTimeoutMs)
             }
-            case None => Map.empty[String, Consumer]
+            case None => Seq.empty[Consumer]
           }
           val group = new Group(groupId, generationId, partitionStrategy)
           for (member <- members) {
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 88e82b6..a71b6ea 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.mutable
+import org.apache.kafka.common.utils.Utils
 
 /**
  * CoordinatorMetadata manages group and topic metadata.
@@ -85,6 +86,20 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
   }
 
   /**
+   * Try to add a group with the given groupId if it does not exist yet
+   */
+  def tryAddGroup(group: Group) = {
+    inWriteLock(metadataLock) {
+      if (!groups.contains(group.groupId)) {
+        groups.put(group.groupId, group)
+        group
+      } else {
+        Utils.notNull(groups.get(group.groupId).orNull)
+      }
+    }
+  }
+
+  /**
    * Remove all metadata associated with the group, including its topics
    * @param groupId the groupId of the group we are removing
    * @param topicsForGroup topics that consumers in the group were subscribed to
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4132b1b..e7b56aa 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -104,7 +104,7 @@ object ZkUtils extends Logging {
 
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
-                    DeleteTopicsPath, BrokerSequenceIdPath, ControllerPath, CoordinatorPath))
+                    DeleteTopicsPath, BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
-- 
1.7.12.4
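
tryAddGroup above makes the contains/put pair atomic under the metadata write lock,
so two consumers racing to re-register a group just loaded from ZK end up sharing a
single Group instance. The same put-if-absent-under-lock pattern in isolation, as a
self-contained sketch (Registry and its names are illustrative, not Kafka code):

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    class Registry[K, V] {
      private val lock = new ReentrantReadWriteLock
      private val entries = mutable.Map.empty[K, V]

      // returns the winner of the race: the value we inserted,
      // or the one another thread inserted first
      def tryAdd(key: K, value: V): V = {
        lock.writeLock.lock()
        try {
          entries.getOrElseUpdate(key, value)
        } finally {
          lock.writeLock.unlock()
        }
      }
    }

A ConcurrentHashMap.putIfAbsent would give the same guarantee without an explicit
lock; the explicit lock fits here because CoordinatorMetadata already guards its
maps with a ReentrantReadWriteLock.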