From 121887bbabe4edc669d4a378657846e40f44376a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 4 Jun 2015 17:57:12 -0700 Subject: [PATCH 1/5] in-progress --- .../clients/consumer/internals/Coordinator.java | 20 +++- .../org/apache/kafka/common/protocol/Errors.java | 6 +- .../common/requests/OffsetCommitResponse.java | 8 +- .../kafka/common/requests/OffsetFetchResponse.java | 4 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 +- core/src/main/scala/kafka/cluster/Partition.scala | 5 +- .../kafka/common/OffsetMetadataAndError.scala | 14 ++- core/src/main/scala/kafka/common/Topic.scala | 4 +- .../kafka/coordinator/ConsumerCoordinator.scala | 116 ++++++++++++++++++--- .../kafka/coordinator/CoordinatorMetadata.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 36 +++---- core/src/main/scala/kafka/server/KafkaServer.scala | 40 +++---- .../main/scala/kafka/server/OffsetManager.scala | 32 +++--- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- .../scala/integration/kafka/api/ConsumerTest.scala | 7 +- .../kafka/api/IntegrationTestHarness.scala | 9 +- .../scala/unit/kafka/admin/TopicCommandTest.scala | 8 +- .../unit/kafka/consumer/TopicFilterTest.scala | 9 +- 18 files changed, 219 insertions(+), 115 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index c1496a0..1e77019 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -257,14 +257,15 @@ public final class Coordinator { // just retry offsetsReady = false; Utils.sleep(this.retryBackoffMs); + } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code() + || data.errorCode == Errors.ILLEGAL_GENERATION.code()) { + // need to re-join group + subscriptions.needReassignment(); } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { // re-discover the coordinator and retry coordinatorDead(); offsetsReady = false; Utils.sleep(this.retryBackoffMs); - } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - // just ignore this partition - log.debug("Unknown topic or partition for " + tp); } else { throw new KafkaException("Unexpected error in fetch offset response: " + Errors.forCode(data.errorCode).exception().getMessage()); @@ -542,12 +543,23 @@ public final class Coordinator { } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { coordinatorDead(); - } else { + } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code() + || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) { // do not need to throw the exception but just log the error log.error("Error committing partition {} at offset {}: {}", tp, offset, Errors.forCode(errorCode).exception().getMessage()); + } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code() + || errorCode == Errors.ILLEGAL_GENERATION.code()) { + // need to re-join group + subscriptions.needReassignment(); + } else { + // re-throw the exception as these should not happen + log.error("Error committing partition {} at offset {}: {}", + tp, + offset, + Errors.forCode(errorCode).exception().getMessage()); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 5b898c8..4c0ecc3 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -77,7 +77,11 @@ public enum Errors { UNKNOWN_CONSUMER_ID(25, new ApiException("The coordinator is not aware of this consumer.")), INVALID_SESSION_TIMEOUT(26, - new ApiException("The session timeout is not within an acceptable range.")); + new ApiException("The session timeout is not within an acceptable range.")), + COMMITTING_PARTITIONS_NOT_ASSIGNED(27, + new ApiException("Some of the committing partitions are not assigned to the committer")), + INVALID_COMMIT_OFFSET_SIZE(28, + new ApiException("The committing offset data size is not valid")); private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>(); private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 70844d6..a163333 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -41,7 +41,13 @@ public class OffsetCommitResponse extends AbstractRequestResponse { /** * Possible error code: * - * TODO + * OFFSET_METADATA_TOO_LARGE (12) + * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_CONSUMER (16) + * ILLEGAL_GENERATION (22) + * UNKNOWN_CONSUMER_ID (25) + * COMMITTING_PARTITIONS_NOT_ASSIGNED (27) + * INVALID_COMMIT_OFFSET_SIZE (28) */ private final Map<TopicPartition, Short> responseData; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 512a0ef..096d979 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -47,10 +47,10 @@ public class OffsetFetchResponse extends AbstractRequestResponse { /** * Possible error code: * - * UNKNOWN_TOPIC_OR_PARTITION (3) * OFFSET_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_CONSUMER (16) - * NO_OFFSETS_FETCHABLE (23) + * ILLEGAL_GENERATION (22) + * UNKNOWN_CONSUMER_ID (25) */ private final Map<TopicPartition, PartitionData> responseData; diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index dacbdd0..a2ecb96 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -27,8 +27,8 @@ import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig import kafka.consumer.Whitelist -import kafka.server.OffsetManager import org.apache.kafka.common.utils.Utils +import kafka.coordinator.ConsumerCoordinator object TopicCommand { @@ -111,7 +111,7 @@ object TopicCommand { println("Updated config for topic \"%s\".".format(topic)) } if(opts.options.has(opts.partitionsOpt)) { - if (topic == OffsetManager.OffsetsTopicName) { + if (topic == ConsumerCoordinator.OffsetsTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 730a232..74a4965 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ 
b/core/src/main/scala/kafka/cluster/Partition.scala @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge +import kafka.coordinator.ConsumerCoordinator /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR @@ -186,7 +187,7 @@ class Partition(val topic: String, if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) - if (topic == OffsetManager.OffsetsTopicName) + if (topic == ConsumerCoordinator.OffsetsTopicName) offsetManager.loadOffsetsFromLog(partitionId) true } @@ -216,7 +217,7 @@ class Partition(val topic: String, zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt.foreach { leaderReplica => - if (topic == OffsetManager.OffsetsTopicName && + if (topic == ConsumerCoordinator.OffsetsTopicName && /* if we are making a leader->follower transition */ leaderReplica == localBrokerId) offsetManager.removeOffsetsFromCacheForPartition(partitionId) diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 6b4242c..deb48b1 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -17,6 +17,8 @@ package kafka.common +import org.apache.kafka.common.protocol.Errors + case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) { override def toString = "OffsetMetadata[%d,%s]" .format(offset, @@ -51,7 +53,7 @@ object OffsetAndMetadata { def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata)) } -case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) { +case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) { def offset = offsetMetadata.offset def metadata = offsetMetadata.metadata @@ -60,10 +62,12 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = } object OffsetMetadataAndError { - val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError) - val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode) - val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode) - val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode) + val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code) + val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code) + val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code) + val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code) def apply(offset: Long) = new 
OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError) diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ad75978..32595d6 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.server.OffsetManager +import kafka.coordinator.ConsumerCoordinator object Topic { @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(OffsetManager.OffsetsTopicName) + val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName) def validate(topic: String) { if (topic.length <= 0) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 51e89c8..cade77e 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -16,7 +16,8 @@ */ package kafka.coordinator -import kafka.common.TopicAndPartition +import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition} +import kafka.log.LogConfig import kafka.server._ import kafka.utils._ import org.apache.kafka.common.protocol.Errors @@ -24,7 +25,12 @@ import org.apache.kafka.common.requests.JoinGroupRequest import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean +import java.util.Properties +import scala.collection.{Map, Seq, immutable} +import scala.Seq +case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int, + consumerMaxSessionTimeoutMs: Int) /** * ConsumerCoordinator handles consumer group and consumer offset management. @@ -33,17 +39,28 @@ import java.util.concurrent.atomic.AtomicBoolean * consumer groups. Consumer groups are assigned to coordinators based on their * group names. 
*/ -class ConsumerCoordinator(val config: KafkaConfig, - val zkClient: ZkClient, - val offsetManager: OffsetManager) extends Logging { +class ConsumerCoordinator(val brokerId: Int, + val offsetConfig: OffsetManagerConfig, + val groupConfig: GroupManagerConfig, + val replicaManager: ReplicaManager, + val zkClient: ZkClient) extends Logging { - this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: " + this.logIdent = "[ConsumerCoordinator " + brokerId + "]: " private val isActive = new AtomicBoolean(false) private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null private var coordinatorMetadata: CoordinatorMetadata = null + private var offsetManager: OffsetManager = null + + def offsetsTopicConfigs: Properties = { + val props = new Properties + props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) + props.put(LogConfig.CompressionTypeProp, ConsumerCoordinatorConfig.DefaultOffsetsTopicCompressionCodec.name) + props + } /** * NOTE: If a group lock and coordinatorLock are simultaneously needed, @@ -55,9 +72,10 @@ class ConsumerCoordinator(val config: KafkaConfig, */ def startup() { info("Starting up.") - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) - rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId) - coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance) + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) + rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId) + coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance) + offsetManager = new OffsetManager(offsetConfig, replicaManager, zkClient, kafkaScheduler, metadataCache) isActive.set(true) info("Startup complete.") } @@ -69,6 +87,7 @@ class ConsumerCoordinator(val config: KafkaConfig, def shutdown() { info("Shutting down.") isActive.set(false) + offsetManager.shutdown() coordinatorMetadata.shutdown() heartbeatPurgatory.shutdown() rebalancePurgatory.shutdown() @@ -87,7 +106,8 @@ class ConsumerCoordinator(val config: KafkaConfig, responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) { responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code) - } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) { + } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs || + sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) { responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code) } else { // only try to create the group if the group is not unknown AND @@ -196,6 +216,69 @@ class ConsumerCoordinator(val config: KafkaConfig, } } + def handleCommitOffsets(groupId: String, + consumerId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { + if (!isActive.get) { + responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + } else { 
+ val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group does not exist, it means this group is not relying + // on Kafka for partition management, and hence never send join-group + // request to the coordinator before; in this case blindly commit the offsets + offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) + } else { + group synchronized { + if (group.is(Dead)) { + responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + } else if (!group.has(consumerId)) { + responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + } else if (generationId != group.generationId) { + responseCallback(Errors.ILLEGAL_GENERATION.code) + } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) { + responseCallback(Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code) + } else { + offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) + } + } + } + } + } + + def handleFetchOffsets(groupId: String, + partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { + if (!isActive.get) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)} + } else if (!isCoordinatorForGroup(groupId)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)} + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group does not exist, it means this group is not relying + // on Kafka for partition management, and hence never send join-group + // request to the coordinator before; in this case blindly fetch the offsets + offsetManager.getOffsets(groupId, partitions) + } else { + group synchronized { + if (group.is(Dead)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)} + } else if (!group.has(consumerId)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)} + } else if (generationId != group.generationId) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.IllegalGroupGenerationId)} + } else { + offsetManager.getOffsets(groupId, partitions) + } + } + } + } + } + /** * Complete existing DelayedHeartbeats for the given consumer and schedule the next one */ @@ -275,8 +358,6 @@ class ConsumerCoordinator(val config: KafkaConfig, maybePrepareRebalance(group) } - private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) - private def reassignPartitions(group: ConsumerGroupMetadata) = { val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy) val topicsPerConsumer = group.topicsPerConsumer @@ -345,8 +426,19 @@ class ConsumerCoordinator(val config: KafkaConfig, } } - def onCompleteHeartbeat() {} + def onCompleteHeartbeat() { + // TODO: add metrics for complete heartbeats + } + + def partitionFor(group: String): Int = offsetManager.partitionFor(group) private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) = consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline + + private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) +} + +object ConsumerCoordinator { + + val OffsetsTopicName = "__consumer_offsets" } diff --git 
a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index c39e6de..a49647a 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -32,7 +32,7 @@ import scala.collection.mutable * It delegates all group logic to the callers. */ @threadsafe -private[coordinator] class CoordinatorMetadata(config: KafkaConfig, +private[coordinator] class CoordinatorMetadata(brokerId: Int, zkClient: ZkClient, maybePrepareRebalance: ConsumerGroupMetadata => Unit) { @@ -179,7 +179,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, * Zookeeper listener to handle topic partition changes */ class TopicPartitionChangeListener extends IZkDataListener with Logging { - this.logIdent = "[TopicPartitionChangeListener on Coordinator " + config.brokerId + "]: " + this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: " override def handleDataChange(dataPath: String, data: Object) { info("Handling data change for path: %s data: %s".format(dataPath, data)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d63bc18..ca1aa3a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, - val offsetManager: OffsetManager, val coordinator: ConsumerCoordinator, val controller: KafkaController, val zkClient: ZkClient, @@ -189,7 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetRetention = if (offsetCommitRequest.versionId <= 1 || offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) { - offsetManager.config.offsetsRetentionMs + coordinator.offsetConfig.offsetsRetentionMs } else { offsetCommitRequest.retentionMs } @@ -215,8 +214,8 @@ class KafkaApis(val requestChannel: RequestChannel, ) ) - // call offset manager to store offsets - offsetManager.storeOffsets( + // call coordinator to handle commit offset + coordinator.handleCommitOffsets( offsetCommitRequest.groupId, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, @@ -422,9 +421,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { + if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) { try { - if (topic == OffsetManager.OffsetsTopicName) { + if (topic == ConsumerCoordinator.OffsetsTopicName) { val aliveBrokers = metadataCache.getAliveBrokers val offsetsTopicReplicationFactor = if (aliveBrokers.length > 0) @@ -433,7 +432,7 @@ class KafkaApis(val requestChannel: RequestChannel, config.offsetsTopicReplicationFactor AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor, - offsetManager.offsetsTopicConfig) + coordinator.offsetsTopicConfigs) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" 
.format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor)) } @@ -496,26 +495,19 @@ class KafkaApis(val requestChannel: RequestChannel, OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId) } else { - // version 1 reads offsets from Kafka - val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => - metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty - ) - val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val knownStatus = - if (knownTopicPartitions.size > 0) - offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap - else - Map.empty[TopicAndPartition, OffsetMetadataAndError] - val status = unknownStatus ++ knownStatus + // version 1 reads offsets from Kafka; + val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap - OffsetFetchResponse(status, offsetFetchRequest.correlationId) + // Note that we do not need to filter the partitions in the + // metadata cache as the topic partitions will be filtered + // in coordinator's offset manager through the offset cache + OffsetFetchResponse(offsets, offsetFetchRequest.correlationId) } trace("Sending offset fetch response %s for correlation id %d to client %s." .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) - } /* @@ -524,10 +516,10 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerMetadataRequest(request: RequestChannel.Request) { val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest] - val partition = offsetManager.partitionFor(consumerMetadataRequest.group) + val partition = coordinator.partitionFor(consumerMetadataRequest.group) // get metadata (and create the topic if necessary) - val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head + val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b320ce9..7e5d332 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -38,7 +38,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinatorConfig, ConsumerCoordinator} /** * Represents the lifecycle of a single Kafka broker. 
Handles all functionality required @@ -72,8 +72,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var logManager: LogManager = null - var offsetManager: OffsetManager = null - var replicaManager: ReplicaManager = null var topicConfigManager: TopicConfigManager = null @@ -154,19 +152,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) replicaManager.startup() - /* start offset manager */ - offsetManager = createOffsetManager() - /* start kafka controller */ kafkaController = new KafkaController(config, zkClient, brokerState) kafkaController.startup() /* start kafka coordinator */ - consumerCoordinator = new ConsumerCoordinator(config, zkClient, offsetManager) + consumerCoordinator = createConsumerCoordinator(zkClient) consumerCoordinator.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, + apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -346,8 +341,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg CoreUtils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) - if(offsetManager != null) - offsetManager.shutdown() CoreUtils.swallow(kafkaScheduler.shutdown()) if(apis != null) CoreUtils.swallow(apis.close()) @@ -387,6 +380,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def boundPort(): Int = socketServer.boundPort() + private def createConsumerCoordinator(zkClient: ZkClient, replicaManager: ReplicaManager): ConsumerCoordinator = { + val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, + consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) + new ConsumerCoordinator(config.brokerId, offsetConfig, groupConfig, replicaManager, zkClient) + } + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = config.logRollTimeMillis, @@ -428,19 +435,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time = time) } - private def createOffsetManager(): OffsetManager = { - val offsetManagerConfig = OffsetManagerConfig( - maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = 
config.offsetsTopicPartitions, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache) - } - /** * Generates new brokerId or reads from meta.properties based on following conditions * <br>
    diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5cca85c..ae42cce 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -42,6 +42,8 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient +import kafka.coordinator.{ConsumerCoordinator, ConsumerCoordinatorConfig} +import org.apache.kafka.common.protocol.Errors /** * Configuration settings for in-built offset management @@ -84,7 +86,7 @@ object OffsetManagerConfig { val DefaultOffsetCommitRequiredAcks = (-1).toShort } -class OffsetManager(val config: OffsetManagerConfig, +class OffsetManager(val config: ConsumerCoordinatorConfig, replicaManager: ReplicaManager, zkClient: ZkClient, scheduler: Scheduler, @@ -143,9 +145,9 @@ class OffsetManager(val config: OffsetManagerConfig, // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles. tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) => - val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) partitionOpt.map { partition => - val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) val messages = tombstones.map(_._2).toSeq trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition)) @@ -170,14 +172,6 @@ class OffsetManager(val config: OffsetManagerConfig, } - def offsetsTopicConfig: Properties = { - val props = new Properties - props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CleanupPolicyProp, "compact") - props.put(LogConfig.CompressionTypeProp, "uncompressed") - props - } - def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions /** @@ -240,7 +234,7 @@ class OffsetManager(val config: OffsetManagerConfig, ) }.toSeq - val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId)) + val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId)) val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) @@ -271,6 +265,10 @@ class OffsetManager(val config: OffsetManagerConfig, ErrorMapping.ConsumerCoordinatorNotAvailableCode else if (status.error == ErrorMapping.NotLeaderForPartitionCode) ErrorMapping.NotCoordinatorForConsumerCode + else if (status.error == ErrorMapping.MessageSizeTooLargeCode + || status.error == ErrorMapping.MessageSetSizeTooLargeCode + || status.error == ErrorMapping.InvalidFetchSizeCode) + Errors.INVALID_COMMIT_OFFSET_SIZE.code else status.error } @@ -338,7 +336,7 @@ class OffsetManager(val config: OffsetManagerConfig, debug("Could not fetch offsets for group %s (not offset coordinator).".format(group)) topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) - (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup) + 
(groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup) }.toMap } } @@ -349,7 +347,7 @@ class OffsetManager(val config: OffsetManagerConfig, */ def loadOffsetsFromLog(offsetsPartition: Int) { - val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) loadingPartitions synchronized { if (loadingPartitions.contains(offsetsPartition)) { @@ -421,7 +419,7 @@ class OffsetManager(val config: OffsetManagerConfig, } private def getHighWatermark(partitionId: Int): Long = { - val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId) + val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId) val hw = partitionOpt.map { partition => partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L) @@ -449,7 +447,7 @@ class OffsetManager(val config: OffsetManagerConfig, } if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition." - .format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition))) + .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition))) } @@ -461,8 +459,6 @@ class OffsetManager(val config: OffsetManagerConfig, object OffsetManager { - val OffsetsTopicName = "__consumer_offsets" - private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema) private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 59c9bc3..05ee813 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -393,10 +393,10 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) case nle: NotLeaderForPartitionException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) - case mtl: MessageSizeTooLargeException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl))) - case mstl: MessageSetSizeTooLargeException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl))) + case mtle: MessageSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle))) + case mstle: MessageSetSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle))) case imse : InvalidMessageSizeException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse))) case t: Throwable => diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 17b17b9..92ffb91 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -25,12 +25,13 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.NoOffsetForPartitionException import kafka.utils.{TestUtils, Logging} -import kafka.server.{KafkaConfig, OffsetManager} +import kafka.server.KafkaConfig import java.util.ArrayList import org.junit.Assert._ import scala.collection.JavaConversions._ +import kafka.coordinator.ConsumerCoordinator /** @@ -158,9 +159,9 @@ class ConsumerTest 
extends IntegrationTestHarness with Logging { consumer0.poll(50) // get metadata for the topic - var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName) + var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName) while(parts == null) - parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName) + parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName) assertEquals(1, parts.size) assertNotNull(parts(0).leader()) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 07b1ff4..afcc349 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import kafka.server.{OffsetManager, KafkaConfig} import kafka.integration.KafkaServerTestHarness import scala.collection.mutable.Buffer +import kafka.coordinator.ConsumerCoordinator /** * A helper class for writing integration tests that involve producers, consumers, and servers @@ -63,11 +64,11 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { consumers += new KafkaConsumer(consumerConfig) // create the consumer offset topic - TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName, - serverConfig.getProperty("offsets.topic.num.partitions").toInt, - serverConfig.getProperty("offsets.topic.replication.factor").toInt, + TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName, + serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, + serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, servers, - servers(0).offsetManager.offsetsTopicConfig) + servers(0).consumerCoordinator.offsetsTopicConfigs) } override def tearDown() { diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index c7136f2..dcd6988 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,9 +22,9 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import kafka.server.{OffsetManager, KafkaConfig} import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils +import kafka.coordinator.ConsumerCoordinator class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -87,12 +87,12 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", - "--topic", OffsetManager.OffsetsTopicName)) + "--topic", ConsumerCoordinator.OffsetsTopicName)) TopicCommand.createTopic(zkClient, createOffsetTopicOpts) // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't - val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName)) - val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName) + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName)) + val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName) assertFalse("Delete path for topic shouldn't exist before 
deletion.", zkClient.exists(deleteOffsetTopicPath)) intercept[AdminOperationException] { TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 4f124af..4b326d0 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -22,6 +22,7 @@ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.Test import kafka.server.OffsetManager +import kafka.coordinator.ConsumerCoordinator class TopicFilterTest extends JUnitSuite { @@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite { val topicFilter2 = new Whitelist(".+") assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true)) - assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) val topicFilter3 = new Whitelist("white_listed-topic.+") assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true)) @@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite { assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false)) - assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) } @Test -- 1.7.12.4 From 315e63fddd19482b30dd6f72d1ed6515ed63e8cf Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 6 Jun 2015 18:41:35 -0700 Subject: [PATCH 2/5] still in progress --- core/src/main/scala/kafka/cluster/Partition.scala | 17 +---- .../kafka/coordinator/ConsumerCoordinator.scala | 36 ++++++---- core/src/main/scala/kafka/server/KafkaApis.scala | 32 +++++++-- core/src/main/scala/kafka/server/KafkaServer.scala | 10 +-- .../main/scala/kafka/server/OffsetManager.scala | 28 +++----- .../main/scala/kafka/server/ReplicaManager.scala | 81 ++++++++++++++-------- 6 files changed, 118 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 74a4965..addd7a3 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager} +import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -32,7 +32,6 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge -import kafka.coordinator.ConsumerCoordinator /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR @@ -161,8 +160,7 @@ class Partition(val topic: String, * and setting the new leader and ISR */ def makeLeader(controllerId: Int, - partitionStateInfo: PartitionStateInfo, correlationId: Int, - offsetManager: OffsetManager): Boolean = { + partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -187,8 +185,6 @@ class Partition(val topic: String, if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) - if (topic == ConsumerCoordinator.OffsetsTopicName) - offsetManager.loadOffsetsFromLog(partitionId) true } } @@ -199,7 +195,7 @@ class Partition(val topic: String, */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - correlationId: Int, offsetManager: OffsetManager): Boolean = { + correlationId: Int): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -216,13 +212,6 @@ class Partition(val topic: String, leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt.foreach { leaderReplica => - if (topic == ConsumerCoordinator.OffsetsTopicName && - /* if we are making a leader->follower transition */ - leaderReplica == localBrokerId) - offsetManager.removeOffsetsFromCacheForPartition(partitionId) - } - if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) { false } diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index cade77e..238fbe9 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -27,7 +27,6 @@ import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean import java.util.Properties import scala.collection.{Map, Seq, immutable} -import scala.Seq case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int, consumerMaxSessionTimeoutMs: Int) @@ -43,6 +42,7 @@ class ConsumerCoordinator(val brokerId: Int, val offsetConfig: OffsetManagerConfig, val groupConfig: GroupManagerConfig, val replicaManager: ReplicaManager, + val kafkaScheduler: KafkaScheduler, val zkClient: ZkClient) extends Logging { this.logIdent = "[ConsumerCoordinator " + brokerId + "]: " @@ -57,8 +57,8 @@ class ConsumerCoordinator(val brokerId: Int, def offsetsTopicConfigs: Properties = { val props = new Properties props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CompressionTypeProp, ConsumerCoordinatorConfig.DefaultOffsetsTopicCompressionCodec.name) + props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) + props.put(LogConfig.CompressionTypeProp, OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec.name) props } @@ -75,7 +75,7 @@ class ConsumerCoordinator(val 
brokerId: Int, heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId) coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance) - offsetManager = new OffsetManager(offsetConfig, replicaManager, zkClient, kafkaScheduler, metadataCache) + offsetManager = new OffsetManager(offsetConfig, replicaManager, zkClient, kafkaScheduler) isActive.set(true) info("Startup complete.") } @@ -222,9 +222,9 @@ class ConsumerCoordinator(val brokerId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { if (!isActive.get) { - responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code)) } else { val group = coordinatorMetadata.getGroup(groupId) if (group == null) { @@ -235,13 +235,13 @@ class ConsumerCoordinator(val brokerId: Int, } else { group synchronized { if (group.is(Dead)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) } else if (!group.has(consumerId)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) } else if (generationId != group.generationId) { - responseCallback(Errors.ILLEGAL_GENERATION.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) { - responseCallback(Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code) + responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code)) } else { offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) } @@ -253,7 +253,7 @@ class ConsumerCoordinator(val brokerId: Int, def handleFetchOffsets(groupId: String, partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { if (!isActive.get) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)} + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap } else if (!isCoordinatorForGroup(groupId)) { partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)} } else { @@ -267,10 +267,6 @@ class ConsumerCoordinator(val brokerId: Int, group synchronized { if (group.is(Dead)) { partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)} - } else if (!group.has(consumerId)) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)} - } else if (generationId != group.generationId) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.IllegalGroupGenerationId)} } else { offsetManager.getOffsets(groupId, partitions) } @@ -279,6 +275,16 @@ class ConsumerCoordinator(val brokerId: Int, } } + def handleGroupImmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2217 + 
offsetManager.loadOffsetsFromLog(offsetTopicPartitionId) + } + + def handleGroupEmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2217 + offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId) + } + /** * Complete existing DelayedHeartbeats for the given consumer and schedule the next one */ diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ca1aa3a..555d9a1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -94,8 +94,22 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] try { - val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) - val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) + // call replica manager to handle updating partitions to become leader or follower + val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode) + // for each new leader or follower, call coordinator to handle + result.updatedLeaders.foreach { case partition => + if (partition.topic == ConsumerCoordinator.OffsetsTopicName) + coordinator.handleGroupImmigration(partition.partitionId) + } + result.updatedFollowers.foreach { case partition => + partition.leaderReplicaIdOpt.foreach { leaderReplica => + if (partition.topic == ConsumerCoordinator.OffsetsTopicName && + leaderReplica == brokerId) + coordinator.handleGroupEmigration(partition.partitionId) + } + } + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse))) } catch { case e: KafkaStorageException => @@ -141,6 +155,12 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + // filter non-exist topics + val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) => + !metadataCache.contains(topicAndPartition.topic) + } + val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys) + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { commitStatus.foreach { case (topicAndPartition, errorCode) => @@ -153,14 +173,14 @@ class KafkaApis(val requestChannel: RequestChannel, topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) } } - - val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) + val combinedCommitStatus = commitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode) + val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } if (offsetCommitRequest.versionId == 0) { // for version 0 always store offsets to ZK - val responseInfo = offsetCommitRequest.requestInfo.map { + val responseInfo = filteredRequestInfo.map { case (topicAndPartition, metaAndError) => { val topicDirs = new 
ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) try { @@ -202,7 +222,7 @@ class KafkaApis(val requestChannel: RequestChannel, val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata => + val offsetData = filteredRequestInfo.mapValues(offsetAndMetadata => offsetAndMetadata.copy( commitTimestamp = currentTimestamp, expireTimestamp = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7e5d332..5c8cbb1 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -38,7 +38,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinatorConfig, ConsumerCoordinator} +import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator} /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -157,7 +157,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg kafkaController.startup() /* start kafka coordinator */ - consumerCoordinator = createConsumerCoordinator(zkClient) + consumerCoordinator = createConsumerCoordinator(zkClient, replicaManager, kafkaScheduler) consumerCoordinator.startup() /* start processing requests */ @@ -380,7 +380,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def boundPort(): Int = socketServer.boundPort() - private def createConsumerCoordinator(zkClient: ZkClient, replicaManager: ReplicaManager): ConsumerCoordinator = { + private def createConsumerCoordinator(zkClient: ZkClient, + replicaManager: ReplicaManager, + kafkaScheduler: KafkaScheduler): ConsumerCoordinator = { val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, @@ -391,7 +393,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) - new ConsumerCoordinator(config.brokerId, offsetConfig, groupConfig, replicaManager, zkClient) + new ConsumerCoordinator(config.brokerId, offsetConfig, groupConfig, replicaManager, kafkaScheduler, zkClient) } private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index ae42cce..47b6ce9 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -17,6 +17,7 @@ package kafka.server +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING import org.apache.kafka.common.protocol.types.Type.INT32 @@ -25,25 +26,23 @@ import org.apache.kafka.common.utils.Utils import kafka.utils._ import kafka.common._ -import kafka.log.{FileMessageSet, LogConfig} 
+import kafka.log.FileMessageSet import kafka.message._ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import kafka.tools.MessageFormatter import kafka.api.ProducerResponseStatus +import kafka.coordinator.ConsumerCoordinator import scala.Some import scala.collection._ import java.io.PrintStream import java.util.concurrent.atomic.AtomicBoolean import java.nio.ByteBuffer -import java.util.Properties import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient -import kafka.coordinator.{ConsumerCoordinator, ConsumerCoordinatorConfig} -import org.apache.kafka.common.protocol.Errors /** * Configuration settings for in-built offset management @@ -86,11 +85,10 @@ object OffsetManagerConfig { val DefaultOffsetCommitRequiredAcks = (-1).toShort } -class OffsetManager(val config: ConsumerCoordinatorConfig, +class OffsetManager(val config: OffsetManagerConfig, replicaManager: ReplicaManager, zkClient: ZkClient, - scheduler: Scheduler, - metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup { + scheduler: Scheduler) extends Logging with KafkaMetricsGroup { /* offsets and metadata cache */ private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] @@ -208,22 +206,14 @@ class OffsetManager(val config: ConsumerCoordinatorConfig, /** * Store offsets by appending it to the replicated log and then inserting to cache */ - // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future def storeOffsets(groupId: String, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - // check if there are any non-existent topics - val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) => - !metadataCache.contains(topicAndPartition.topic) - } - - // first filter out partitions with offset metadata size exceeding limit or - // if its a non existing topic - // TODO: in the future we may want to only support atomic commit and hence fail the whole commit + // first filter out partitions with offset metadata size exceeding limit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition) + validateOffsetMetadataLength(offsetAndMetadata.metadata) } // construct the message set to append @@ -276,9 +266,7 @@ class OffsetManager(val config: ConsumerCoordinatorConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (nonExistentTopics.contains(topicAndPartition)) - (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) - else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 05ee813..ee27164 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -23,19 +23,23 @@ import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import 
kafka.controller.KafkaController -import kafka.common.TopicAndPartition import kafka.message.{ByteBufferMessageSet, MessageSet} +import kafka.api.ProducerResponseStatus +import kafka.common.TopicAndPartition +import kafka.api.PartitionFetchInfo + +import org.apache.kafka.common.protocol.Errors import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit -import org.apache.kafka.common.protocol.Errors +import scala.Some import scala.Predef._ import scala.collection._ import scala.collection.mutable.HashMap -import scala.collection.Map -import scala.collection.Set +import scala.collection.mutable.Set +import scala.collection.mutable.Map import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge @@ -84,6 +88,17 @@ object LogReadResult { false) } +case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short], + updatedLeaders: Set[Partition], + updatedFollowers: Set[Partition], + errorCode: Short) { + + override def toString = { + "updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]" + .format(updatedLeaders, updatedFollowers, responseMap, errorCode) + } +} + object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" } @@ -544,8 +559,7 @@ class ReplicaManager(val config: KafkaConfig, } } - def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, - offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = { + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, @@ -553,13 +567,13 @@ class ReplicaManager(val config: KafkaConfig, } replicaStateChangeLock synchronized { val responseMap = new collection.mutable.HashMap[(String, Int), Short] - if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { + if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } - (responseMap, ErrorMapping.StaleControllerEpochCode) + BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode) } else { val controllerId = leaderAndISRRequest.controllerId val correlationId = leaderAndISRRequest.correlationId @@ -567,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig, // First check partition's leader epoch val partitionState = new HashMap[Partition, PartitionStateInfo]() - leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => + leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId) val partitionLeaderEpoch = partition.getLeaderEpoch() // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
@@ -591,14 +605,19 @@ class ReplicaManager(val config: KafkaConfig, } } - val partitionsTobeLeader = partitionState - .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId} + val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) => + partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId + } val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys) - if (!partitionsTobeLeader.isEmpty) - makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager) - if (!partitionsToBeFollower.isEmpty) - makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager) + val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty) + makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) + else + Set.empty[Partition] + val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty) + makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + else + Set.empty[Partition] // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions @@ -607,7 +626,7 @@ class ReplicaManager(val config: KafkaConfig, hwThreadInitialized = true } replicaFetcherManager.shutdownIdleFetcherThreads() - (responseMap, ErrorMapping.NoError) + BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError) } } } @@ -623,10 +642,11 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeLeaders(controllerId: Int, epoch: Int, + private def makeLeaders(controllerId: Int, + epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) = { + correlationId: Int, + responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = { partitionState.foreach(state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") @@ -645,7 +665,7 @@ class ReplicaManager(val config: KafkaConfig, } // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)} + partition.makeLeader(controllerId, partitionStateInfo, correlationId)} } catch { case e: Throwable => @@ -664,6 +684,8 @@ class ReplicaManager(val config: KafkaConfig, "for the become-leader transition for partition %s") .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } + + partitionState.keySet } /* @@ -682,9 +704,12 @@ class ReplicaManager(val config: KafkaConfig, * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where * the error message will be set on each partition since we do not know which partition caused it 
*/ - private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) { + private def makeFollowers(controllerId: Int, + epoch: Int, + partitionState: Map[Partition, PartitionStateInfo], + leaders: Set[BrokerEndPoint], + correlationId: Int, + responseMap: Map[(String, Int), Short]) : Set[Partition] = { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") @@ -694,18 +719,18 @@ class ReplicaManager(val config: KafkaConfig, for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) - try { + val partitionsToMakeFollower: Set[Partition] = Set() - var partitionsToMakeFollower: Set[Partition] = Set() + try { - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + // TODO: Delete leaders from LeaderAndIsrRequest partitionState.foreach{ case (partition, partitionStateInfo) => val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { // Only change partition state when the leader is available case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) partitionsToMakeFollower += partition else stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + @@ -775,6 +800,8 @@ class ReplicaManager(val config: KafkaConfig, "for the become-follower transition for partition %s") .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } + + partitionsToMakeFollower } private def maybeShrinkIsr(): Unit = { -- 1.7.12.4 From 67235c65adfa7ecfa0f49e20ff4835b9dcc95f91 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 8 Jun 2015 14:52:20 -0700 Subject: [PATCH 3/5] minor fix --- .../apache/kafka/common/requests/OffsetFetchRequest.java | 3 --- .../apache/kafka/common/requests/OffsetFetchResponse.java | 1 + .../kafka/clients/consumer/internals/CoordinatorTest.java | 4 ---- .../main/scala/kafka/coordinator/ConsumerCoordinator.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaManager.scala | 14 +++++--------- .../unit/kafka/coordinator/CoordinatorMetadataTest.scala | 2 +- 6 files changed, 9 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index deec1fa..872aeac 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -42,9 +42,6 @@ public class OffsetFetchRequest extends AbstractRequest { // partition level field names private static final String PARTITION_KEY_NAME = "partition"; - public static final int DEFAULT_GENERATION_ID = -1; - public static final String DEFAULT_CONSUMER_ID = ""; - private final String groupId; private final List partitions; diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 096d979..ec05619 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -47,6 +47,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse { /** * Possible error code: * + * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v1 * OFFSET_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_CONSUMER (16) * ILLEGAL_GENERATION (22) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b06c4a7..e3518be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -247,10 +247,6 @@ public class CoordinatorTest { client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); - // fetch with offset topic unknown - client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L)); - assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); - // fetch with offset -1 client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 238fbe9..ea542f0 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -255,7 +255,7 @@ class ConsumerCoordinator(val brokerId: Int, if (!isActive.get) { partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap } else if (!isCoordinatorForGroup(groupId)) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)} + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap } else { val group = coordinatorMetadata.getGroup(groupId) if (group == null) { @@ -266,7 +266,7 @@ class ConsumerCoordinator(val brokerId: Int, } else { group synchronized { if (group.is(Dead)) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)} + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap } else { offsetManager.getOffsets(groupId, partitions) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ee27164..795220e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -35,11 +35,7 @@ import java.io.{IOException, File} import java.util.concurrent.TimeUnit import scala.Some -import scala.Predef._ import scala.collection._ -import scala.collection.mutable.HashMap -import scala.collection.mutable.Set -import scala.collection.mutable.Map import org.I0Itec.zkclient.ZkClient 
import com.yammer.metrics.core.Gauge @@ -431,7 +427,7 @@ class ReplicaManager(val config: KafkaConfig, def fetchMessages(timeout: Long, replicaId: Int, fetchMinBytes: Int, - fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], + fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { val isFromFollower = replicaId >= 0 @@ -566,7 +562,7 @@ class ReplicaManager(val config: KafkaConfig, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition)) } replicaStateChangeLock synchronized { - val responseMap = new collection.mutable.HashMap[(String, Int), Short] + val responseMap = new mutable.HashMap[(String, Int), Short] if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + @@ -580,7 +576,7 @@ class ReplicaManager(val config: KafkaConfig, controllerEpoch = leaderAndISRRequest.controllerEpoch // First check partition's leader epoch - val partitionState = new HashMap[Partition, PartitionStateInfo]() + val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]() leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId) val partitionLeaderEpoch = partition.getLeaderEpoch() @@ -709,7 +705,7 @@ class ReplicaManager(val config: KafkaConfig, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[BrokerEndPoint], correlationId: Int, - responseMap: Map[(String, Int), Short]) : Set[Partition] = { + responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") @@ -719,7 +715,7 @@ class ReplicaManager(val config: KafkaConfig, for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) - val partitionsToMakeFollower: Set[Partition] = Set() + val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() try { diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala index 08854c5..2cbf6e2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala @@ -40,7 +40,7 @@ class CoordinatorMetadataTest extends JUnitSuite { def setUp() { val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") zkClient = EasyMock.createStrictMock(classOf[ZkClient]) - coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props), zkClient, null) + coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null) } @Test -- 1.7.12.4 From fc597e3a59a7a56c5522da62469bc97326db92fb Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 8 Jun 2015 16:06:30 -0700 Subject: [PATCH 4/5] test passed --- .../src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 3 ++- core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 10 +++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git 
a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index ea542f0..da80128 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -17,6 +17,7 @@ package kafka.coordinator import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition} +import kafka.message.UncompressedCodec import kafka.log.LogConfig import kafka.server._ import kafka.utils._ @@ -58,7 +59,7 @@ class ConsumerCoordinator(val brokerId: Int, val props = new Properties props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CompressionTypeProp, OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec.name) + props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) props } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 528525b..39a6852 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition)) val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) assertEquals(1, fetchResponse2.requestInfo.size) } @@ -166,14 +166,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata) assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata) -- 1.7.12.4 From decc8046325614c70958da6594159340aadfd40e Mon Sep 17 00:00:00 2001 From: Guozhang 
Wang Date: Mon, 8 Jun 2015 16:10:31 -0700 Subject: [PATCH 5/5] comments --- .../main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index ec05619..3dc8521 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -47,7 +47,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse { /** * Possible error code: * - * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v1 + * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 * OFFSET_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_CONSUMER (16) * ILLEGAL_GENERATION (22) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 555d9a1..0dc0e0b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -98,6 +98,7 @@ class KafkaApis(val requestChannel: RequestChannel, val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode) // for each new leader or follower, call coordinator to handle + // consumer group migration result.updatedLeaders.foreach { case partition => if (partition.topic == ConsumerCoordinator.OffsetsTopicName) coordinator.handleGroupImmigration(partition.partitionId) -- 1.7.12.4
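
The group-migration hook-up is split across patches 2 and 5 above, so the following is a condensed sketch of the resulting flow in KafkaApis.handleLeaderAndIsrRequest. It only uses names that appear in the diffs (BecomeLeaderOrFollowerResult, handleGroupImmigration, handleGroupEmigration, ConsumerCoordinator.OffsetsTopicName) and it simplifies the follower path by omitting the leaderReplicaIdOpt check that the actual hunks perform; treat it as an illustration of the intended flow, not as additional patch content.

    // Condensed sketch of the flow added to KafkaApis.handleLeaderAndIsrRequest
    // (illustrative; the real code also checks leaderReplicaIdOpt before
    // calling handleGroupEmigration on the follower path).
    val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)

    // offsets-topic partitions this broker just became leader for:
    // have the coordinator load their offsets into its cache
    result.updatedLeaders
      .filter(_.topic == ConsumerCoordinator.OffsetsTopicName)
      .foreach(partition => coordinator.handleGroupImmigration(partition.partitionId))

    // offsets-topic partitions this broker just became a follower for:
    // have the coordinator evict their offsets from its cache
    result.updatedFollowers
      .filter(_.topic == ConsumerCoordinator.OffsetsTopicName)
      .foreach(partition => coordinator.handleGroupEmigration(partition.partitionId))

    // per-partition results and the global error code travel back in the response
    val leaderAndIsrResponse = new LeaderAndIsrResponse(
      leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)

Having ReplicaManager return the changed partitions in BecomeLeaderOrFollowerResult, rather than taking the OffsetManager as a parameter, appears to be the point of the refactoring in patch 2: the replica layer stays unaware of the coordinator, and KafkaApis decides what the role changes mean for the offsets topic.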