From afa70bbb4b5f37842db63f72dee3d5860ad91aed Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 18 May 2015 16:27:09 -0700
Subject: [PATCH 1/6] dummy

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 37 ++++++++++++++++++++++
 .../kafka/coordinator/ConsumerGroupMetadata.scala  |  4 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 21 +++++++++---
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index af06ad4..b849fc5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
 
 import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -342,4 +343,40 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
     consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
+
+  private def getRegisteredConsumerGroup(zkClient: ZkClient, groupId: String): Option[ConsumerGroupMetadata] = {
+    val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
+    jsonPartitionMapOpt match {
+      case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {
+        case Some(m) => {
+          val groupRegistryMap = m.asInstanceOf[Map[String, Any]]
+          // we can ignore the version number since we are at version 0 now
+
+          val generationId: Int = groupRegistryMap.get("generationId") match {
+            case Some(id) => id.asInstanceOf[Int]
+            case None => 0 // set to default generation id
+          }
+
+          val partitionStrategy: String = groupRegistryMap.get("partitionStrategy") match {
+            case Some(strategy) => strategy.asInstanceOf[String]
+            case None => PartitionAssignor.createInstance("roundrobin") // set to default partition strategy
+          }
+
+          val members: mutable.HashMap[String, ConsumerMetadata] = groupRegistryMap.get("members") match {
+            case Some(repl) =>
+              val consumerList = repl.asInstanceOf[Seq[Any]]
+              consumerList.map { case consumer =>
+                val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
+                val consumerId = consumer
+              }
+            case None => Map.empty[String, ConsumerMetadata]
+          }
+
+          Some(new ConsumerGroupMetadata(groupId, generationId, partitionStrategy))
+        }
+        case None => None
+      }
+      case None => None
+    }
+  }
 }

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
index 47bdfa7..7841a78 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -75,8 +75,11 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
  */
 @nonthreadsafe
 private[coordinator] class ConsumerGroupMetadata(val groupId: String,
+                                                 var generationId: Int,
                                                  val partitionAssignmentStrategy: String) {
 
+  def this(groupId: String, partitionAssignmentStrategy: String) = this(groupId, 0, partitionAssignmentStrategy)
+
   private val validPreviousStates: Map[GroupState, Set[GroupState]] =
     Map(Dead -> Set(PreparingRebalance),
         Stable -> Set(Rebalancing),
@@ -85,7 +88,6 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
 
   private val consumers = new mutable.HashMap[String, ConsumerMetadata]
   private var state: GroupState = Stable
-  var generationId = 0
 
   def is(groupState: GroupState) = state == groupState
   def has(consumerId: String) = consumers.contains(consumerId)

diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 2618dd3..4132b1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -36,17 +36,25 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 
 object ZkUtils extends Logging {
-  val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerSequenceIdPath = "/brokers/seqid"
+
+  // TODO: this path is going to be replaced by consumer groups path
+  val ConsumersPath = "/consumers"
+
   val TopicConfigPath = "/config/topics"
   val TopicConfigChangesPath = "/config/changes"
+
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
-  val ReassignPartitionsPath = "/admin/reassign_partitions"
+
+  val CoordinatorPath = "/coordinator"
+  val ConsumerGroupsPath = "/coordinator/consumers"
+
   val DeleteTopicsPath = "/admin/delete_topics"
+  val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
-  val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -69,6 +77,9 @@ object ZkUtils extends Logging {
     }
   }
 
+  def getConsumerGroupPath(group: String): String =
+    ConsumerGroupsPath + "/" + group
+
   def getTopicPartitionPath(topic: String, partitionId: Int): String =
     getTopicPartitionsPath(topic) + "/" + partitionId
 
@@ -93,7 +104,7 @@ object ZkUtils extends Logging {
 
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
-                    DeleteTopicsPath, BrokerSequenceIdPath))
+                    DeleteTopicsPath, BrokerSequenceIdPath, ControllerPath, CoordinatorPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -599,7 +610,7 @@ object ZkUtils extends Logging {
     }
   }
 
-  // Parses without deduplicating keys so the the data can be checked before allowing reassignment to proceed
+  // Parses without deduplicating keys so that the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
     Json.parseFull(jsonData) match {
       case Some(m) =>
-- 
1.7.12.4
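Note: patch 1 above introduces the per-group registry znode (ZkUtils.getConsumerGroupPath) and a first cut of reading it back; the member-list parsing is still a stub here and is completed in the next patch. For reference, a minimal sketch of the version-0 registry document that getRegisteredConsumerGroup expects, and of its generation-id lookup. The field names follow the parsing code above; the sample payload, its values, and the standalone snippet are illustrative assumptions, not part of the patch:

    import kafka.utils.Json

    // Hypothetical version-0 registry payload; values are made up.
    val sampleRegistry =
      """{"version": 0, "generationId": 3, "partitionStrategy": "roundrobin",
        |  "members": [{"consumerId": "c1", "subscriptions": "t1,t2", "sessionTimeoutMs": 30000}]}""".stripMargin

    val generationId: Int = Json.parseFull(sampleRegistry) match {
      case Some(m) => m.asInstanceOf[Map[String, Any]].get("generationId") match {
        case Some(id) => id.asInstanceOf[Int] // kafka.utils.Json parses numbers as Int
        case None => 0                        // same default as the code above
      }
      case None => 0
    }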
From 3f667b8d18114568aa9e963ece7ba143afbe0381 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 09:58:13 -0700
Subject: [PATCH 2/6] dummy 2

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 24 ++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index b849fc5..9877114 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
 
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
+import org.apache.kafka.common.KafkaException
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -362,17 +363,32 @@ class ConsumerCoordinator(val config: KafkaConfig,
             case None => PartitionAssignor.createInstance("roundrobin") // set to default partition strategy
           }
 
-          val members: mutable.HashMap[String, ConsumerMetadata] = groupRegistryMap.get("members") match {
+          val members: Seq[ConsumerMetadata] = groupRegistryMap.get("members") match {
             case Some(repl) =>
               val consumerList = repl.asInstanceOf[Seq[Any]]
               consumerList.map { case consumer =>
                 val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
-                val consumerId = consumer
+                val consumerId: String = consumerRegistryMap.get("consumerId") match {
+                  case Some(id) => id.asInstanceOf[String]
+                  case None => throw KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
+                }
+                val subscriptions: Set[String] = consumerRegistryMap.get("subscriptions") match {
+                  case Some(topics) => topics.asInstanceOf[String].split(",").toSet
+                  case None => throw KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
+                }
+                val sessionTimeoutMs: Int = consumerRegistryMap.get("sessionTimeoutMs") match {
+                  case Some(timeout) => timeout.asInstanceOf[Int]
+                  case None => throw KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
+                }
+                new ConsumerMetadata(consumerId, groupId, subscriptions, sessionTimeoutMs)
               }
             case None => Map.empty[String, ConsumerMetadata]
           }
-
-          Some(new ConsumerGroupMetadata(groupId, generationId, partitionStrategy))
+          val group = new ConsumerGroupMetadata(groupId, generationId, partitionStrategy)
+          for (member <- members) {
+            group.add(member.consumerId, member)
+          }
+          Some(group)
         }
         case None => None
       }
-- 
1.7.12.4
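Note: patch 2 completes the member parsing: each member record must carry a consumerId, a comma-separated subscriptions string, and a sessionTimeoutMs, and a missing field aborts the read. The repeated get-or-throw pattern could be factored into a small helper; the sketch below is illustrative only (the helper name is made up, and it uses kafka.common.KafkaException with new, as the later patches in this series do):

    import kafka.common.KafkaException

    // Hypothetical helper mirroring the pattern above: look up a required
    // registry field or fail with a descriptive error.
    def requiredField[T](registry: Map[String, Any], key: String, groupId: String): T =
      registry.get(key) match {
        case Some(value) => value.asInstanceOf[T]
        case None => throw new KafkaException(
          "Required field %s is missing for group %s while reading from ZK".format(key, groupId))
      }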
From c72b29a934d1498a77819d1b3f4d73b9c6df7ebe Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 13:58:33 -0700
Subject: [PATCH 3/6] dummy 3

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 53 ++++++++++++++++++++--
 .../kafka/coordinator/ConsumerGroupMetadata.scala  |  2 +
 2 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 9877114..f52b3e6 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -16,16 +16,15 @@
  */
 package kafka.coordinator
 
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server._
 import kafka.utils._
 
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNoNodeException
 
 import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable
-import org.apache.kafka.common.KafkaException
 
 // TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
 object ConsumerCoordinator {
@@ -345,7 +344,53 @@ class ConsumerCoordinator(val config: KafkaConfig,
   private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
     consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
 
-  private def getRegisteredConsumerGroup(zkClient: ZkClient, groupId: String): Option[ConsumerGroupMetadata] = {
+  private def updateConsumerGroupToZK(zkClient: ZkClient, group: ConsumerGroupMetadata) {
+    val zkPath = ZkUtils.getConsumerGroupPath(group.groupId)
+
+    // first verify the generation id
+    val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1
+    val oldGenerationId: Int = jsonPartitionMapOpt match {
+      case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {
+        case Some(m) => {
+          m.asInstanceOf[Map[String, Any]].get("generationId") match {
+            case Some(id) => id.asInstanceOf[Int]
+            case None => 0
+          }
+        }
+        case None => 0
+      }
+      case None => 0
+    }
+
+    if (group.generationId != oldGenerationId + 1) {
+      info("Failed to update consumer group registry in ZK due to invalid generation id: old value %d, new value %d"
+        .format(oldGenerationId, group.generationId))
+    }
+
+    // write the group registry to ZK
+    val jsonData = Json.encode(
+      Map("version" -> 1,
+          "generationId" -> group.generationId,
+          "partitionStrategy" -> group.partitionAssignmentStrategy,
+          "members" -> group.allConsumers.map(consumer =>
+            Map("consumerId" -> consumer.consumerId,
+                "subscriptions" -> consumer.topics.mkString(","),
+                "sessionTimeoutMs" -> consumer.sessionTimeoutMs)
+          )
+      )
+    )
+
+    try {
+      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonData)
+      debug("Updated consumer group registry path %s as %s".format(zkPath, jsonData))
+    } catch {
+      case nne: ZkNoNodeException =>
+        ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+        debug("Created consumer group registry path %s as %s".format(zkPath, jsonData))
+      case e2: Throwable => throw new KafkaException(e2.toString)
+    }
+  }
+
+  private def getConsumerGroupFromZK(zkClient: ZkClient, groupId: String): Option[ConsumerGroupMetadata] = {
     val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) => Json.parseFull(jsonPartitionMap) match {

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
index 7841a78..9d62b96 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -111,6 +111,8 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
 
   def allConsumers = consumers.values.toList
 
+  def allConsumers = consumers.values.toList
+
   def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
     timeout.max(consumer.sessionTimeoutMs)
   }
-- 
1.7.12.4
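Note: patch 3 adds the write side: updateConsumerGroupToZK first checks that the group's generation id is exactly one ahead of the registered value, then serializes the registry with Json.encode, updating the znode in place and creating it on ZkNoNodeException. A sketch of the document this encodes for a hypothetical one-member group (sample values assumed; output shown modulo whitespace):

    import kafka.utils.Json

    val jsonData = Json.encode(
      Map("version" -> 1,
          "generationId" -> 4,
          "partitionStrategy" -> "roundrobin",
          "members" -> Seq(Map("consumerId" -> "c1",
                               "subscriptions" -> "t1,t2",
                               "sessionTimeoutMs" -> 30000))))
    // {"version":1,"generationId":4,"partitionStrategy":"roundrobin",
    //  "members":[{"consumerId":"c1","subscriptions":"t1,t2","sessionTimeoutMs":30000}]}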
From 7fafed29685213686b42e978b04731c373c7ed26 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 19 May 2015 17:21:51 -0700
Subject: [PATCH 4/6] dummy 5

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 48 +++++++++++++++++-----
 .../kafka/coordinator/CoordinatorMetadata.scala    | 15 +++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  2 +-
 3 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index f52b3e6..b635838 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -65,6 +65,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
     rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
     coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance)
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.ConsumerGroupsPath)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -97,12 +98,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
     } else if (sessionTimeoutMs < MinSessionTimeoutMs || sessionTimeoutMs > MaxSessionTimeoutMs) {
       responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      var group = coordinatorMetadata.getGroup(groupId)
+      // if the group is not in the metadata cache, try to get it from ZK;
+      // if it is not in ZK either, create a new group in the cache, which
+      // will only be registered in ZK after its first rebalance is done
       if (group == null) {
-        if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
-          responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
+        group = getConsumerGroupFromZK(zkClient, groupId).orNull
+
+        if (group == null) {
+          if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
+            responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
+          } else {
+            group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
+            doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
+          }
         } else {
-          val group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
+          group = coordinatorMetadata.tryAddGroup(group)
           doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
         }
       } else {
@@ -296,6 +307,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   def onCompleteRebalance(group: ConsumerGroupMetadata) {
     group synchronized {
+      // remove any not-yet-rejoined consumers from the group
       val failedConsumers = group.notYetRejoinedConsumers
       if (group.isEmpty || !failedConsumers.isEmpty) {
         failedConsumers.foreach { failedConsumer =>
@@ -303,14 +315,24 @@ class ConsumerCoordinator(val config: KafkaConfig,
           // TODO: cut the socket connection to the consumer
         }
 
+        // update the group registry accordingly
         if (group.isEmpty) {
           group.transitionTo(Dead)
-          info("Group %s generation %s is dead".format(group.groupId, group.generationId))
           coordinatorMetadata.removeGroup(group.groupId, group.topics)
+          removeConsumerGroupFromZK(zkClient, group.groupId)
+          info("Group %s generation %d is dead".format(group.groupId, group.generationId))
+        } else {
+          updateConsumerGroupToZK(zkClient, group)
+          info("Group %s has been updated to generation %d".format(group.groupId, group.generationId))
         }
       }
+<<<<<<< HEAD
       if (!group.is(Dead)) {
         // assign partitions to existing consumers of the group according to the partitioning strategy
+=======
+
+      if (!group.is(Dead))
+>>>>>>> dummy 5
         rebalance(group)
 
         // trigger the awaiting join group response callback for all the consumers after rebalancing
@@ -390,6 +412,12 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
+  private def removeConsumerGroupFromZK(zkClient: ZkClient, groupId: String) {
+    // note that it is OK if deletePath returns false, which means that either someone
+    // else has removed it or it has not been registered in ZK yet.
+    ZkUtils.deletePath(zkClient, ZkUtils.getConsumerGroupPath(groupId))
+  }
+
   private def getConsumerGroupFromZK(zkClient: ZkClient, groupId: String): Option[ConsumerGroupMetadata] = {
     val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getConsumerGroupPath(groupId))._1
     jsonPartitionMapOpt match {
@@ -405,7 +433,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
           val partitionStrategy: String = groupRegistryMap.get("partitionStrategy") match {
             case Some(strategy) => strategy.asInstanceOf[String]
-            case None => PartitionAssignor.createInstance("roundrobin") // set to default partition strategy
+            case None => "roundrobin" // set to default partition strategy
           }
 
           val members: Seq[ConsumerMetadata] = groupRegistryMap.get("members") match {
@@ -415,19 +443,19 @@ class ConsumerCoordinator(val config: KafkaConfig,
                 val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
                 val consumerId: String = consumerRegistryMap.get("consumerId") match {
                   case Some(id) => id.asInstanceOf[String]
-                  case None => throw KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
+                  case None => throw new KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
                 }
                 val subscriptions: Set[String] = consumerRegistryMap.get("subscriptions") match {
                   case Some(topics) => topics.asInstanceOf[String].split(",").toSet
-                  case None => throw KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
+                  case None => throw new KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
                 }
                 val sessionTimeoutMs: Int = consumerRegistryMap.get("sessionTimeoutMs") match {
                   case Some(timeout) => timeout.asInstanceOf[Int]
-                  case None => throw KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
+                  case None => throw new KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
                 }
                 new ConsumerMetadata(consumerId, groupId, subscriptions, sessionTimeoutMs)
               }
-            case None => Map.empty[String, ConsumerMetadata]
+            case None => Seq.empty[ConsumerMetadata]
           }
           val group = new ConsumerGroupMetadata(groupId, generationId, partitionStrategy)
           for (member <- members) {

diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index c39e6de..055e214 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.mutable
+import org.apache.kafka.common.utils.Utils
 
 /**
  * CoordinatorMetadata manages group and topic metadata.
@@ -85,6 +86,20 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
   }
 
   /**
+   * Try to add a group with the given groupId if it does not exist yet
+   */
+  def tryAddGroup(group: Group) = {
+    inWriteLock(metadataLock) {
+      if (!groups.contains(group.groupId)) {
+        groups.put(group.groupId, group)
+        group
+      } else {
+        Utils.notNull(groups.get(group.groupId).orNull)
+      }
+    }
+  }
+
+  /**
    * Remove all metadata associated with the group, including its topics
   * @param groupId the groupId of the group we are removing
   * @param topicsForGroup topics that consumers in the group were subscribed to

diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4132b1b..e7b56aa 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -104,7 +104,7 @@ object ZkUtils extends Logging {
 
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
-                    DeleteTopicsPath, BrokerSequenceIdPath, ControllerPath, CoordinatorPath))
+                    DeleteTopicsPath, BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
-- 
1.7.12.4
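Note: patch 4 wires the registry into the join path (on a cache miss the coordinator now consults ZK before minting a new group) and adds tryAddGroup so that two concurrent joins for the same missing group converge on a single cached instance. Below is a self-contained sketch of that put-if-absent-under-write-lock idiom; the class and method names are illustrative, not the patch's API:

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    class GroupCache[V] {
      private val lock = new ReentrantReadWriteLock
      private val groups = mutable.HashMap[String, V]()

      // Insert under the write lock; if another caller added the key first,
      // return the existing value so both callers share one instance.
      def tryAdd(groupId: String, group: V): V = {
        lock.writeLock().lock()
        try groups.getOrElseUpdate(groupId, group)
        finally lock.writeLock().unlock()
      }
    }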
From b5566e2db8a0e0696def4ba7b7e56c088900ed59 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 20 May 2015 09:10:23 -0700
Subject: [PATCH 5/6] dummy

---
 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala   | 7 +------
 core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala | 2 --
 core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala   | 2 +-
 3 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index b635838..b45e759 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -326,13 +326,8 @@ class ConsumerCoordinator(val config: KafkaConfig,
           info("Group %s has been updated to generation %d".format(group.groupId, group.generationId))
         }
       }
-<<<<<<< HEAD
-      if (!group.is(Dead)) {
-        // assign partitions to existing consumers of the group according to the partitioning strategy
-=======
-      if (!group.is(Dead))
->>>>>>> dummy 5
+      if (!group.is(Dead)) {
         rebalance(group)
 
         // trigger the awaiting join group response callback for all the consumers after rebalancing

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
index 9d62b96..7841a78 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -111,8 +111,6 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
 
   def allConsumers = consumers.values.toList
 
-  def allConsumers = consumers.values.toList
-
   def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
     timeout.max(consumer.sessionTimeoutMs)
   }

diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 055e214..7208ecb 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -88,7 +88,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
   /**
    * Try to add a group with the given groupId if it does not exist yet
    */
-  def tryAddGroup(group: Group) = {
+  def tryAddGroup(group: ConsumerGroupMetadata) = {
     inWriteLock(metadataLock) {
       if (!groups.contains(group.groupId)) {
         groups.put(group.groupId, group)
-- 
1.7.12.4
From a928ef0cc1dce327a13b82c9bec1cc2e20f3b757 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 21 May 2015 19:01:50 -0700
Subject: [PATCH 6/6] v2: Jun's comments

---
 .../kafka/coordinator/ConsumerCoordinator.scala    | 32 ++++++++++------------
 .../kafka/coordinator/CoordinatorMetadata.scala    |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  1 -
 3 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index b45e759..1043325 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -390,27 +390,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
           "partitionStrategy" -> group.partitionAssignmentStrategy,
           "members" -> group.allConsumers.map(consumer =>
             Map("consumerId" -> consumer.consumerId,
-                "subscriptions" -> consumer.topics.mkString(","),
+                "subscriptions" -> consumer.topics.toArray,
                 "sessionTimeoutMs" -> consumer.sessionTimeoutMs)
           )
       )
     )
 
-    try {
-      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonData)
-      debug("Updated consumer group registry path %s as %s".format(zkPath, jsonData))
-    } catch {
-      case nne: ZkNoNodeException =>
-        ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-        debug("Created consumer group registry path %s as %s".format(zkPath, jsonData))
-      case e2: Throwable => throw new KafkaException(e2.toString)
-    }
+    ZkUtils.updatePersistentPath(zkClient, zkPath, jsonData)
+    debug("Updated consumer group registry path %s as %s".format(zkPath, jsonData))
   }
 
   private def removeConsumerGroupFromZK(zkClient: ZkClient, groupId: String) {
     // note that it is OK if deletePath returns false, which means that either someone
     // else has removed it or it has not been registered in ZK yet.
-    ZkUtils.deletePath(zkClient, ZkUtils.getConsumerGroupPath(groupId))
+    val zkPath = ZkUtils.getConsumerGroupPath(groupId)
+    ZkUtils.deletePath(zkClient, zkPath)
+    debug("Deleted consumer group registry path %s".format(zkPath))
   }
 
   private def getConsumerGroupFromZK(zkClient: ZkClient, groupId: String): Option[ConsumerGroupMetadata] = {
@@ -423,12 +418,12 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
           val generationId: Int = groupRegistryMap.get("generationId") match {
             case Some(id) => id.asInstanceOf[Int]
-            case None => 0 // set to default generation id
+            case None => throw new KafkaException("Error parsing consumer group registry: generation id not found from %s".format(groupRegistryMap))
           }
 
           val partitionStrategy: String = groupRegistryMap.get("partitionStrategy") match {
             case Some(strategy) => strategy.asInstanceOf[String]
-            case None => "roundrobin" // set to default partition strategy
+            case None => throw new KafkaException("Error parsing consumer group registry: partition strategy not found from %s".format(groupRegistryMap))
           }
 
           val members: Seq[ConsumerMetadata] = groupRegistryMap.get("members") match {
@@ -438,15 +433,18 @@ class ConsumerCoordinator(val config: KafkaConfig,
                 val consumerRegistryMap = consumer.asInstanceOf[Map[String, Any]]
                 val consumerId: String = consumerRegistryMap.get("consumerId") match {
                   case Some(id) => id.asInstanceOf[String]
-                  case None => throw new KafkaException("Some consumer ids are not specified for group %s while reading from ZK".format(groupId))
+                  case None => throw new KafkaException(("Error parsing consumer group registry: " +
+                    "some consumer ids are not specified for group %s in %s").format(groupId, consumerRegistryMap))
                 }
                 val subscriptions: Set[String] = consumerRegistryMap.get("subscriptions") match {
-                  case Some(topics) => topics.asInstanceOf[String].split(",").toSet
-                  case None => throw new KafkaException("Some consumer topic subscriptions are not specified for group %s while reading from ZK".format(groupId))
+                  case Some(topics) => topics.asInstanceOf[Seq[String]].toSet
+                  case None => throw new KafkaException(("Error parsing consumer group registry: " +
+                    "some consumer topic subscriptions are not specified for group %s in %s").format(groupId, consumerRegistryMap))
                 }
                 val sessionTimeoutMs: Int = consumerRegistryMap.get("sessionTimeoutMs") match {
                   case Some(timeout) => timeout.asInstanceOf[Int]
-                  case None => throw new KafkaException("Some consumer session timeouts are not specified for group %s while reading from ZK".format(groupId))
+                  case None => throw new KafkaException(("Error parsing consumer group registry: " +
+                    "some consumer session timeouts are not specified for group %s in %s").format(groupId, consumerRegistryMap))
                 }
                 new ConsumerMetadata(consumerId, groupId, subscriptions, sessionTimeoutMs)
               }

diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 7208ecb..fe666d4 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -94,7 +94,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
         groups.put(group.groupId, group)
         group
       } else {
-        Utils.notNull(groups.get(group.groupId).orNull)
+        groups(group.groupId)
       }
     }
   }

diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e7b56aa..f56e3ce 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -49,7 +49,6 @@ object ZkUtils extends Logging {
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
 
-  val CoordinatorPath = "/coordinator"
   val ConsumerGroupsPath = "/coordinator/consumers"
 
   val DeleteTopicsPath = "/admin/delete_topics"
-- 
1.7.12.4
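Note: patch 6 applies the review feedback: subscriptions are written as a proper JSON array rather than a comma-joined string, all registry fields become mandatory on the read path, and the redundant try/catch around updatePersistentPath is dropped (updatePersistentPath already creates the path when it is missing). A round-trip sketch under the v2 layout; the sample group is made up, and it assumes kafka.utils.Json encodes arrays and parses JSON arrays back as a Seq of strings, as the patch relies on:

    import kafka.utils.Json

    val encoded = Json.encode(
      Map("version" -> 1,
          "generationId" -> 4,
          "partitionStrategy" -> "roundrobin",
          "members" -> Seq(Map("consumerId" -> "c1",
                               "subscriptions" -> Array("t1", "t2"),
                               "sessionTimeoutMs" -> 30000))))

    val subscriptions: Set[String] = Json.parseFull(encoded) match {
      case Some(m) =>
        val members = m.asInstanceOf[Map[String, Any]]("members").asInstanceOf[Seq[Any]]
        members.head.asInstanceOf[Map[String, Any]]("subscriptions").asInstanceOf[Seq[String]].toSet
      case None => Set.empty[String]
    }
    // subscriptions == Set("t1", "t2")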