From 6a92883a843492c70e50fb700573ad674565a14a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 20 May 2015 19:01:25 -0700 Subject: [PATCH 1/5] dummy --- .../clients/consumer/internals/Coordinator.java | 87 ++++++++++++++-------- .../kafka/clients/consumer/internals/Fetcher.java | 7 +- .../kafka/common/requests/HeartbeatResponse.java | 5 +- .../kafka/common/requests/JoinGroupResponse.java | 6 +- .../kafka/coordinator/ConsumerCoordinator.scala | 32 ++++---- .../kafka/coordinator/ConsumerGroupMetadata.scala | 24 ++++-- .../kafka/coordinator/CoordinatorMetadata.scala | 11 +++ .../main/scala/kafka/network/RequestChannel.scala | 9 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 28 +++++++ core/src/test/resources/log4j.properties | 6 +- .../integration/kafka/api/ConsumerBounceTest.scala | 10 ++- .../scala/integration/kafka/api/ConsumerTest.scala | 14 ++-- .../kafka/server/KafkaConfigConfigDefTest.scala | 5 ++ 13 files changed, 167 insertions(+), 77 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index b2764df..57f985d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -120,30 +120,57 @@ public final class Coordinator { // send a join group request to the coordinator log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics); - JoinGroupRequest request = new JoinGroupRequest(groupId, - (int) this.sessionTimeoutMs, - subscribedTopics, - this.consumerId, - this.assignmentStrategy); - ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now); - - // process the response - JoinGroupResponse response = new JoinGroupResponse(resp.responseBody()); - // TODO: needs to handle disconnects and errors, should not just throw exceptions - Errors.forCode(response.errorCode()).maybeThrow(); - this.consumerId = response.consumerId(); - this.generation = response.generationId(); - - // set the flag to refresh last committed offsets - this.subscriptions.needRefreshCommits(); - - log.debug("Joined group: {}", response); - - // record re-assignment time - this.sensors.partitionReassignments.record(time.milliseconds() - now); - - // return assigned partitions - return response.assignedPartitions(); + // repeat processing the response until succeed or fatal error + do { + JoinGroupRequest request = new JoinGroupRequest(groupId, + (int) this.sessionTimeoutMs, + subscribedTopics, + this.consumerId, + this.assignmentStrategy); + + ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now); + JoinGroupResponse response = new JoinGroupResponse(resp.responseBody()); + short errorCode = response.errorCode(); + + if (errorCode == Errors.NONE.code()) { + this.consumerId = response.consumerId(); + this.generation = response.generationId(); + + // set the flag to refresh last committed offsets + this.subscriptions.needRefreshCommits(); + + log.debug("Joined group: {}", response); + + // record re-assignment time + this.sensors.partitionReassignments.record(time.milliseconds() - now); + + // return assigned partitions + return response.assignedPartitions(); + } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) { + // reset the consumer id and retry immediately + this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; + log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.", + groupId); + } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() + || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { + // re-discover the coordinator and retry with backoff + coordinatorDead(); + Utils.sleep(this.retryBackoffMs); + + log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", + groupId); + } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code() + || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) { + // log the error and re-throw the exception + log.error("Attempt to join group {} failed due to: {}", + groupId, Errors.forCode(errorCode).exception().getMessage()); + Errors.forCode(errorCode).maybeThrow(); + } else { + // unexpected error, throw the exception + throw new KafkaException("Unexpected error in join group response: " + + Errors.forCode(response.errorCode()).exception().getMessage()); + } + } while (true); } /** @@ -217,7 +244,6 @@ public final class Coordinator { // parse the response to get the offsets boolean offsetsReady = true; OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody()); - // TODO: needs to handle disconnects Map offsets = new HashMap(response.responseData().size()); for (Map.Entry entry : response.responseData().entrySet()) { TopicPartition tp = entry.getKey(); @@ -239,7 +265,8 @@ public final class Coordinator { // just ignore this partition log.debug("Unknown topic or partition for " + tp); } else { - throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset"); + throw new KafkaException("Unexpected error in fetch offset response: " + + Errors.forCode(data.errorCode).exception().getMessage()); } } else if (data.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) @@ -471,9 +498,10 @@ public final class Coordinator { if (response.errorCode() == Errors.NONE.code()) { log.debug("Received successful heartbeat response."); } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { + || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { coordinatorDead(); - } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) { + } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code() + || response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) { subscriptions.needReassignment(); } else { throw new KafkaException("Unexpected error in heartbeat response: " @@ -506,9 +534,10 @@ public final class Coordinator { log.debug("Committed offset {} for partition {}", offset, tp); subscriptions.committed(tp, offset); } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { + || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { coordinatorDead(); } else { + // do not need to throw the exception but just log the error log.error("Error committing partition {} at offset {}: {}", tp, offset, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ef9dd52..c5e577f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -231,13 +231,14 @@ public class Fetcher { log.debug("Fetched offset {} for partition {}", offset, topicPartition); return offset; } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); awaitMetadataUpdate(); } else { - // TODO: we should not just throw exceptions but should handle and log it. - Errors.forCode(errorCode).maybeThrow(); + log.error("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, Errors.forCode(errorCode).exception().getMessage()); + awaitMetadataUpdate(); } } } else { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index f548cd0..96e6ab0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -27,7 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse { /** * Possible error code: * - * TODO + * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_CONSUMER (16) + * ILLEGAL_GENERATION (22) + * UNKNOWN_CONSUMER_ID (25) */ private final short errorCode; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index fd9c545..6512a08 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -30,7 +30,11 @@ public class JoinGroupResponse extends AbstractRequestResponse { /** * Possible error code: * - * TODO + * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_CONSUMER (16) + * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24) + * UNKNOWN_CONSUMER_ID (25) + * INVALID_SESSION_TIMEOUT (26) */ private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index af06ad4..89df13a 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -25,11 +25,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean -// TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs -object ConsumerCoordinator { - private val MinSessionTimeoutMs = 6000 - private val MaxSessionTimeoutMs = 30000 -} /** * ConsumerCoordinator handles consumer group and consumer offset management. @@ -41,7 +36,6 @@ object ConsumerCoordinator { class ConsumerCoordinator(val config: KafkaConfig, val zkClient: ZkClient, val offsetManager: OffsetManager) extends Logging { - import ConsumerCoordinator._ this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: " @@ -93,20 +87,17 @@ class ConsumerCoordinator(val config: KafkaConfig, responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) { responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code) - } else if (sessionTimeoutMs < MinSessionTimeoutMs || sessionTimeoutMs > MaxSessionTimeoutMs) { + } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) { responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code) } else { - val group = coordinatorMetadata.getGroup(groupId) + // if the group is not unknown, create the group and add this consumer to it; + // the consumer id could either be UNKNOWN or some specific value, as the consumer + // group could just be migrated to this coordinator + var group = coordinatorMetadata.getGroup(groupId) if (group == null) { - if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) { - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) - } else { - val group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) - doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) - } - } else { - doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) + group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) } + doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) } } @@ -118,10 +109,15 @@ class ConsumerCoordinator(val config: KafkaConfig, responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) { group synchronized { if (group.is(Dead)) { - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator, so send the response to consumer to let it retry + responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) { responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code) } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) { + // if the consumer trying to register with a un-recognized id, send the response to let + // it reset its consumer id and retry responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) { /* @@ -174,7 +170,7 @@ class ConsumerCoordinator(val config: KafkaConfig, } else { group synchronized { if (group.is(Dead)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) + responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) } else if (!group.has(consumerId)) { responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else if (generationId != group.generationId) { diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala index 47bdfa7..c62428d 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala @@ -62,6 +62,14 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3 private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } +private object ConsumerGroupMetadata { + private val validPreviousStates: Map[GroupState, Set[GroupState]] = + Map(Dead -> Set(PreparingRebalance), + Stable -> Set(Rebalancing), + PreparingRebalance -> Set(Stable), + Rebalancing -> Set(PreparingRebalance)) +} + /** * Group contains the following metadata: * @@ -77,12 +85,6 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } private[coordinator] class ConsumerGroupMetadata(val groupId: String, val partitionAssignmentStrategy: String) { - private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(PreparingRebalance), - Stable -> Set(Rebalancing), - PreparingRebalance -> Set(Stable), - Rebalancing -> Set(PreparingRebalance)) - private val consumers = new mutable.HashMap[String, ConsumerMetadata] private var state: GroupState = Stable var generationId = 0 @@ -91,6 +93,12 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String, def has(consumerId: String) = consumers.contains(consumerId) def get(consumerId: String) = consumers(consumerId) + def reset() { + consumers.clear() + state = Stable + generationId = 0 + } + def add(consumerId: String, consumer: ConsumerMetadata) { consumers.put(consumerId, consumer) } @@ -124,8 +132,8 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String, } private def assertValidTransition(targetState: GroupState) { - if (!validPreviousStates(targetState).contains(state)) + if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state)) throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" - .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state)) + .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state)) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index c39e6de..faef181 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -85,6 +85,17 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, } /** + * Try to add the group if it does not exist yet + */ + def tryAddGroup(group: ConsumerGroupMetadata) { + inWriteLock(metadataLock) { + if (groups.get(group.groupId).isEmpty) + groups.put(group.groupId, group) + } + } + + + /** * Remove all metadata associated with the group, including its topics * @param groupId the groupId of the group we are removing * @param topicsForGroup topics that consumers in the group were subscribed to diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d0024c..24edb61 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -49,11 +49,14 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() + // for server-side request / response format + // TODO: this will be removed once we migrated to client-side format val requestObj = if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) RequestKeys.deserializerForKey(requestId)(buffer) else null + // for client-side request / response format val header: RequestHeader = if (requestObj == null) { buffer.rewind @@ -68,7 +71,7 @@ object RequestChannel extends Logging { buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") - trace("Processor %d received request : %s".format(processor, requestObj)) + trace("Processor %d received request : %s".format(processor, if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString)) def updateRequestMetrics() { val endTimeMs = SystemTime.milliseconds @@ -101,10 +104,10 @@ object RequestChannel extends Logging { } if(requestLogger.isTraceEnabled) requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + .format(if (requestObj != null) requestObj.describe(true) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) else if(requestLogger.isDebugEnabled) { requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + .format(if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9efa15c..6f25afd 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -111,6 +111,10 @@ object Defaults { val ControlledShutdownRetryBackoffMs = 5000 val ControlledShutdownEnable = true + /** ********* Consumer coordinator configuration ***********/ + val ConsumerMinSessionTimeoutMs = 6000 + val ConsumerMaxSessionTimeoutMs = 30000 + /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSize = OffsetManagerConfig.DefaultMaxMetadataSize val OffsetsLoadBufferSize = OffsetManagerConfig.DefaultLoadBufferSize @@ -218,6 +222,9 @@ object KafkaConfig { val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" val ControlledShutdownEnableProp = "controlled.shutdown.enable" + /** ********* Consumer coordinator configuration ***********/ + val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms" + val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms" /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes" val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size" @@ -343,6 +350,9 @@ object KafkaConfig { val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server" + /** ********* Consumer coordinator configuration ***********/ + val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers" + val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers" /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit" val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache." @@ -461,11 +471,16 @@ object KafkaConfig { .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) + /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) + /** ********* Consumer coordinator configuration ***********/ + .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc) + .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc) + /** ********* Offset management configuration ***********/ .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc) .define(OffsetsLoadBufferSizeProp, INT, Defaults.OffsetsLoadBufferSize, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc) @@ -581,11 +596,16 @@ object KafkaConfig { uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]), interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]), + /** ********* Controlled shutdown configuration ***********/ controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int], controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int], controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean], + /** ********* Consumer coordinator configuration ***********/ + consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int], + consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int], + /** ********* Offset management configuration ***********/ offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int], offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int], @@ -729,6 +749,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs, val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable, + /** ********* Consumer coordinator configuration ***********/ + val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs, + val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs, + /** ********* Offset management configuration ***********/ val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize, val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize, @@ -951,6 +975,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString) props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString) + /** ********* Consumer coordinator configuration ***********/ + props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString) + props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString) + /** ********* Offset management configuration ***********/ props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString) props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 1b7d5d8..9973dad 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,14 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=OFF, stdout +log4j.rootLogger=TRACE, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=ERROR -log4j.logger.org.apache.kafka=ERROR +log4j.logger.kafka=TRACE +log4j.logger.org.apache.kafka=TRACE # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 5c4cca6..9a993d9 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -42,12 +42,14 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { val tp = new TopicPartition(topic, part) // configure the servers and clients - this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown - this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset - this.serverConfig.setProperty("offsets.topic.num.partitions", "1") + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") override def generateConfigs() = { @@ -62,7 +64,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers) } - def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5) + def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20) /* * 1. Produce a bunch of messages diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index a1eed96..17b17b9 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -24,8 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.NoOffsetForPartitionException -import kafka.utils.{ShutdownableThread, TestUtils, Logging} -import kafka.server.OffsetManager +import kafka.utils.{TestUtils, Logging} +import kafka.server.{KafkaConfig, OffsetManager} import java.util.ArrayList import org.junit.Assert._ @@ -47,9 +47,10 @@ class ConsumerTest extends IntegrationTestHarness with Logging { val tp = new TopicPartition(topic, part) // configure the servers and clients - this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown - this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset - this.serverConfig.setProperty("offsets.topic.num.partitions", "1") + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) @@ -146,8 +147,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } - // TODO: fix test after fixing consumer-side Coordinator logic - def failingTestPartitionReassignmentCallback() { + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 8014a5a..71f48c0 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -133,6 +133,9 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs) Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable) + Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs) + Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs) + Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize) Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize) Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor) @@ -330,6 +333,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") -- 1.7.12.4 From f3e7f2859c4b6f9686f29ff90d076bf5ac6c4c4c Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 20 May 2015 19:04:45 -0700 Subject: [PATCH 2/5] revert log4j --- core/src/test/resources/log4j.properties | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 9973dad..1b7d5d8 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,14 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=TRACE, stdout +log4j.rootLogger=OFF, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=TRACE -log4j.logger.org.apache.kafka=TRACE +log4j.logger.kafka=ERROR +log4j.logger.org.apache.kafka=ERROR # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN -- 1.7.12.4 From cbf4d6979cc806ffa74d034277b8e6ceff7718fe Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 20 May 2015 19:14:49 -0700 Subject: [PATCH 3/5] dummy --- .../main/scala/kafka/coordinator/ConsumerGroupMetadata.scala | 6 ------ .../main/scala/kafka/coordinator/CoordinatorMetadata.scala | 11 ----------- 2 files changed, 17 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala index c62428d..0e3657f 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala @@ -93,12 +93,6 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String, def has(consumerId: String) = consumers.contains(consumerId) def get(consumerId: String) = consumers(consumerId) - def reset() { - consumers.clear() - state = Stable - generationId = 0 - } - def add(consumerId: String, consumer: ConsumerMetadata) { consumers.put(consumerId, consumer) } diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index faef181..c39e6de 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -85,17 +85,6 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, } /** - * Try to add the group if it does not exist yet - */ - def tryAddGroup(group: ConsumerGroupMetadata) { - inWriteLock(metadataLock) { - if (groups.get(group.groupId).isEmpty) - groups.put(group.groupId, group) - } - } - - - /** * Remove all metadata associated with the group, including its topics * @param groupId the groupId of the group we are removing * @param topicsForGroup topics that consumers in the group were subscribed to -- 1.7.12.4 From dcbe4b1e653f12be51a6e539dafde9174035b8af Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 31 May 2015 17:04:19 -0700 Subject: [PATCH 4/5] v2 --- .../clients/consumer/internals/Coordinator.java | 9 +++- .../kafka/coordinator/ConsumerCoordinator.scala | 53 ++++++++++++++++------ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 57f985d..7492e03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -499,9 +499,14 @@ public final class Coordinator { log.debug("Received successful heartbeat response."); } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { + log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); coordinatorDead(); - } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code() - || response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) { + } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) { + log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group."); + subscriptions.needReassignment(); + } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) { + log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group."); + consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; subscriptions.needReassignment(); } else { throw new KafkaException("Unexpected error in heartbeat response: " diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 89df13a..31b8962 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -90,14 +90,20 @@ class ConsumerCoordinator(val config: KafkaConfig, } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) { responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code) } else { - // if the group is not unknown, create the group and add this consumer to it; - // the consumer id could either be UNKNOWN or some specific value, as the consumer - // group could just be migrated to this coordinator + // only try to create the group if the group is not unknown AND + // the consumer id is UNKNOWN, if consumer is specified but group does not + // exist we should reject the request var group = coordinatorMetadata.getGroup(groupId) if (group == null) { - group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) + if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) { + responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code()) + } else { + group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) + doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) + } + } else { + doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) } - doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) } } @@ -111,8 +117,9 @@ class ConsumerCoordinator(val config: KafkaConfig, if (group.is(Dead)) { // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator, so send the response to consumer to let it retry - responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + // coordinator OR the group is in a transient unstable phase. Let the consumer to retry + // joining without specified consumer id, + responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) { responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code) } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) { @@ -166,11 +173,15 @@ class ConsumerCoordinator(val config: KafkaConfig, } else { val group = coordinatorMetadata.getGroup(groupId) if (group == null) { + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the consumer to retry + // joining without specified consumer id, responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else { group synchronized { if (group.is(Dead)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else if (!group.has(consumerId)) { responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) } else if (generationId != group.generationId) { @@ -200,6 +211,18 @@ class ConsumerCoordinator(val config: KafkaConfig, heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey)) } + private def removeGroup(group: ConsumerGroupMetadata) { + // first remove all its members + for (member <- group.allConsumers) { + removeConsumer(group, member) + } + + // then mark it as dead and remove + group.transitionTo(Dead) + info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId)) + coordinatorMetadata.removeGroup(group.groupId, group.topics) + } + private def addConsumer(consumerId: String, topics: Set[String], sessionTimeoutMs: Int, @@ -215,6 +238,7 @@ class ConsumerCoordinator(val config: KafkaConfig, group.remove(consumer.consumerId) val topicsToUnbind = consumer.topics -- group.topics coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind) + heartbeatPurgatory.checkAndComplete(ConsumerKey(consumer.groupId, consumer.consumerId)) } private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) { @@ -228,7 +252,11 @@ class ConsumerCoordinator(val config: KafkaConfig, private def maybePrepareRebalance(group: ConsumerGroupMetadata) { group synchronized { - if (group.canRebalance) + // if the group no longer belongs to the coordinator, + // just remove its registry and do not try rebalance + if (!isCoordinatorForGroup(group.groupId)) { + info("Group %s no longer belongs to me, removing it".format(group.groupId)) + } else if (group.canRebalance) prepareRebalance(group) } } @@ -298,11 +326,8 @@ class ConsumerCoordinator(val config: KafkaConfig, // TODO: cut the socket connection to the consumer } - if (group.isEmpty) { - group.transitionTo(Dead) - info("Group %s generation %s is dead".format(group.groupId, group.generationId)) - coordinatorMetadata.removeGroup(group.groupId, group.topics) - } + if (group.isEmpty) + removeGroup(group) } if (!group.is(Dead)) { // assign partitions to existing consumers of the group according to the partitioning strategy -- 1.7.12.4 From 0cc3af6cc6de9542e19290fd969cc2c0ff1ea93b Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 2 Jun 2015 10:45:25 -0700 Subject: [PATCH 5/5] v3 --- .../org/apache/kafka/clients/consumer/internals/Coordinator.java | 1 + .../java/org/apache/kafka/common/requests/JoinGroupResponse.java | 1 + core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala | 8 ++------ 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 7492e03..fac7995 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -160,6 +160,7 @@ public final class Coordinator { log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", groupId); } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code() + || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code() || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) { // log the error and re-throw the exception log.error("Attempt to join group {} failed due to: {}", diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 6512a08..8d418cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -32,6 +32,7 @@ public class JoinGroupResponse extends AbstractRequestResponse { * * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_CONSUMER (16) + * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23) * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24) * UNKNOWN_CONSUMER_ID (25) * INVALID_SESSION_TIMEOUT (26) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 31b8962..c044d01 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -96,7 +96,7 @@ class ConsumerCoordinator(val config: KafkaConfig, var group = coordinatorMetadata.getGroup(groupId) if (group == null) { if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) { - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code()) + responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) } else { group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) @@ -252,11 +252,7 @@ class ConsumerCoordinator(val config: KafkaConfig, private def maybePrepareRebalance(group: ConsumerGroupMetadata) { group synchronized { - // if the group no longer belongs to the coordinator, - // just remove its registry and do not try rebalance - if (!isCoordinatorForGroup(group.groupId)) { - info("Group %s no longer belongs to me, removing it".format(group.groupId)) - } else if (group.canRebalance) + if (group.canRebalance) prepareRebalance(group) } } -- 1.7.12.4