From ba9478e372fcb1f4e390aefd824c2e5c7d68de7c Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 20 May 2015 19:01:25 -0700
Subject: [PATCH 1/2] dummy

---
 .../clients/consumer/internals/Coordinator.java    | 87 ++++++++++++++--------
 .../kafka/clients/consumer/internals/Fetcher.java  |  7 +-
 .../kafka/common/requests/HeartbeatResponse.java   |  5 +-
 .../kafka/common/requests/JoinGroupResponse.java   |  6 +-
 .../kafka/coordinator/ConsumerCoordinator.scala    | 32 ++++----
 .../kafka/coordinator/ConsumerGroupMetadata.scala  | 24 ++++--
 .../kafka/coordinator/CoordinatorMetadata.scala    | 11 +++
 .../main/scala/kafka/network/RequestChannel.scala  |  9 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 28 +++++++
 core/src/test/resources/log4j.properties           |  6 +-
 .../integration/kafka/api/ConsumerBounceTest.scala | 10 ++-
 .../scala/integration/kafka/api/ConsumerTest.scala | 14 ++--
 .../kafka/server/KafkaConfigConfigDefTest.scala    |  5 ++
 13 files changed, 167 insertions(+), 77 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index b2764df..57f985d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -120,30 +120,57 @@ public final class Coordinator {
 
         // send a join group request to the coordinator
         log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
 
-        JoinGroupRequest request = new JoinGroupRequest(groupId,
-                (int) this.sessionTimeoutMs,
-                subscribedTopics,
-                this.consumerId,
-                this.assignmentStrategy);
-        ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
-
-        // process the response
-        JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
-        // TODO: needs to handle disconnects and errors, should not just throw exceptions
-        Errors.forCode(response.errorCode()).maybeThrow();
-        this.consumerId = response.consumerId();
-        this.generation = response.generationId();
-
-        // set the flag to refresh last committed offsets
-        this.subscriptions.needRefreshCommits();
-
-        log.debug("Joined group: {}", response);
-
-        // record re-assignment time
-        this.sensors.partitionReassignments.record(time.milliseconds() - now);
-
-        // return assigned partitions
-        return response.assignedPartitions();
+        // repeat sending the request until it either succeeds or fails with a fatal error
+        do {
+            JoinGroupRequest request = new JoinGroupRequest(groupId,
+                    (int) this.sessionTimeoutMs,
+                    subscribedTopics,
+                    this.consumerId,
+                    this.assignmentStrategy);
+
+            ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
+            JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+            short errorCode = response.errorCode();
+
+            if (errorCode == Errors.NONE.code()) {
+                this.consumerId = response.consumerId();
+                this.generation = response.generationId();
+
+                // set the flag to refresh last committed offsets
+                this.subscriptions.needRefreshCommits();
+
+                log.debug("Joined group: {}", response);
+
+                // record re-assignment time
+                this.sensors.partitionReassignments.record(time.milliseconds() - now);
+
+                // return assigned partitions
+                return response.assignedPartitions();
+            } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
+                // reset the consumer id and retry immediately
+                this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
+                        groupId);
+            } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                // re-discover the coordinator and retry with backoff
+                coordinatorDead();
+                Utils.sleep(this.retryBackoffMs);
+
+                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
+                        groupId);
+            } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
+                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+                // log the error and re-throw the exception
+                log.error("Attempt to join group {} failed due to: {}",
+                        groupId, Errors.forCode(errorCode).exception().getMessage());
+                Errors.forCode(errorCode).maybeThrow();
+            } else {
+                // unexpected error, throw the exception
+                throw new KafkaException("Unexpected error in join group response: "
+                        + Errors.forCode(errorCode).exception().getMessage());
+            }
+        } while (true);
     }
 
     /**
@@ -217,7 +244,6 @@ public final class Coordinator {
         // parse the response to get the offsets
         boolean offsetsReady = true;
         OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
-        // TODO: needs to handle disconnects
         Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
         for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
             TopicPartition tp = entry.getKey();
@@ -239,7 +265,8 @@ public final class Coordinator {
                     // just ignore this partition
                     log.debug("Unknown topic or partition for " + tp);
                 } else {
-                    throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
+                    throw new KafkaException("Unexpected error in fetch offset response: "
+                            + Errors.forCode(data.errorCode).exception().getMessage());
                 }
             } else if (data.offset >= 0) {
                 // record the position with the offset (-1 indicates no committed offset to fetch)
@@ -471,9 +498,10 @@ public final class Coordinator {
             if (response.errorCode() == Errors.NONE.code()) {
                 log.debug("Received successful heartbeat response.");
             } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                    || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                 coordinatorDead();
-            } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
+            } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()
+                    || response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
                 subscriptions.needReassignment();
             } else {
                 throw new KafkaException("Unexpected error in heartbeat response: "
@@ -506,9 +534,10 @@ public final class Coordinator {
                     log.debug("Committed offset {} for partition {}", offset, tp);
                     subscriptions.committed(tp, offset);
                 } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                     coordinatorDead();
                 } else {
+                    // no need to throw an exception here; just log the error
                     log.error("Error committing partition {} at offset {}: {}",
                         tp,
                         offset,
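Note: the do/while loop above amounts to a small retry policy keyed on the JoinGroup error code. The following standalone Java sketch restates that policy; it is illustrative only and not part of the patch. The enum names are hypothetical, and the real code dispatches on org.apache.kafka.common.protocol.Errors and mutates the coordinator's fields rather than returning an action.

    // Illustrative sketch, not Kafka code: the retry policy of the new join-group loop.
    public class JoinGroupRetryPolicySketch {

        enum ErrorKind {
            NONE,
            UNKNOWN_CONSUMER_ID,
            CONSUMER_COORDINATOR_NOT_AVAILABLE,
            NOT_COORDINATOR_FOR_CONSUMER,
            UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY,
            INVALID_SESSION_TIMEOUT
        }

        enum Action {
            RETURN_ASSIGNMENT,     // commit the new generation and hand back the partitions
            RESET_ID_AND_RETRY,    // clear the consumer id, retry immediately
            REDISCOVER_AND_RETRY,  // mark the coordinator dead, back off, rediscover, retry
            THROW                  // fatal: surface the error to the caller
        }

        static Action classify(ErrorKind error) {
            switch (error) {
                case NONE:
                    return Action.RETURN_ASSIGNMENT;
                case UNKNOWN_CONSUMER_ID:
                    return Action.RESET_ID_AND_RETRY;
                case CONSUMER_COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR_FOR_CONSUMER:
                    return Action.REDISCOVER_AND_RETRY;
                default:
                    // UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY, INVALID_SESSION_TIMEOUT
                    // and anything unexpected end the loop with an exception
                    return Action.THROW;
            }
        }

        public static void main(String[] args) {
            for (ErrorKind e : ErrorKind.values())
                System.out.println(e + " -> " + classify(e));
        }
    }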
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ef9dd52..c5e577f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,13 +231,14 @@ public class Fetcher<K, V> {
                 log.debug("Fetched offset {} for partition {}", offset, topicPartition);
                 return offset;
             } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                    || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                 log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);
                 awaitMetadataUpdate();
             } else {
-                // TODO: we should not just throw exceptions but should handle and log it.
-                Errors.forCode(errorCode).maybeThrow();
+                log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                        topicPartition, Errors.forCode(errorCode).exception().getMessage());
+                awaitMetadataUpdate();
             }
         }
     } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index f548cd0..96e6ab0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -27,7 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     /**
      * Possible error codes:
      *
-     * TODO
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_CONSUMER_ID (25)
      */
     private final short errorCode;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index fd9c545..6512a08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -30,7 +30,11 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     /**
      * Possible error codes:
      *
-     * TODO
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
+     * UNKNOWN_CONSUMER_ID (25)
+     * INVALID_SESSION_TIMEOUT (26)
      */
 
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index af06ad4..89df13a 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -25,11 +25,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 
-// TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
-object ConsumerCoordinator {
-  private val MinSessionTimeoutMs = 6000
-  private val MaxSessionTimeoutMs = 30000
-}
 
 /**
  * ConsumerCoordinator handles consumer group and consumer offset management.
@@ -41,7 +36,6 @@ object ConsumerCoordinator {
 class ConsumerCoordinator(val config: KafkaConfig,
                           val zkClient: ZkClient,
                           val offsetManager: OffsetManager) extends Logging {
-  import ConsumerCoordinator._
 
   this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: "
 
@@ -93,20 +87,17 @@ class ConsumerCoordinator(val config: KafkaConfig,
       responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
     } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
       responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
-    } else if (sessionTimeoutMs < MinSessionTimeoutMs || sessionTimeoutMs > MaxSessionTimeoutMs) {
+    } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) {
       responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      // if the group does not exist yet, create it and add this consumer to it;
+      // the consumer id could be either UNKNOWN or some specific value, since the
+      // consumer group could have just been migrated to this coordinator
+      var group = coordinatorMetadata.getGroup(groupId)
       if (group == null) {
-        if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
-          responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
-        } else {
-          val group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
-          doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
-        }
-      } else {
-        doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
+        group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
       }
+      doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
     }
   }
 
@@ -118,10 +109,15 @@ class ConsumerCoordinator(val config: KafkaConfig,
                           responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
     group synchronized {
       if (group.is(Dead)) {
-        responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
+        // if the group is marked as dead, some other thread has just removed it from
+        // the coordinator metadata; this most likely means the group has migrated to
+        // another coordinator, so respond to let the consumer retry
+        responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
       } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
         responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
       } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
+        // if the consumer is trying to register with an unrecognized id, respond to let
+        // it reset its consumer id and retry
        responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
       } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) {
         /*
@@ -174,7 +170,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     } else {
       group synchronized {
         if (group.is(Dead)) {
-          responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
+          responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
         } else if (!group.has(consumerId)) {
           responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
         } else if (generationId != group.generationId) {
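Note: on the broker side, handleJoinGroup now applies its checks in a fixed order and creates the group on demand rather than rejecting unknown ids outright. A compact restatement of that check order follows; it is hypothetical Java, not Kafka code, and the numeric codes are the ones documented in JoinGroupResponse above.

    // Illustrative restatement of the coordinator's JoinGroup check order.
    public class JoinGroupChecksSketch {
        static short handleJoin(boolean isCoordinatorForGroup, boolean knownStrategy,
                                int sessionTimeoutMs, int minTimeoutMs, int maxTimeoutMs) {
            if (!isCoordinatorForGroup)
                return 16;                 // NOT_COORDINATOR_FOR_CONSUMER
            if (!knownStrategy)
                return 24;                 // UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY
            if (sessionTimeoutMs < minTimeoutMs || sessionTimeoutMs > maxTimeoutMs)
                return 26;                 // INVALID_SESSION_TIMEOUT
            // otherwise: create the group if absent (the id may be UNKNOWN after a
            // coordinator migration) and proceed to doJoinGroup
            return 0;                      // NONE
        }

        public static void main(String[] args) {
            System.out.println(handleJoin(true, true, 5000, 6000, 30000));  // 26
            System.out.println(handleJoin(true, true, 10000, 6000, 30000)); // 0
        }
    }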
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
index 47bdfa7..c62428d 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -62,6 +62,14 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3
 
 private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 
+private object ConsumerGroupMetadata {
+  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
+    Map(Dead -> Set(PreparingRebalance),
+        Stable -> Set(Rebalancing),
+        PreparingRebalance -> Set(Stable),
+        Rebalancing -> Set(PreparingRebalance))
+}
+
 /**
  * Group contains the following metadata:
  *
@@ -77,12 +85,6 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 private[coordinator] class ConsumerGroupMetadata(val groupId: String,
                                                  val partitionAssignmentStrategy: String) {
 
-  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(PreparingRebalance),
-        Stable -> Set(Rebalancing),
-        PreparingRebalance -> Set(Stable),
-        Rebalancing -> Set(PreparingRebalance))
-
   private val consumers = new mutable.HashMap[String, ConsumerMetadata]
   private var state: GroupState = Stable
   var generationId = 0
@@ -91,6 +93,12 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
   def has(consumerId: String) = consumers.contains(consumerId)
   def get(consumerId: String) = consumers(consumerId)
 
+  def reset() {
+    consumers.clear()
+    state = Stable
+    generationId = 0
+  }
+
   def add(consumerId: String, consumer: ConsumerMetadata) {
     consumers.put(consumerId, consumer)
   }
@@ -124,8 +132,8 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
   }
 
   private def assertValidTransition(targetState: GroupState) {
-    if (!validPreviousStates(targetState).contains(state))
+    if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state))
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
-        .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state))
+        .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index c39e6de..faef181 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -85,6 +85,17 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
   }
 
   /**
+   * Try to add the group if it does not exist yet
+   */
+  def tryAddGroup(group: ConsumerGroupMetadata) {
+    inWriteLock(metadataLock) {
+      if (groups.get(group.groupId).isEmpty)
+        groups.put(group.groupId, group)
+    }
+  }
+
+
+  /**
    * Remove all metadata associated with the group, including its topics
    * @param groupId the groupId of the group we are removing
   * @param topicsForGroup topics that consumers in the group were subscribed to
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1d0024c..24edb61 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,11 +49,14 @@ object RequestChannel extends Logging {
     @volatile var responseCompleteTimeMs = -1L
     @volatile var responseDequeueTimeMs = -1L
     val requestId = buffer.getShort()
+    // for the server-side request / response format
+    // TODO: this will be removed once we have migrated to the client-side format
     val requestObj =
       if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
         RequestKeys.deserializerForKey(requestId)(buffer)
       else
         null
+    // for the client-side request / response format
     val header: RequestHeader =
       if (requestObj == null) {
         buffer.rewind
@@ -68,7 +71,7 @@ object RequestChannel extends Logging {
       buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Processor %d received request : %s".format(processor, requestObj))
+    trace("Processor %d received request : %s".format(processor, if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
@@ -101,10 +104,10 @@ object RequestChannel extends Logging {
       }
       if(requestLogger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+          .format(if (requestObj != null) requestObj.describe(true) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       else if(requestLogger.isDebugEnabled) {
         requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+          .format(if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       }
     }
   }
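Note: the transition table that this change hoists into the ConsumerGroupMetadata companion object is small enough to restate directly. A self-contained Java analogue of the same state machine (illustrative only; the patch itself keeps this in Scala):

    // Each target state lists the states a group may legally transition from.
    import java.util.EnumMap;
    import java.util.EnumSet;
    import java.util.Map;

    public class GroupStateMachineSketch {
        enum GroupState { STABLE, PREPARING_REBALANCE, REBALANCING, DEAD }

        static final Map<GroupState, EnumSet<GroupState>> VALID_PREVIOUS_STATES =
            new EnumMap<>(GroupState.class);
        static {
            VALID_PREVIOUS_STATES.put(GroupState.DEAD, EnumSet.of(GroupState.PREPARING_REBALANCE));
            VALID_PREVIOUS_STATES.put(GroupState.STABLE, EnumSet.of(GroupState.REBALANCING));
            VALID_PREVIOUS_STATES.put(GroupState.PREPARING_REBALANCE, EnumSet.of(GroupState.STABLE));
            VALID_PREVIOUS_STATES.put(GroupState.REBALANCING, EnumSet.of(GroupState.PREPARING_REBALANCE));
        }

        static void assertValidTransition(GroupState current, GroupState target) {
            if (!VALID_PREVIOUS_STATES.get(target).contains(current))
                throw new IllegalStateException(
                    "Group should be in " + VALID_PREVIOUS_STATES.get(target)
                    + " before moving to " + target + "; instead it is in " + current);
        }

        public static void main(String[] args) {
            assertValidTransition(GroupState.STABLE, GroupState.PREPARING_REBALANCE); // legal
            try {
                assertValidTransition(GroupState.STABLE, GroupState.DEAD);            // illegal
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage());
            }
        }
    }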
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9efa15c..6f25afd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -111,6 +111,10 @@
   val ControlledShutdownRetryBackoffMs = 5000
   val ControlledShutdownEnable = true
 
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMs = 6000
+  val ConsumerMaxSessionTimeoutMs = 30000
+
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSize = OffsetManagerConfig.DefaultMaxMetadataSize
   val OffsetsLoadBufferSize = OffsetManagerConfig.DefaultLoadBufferSize
@@ -218,6 +222,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
+  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
   val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -343,6 +350,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such a failure happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (controller failover, replica lag, etc.). This config determines the amount of time to wait before retrying."
   val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers"
+  val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
@@ -461,11 +471,16 @@
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
+      /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
       .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
       .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
+      /** ********* Consumer coordinator configuration ***********/
+      .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
+      .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
+      /** ********* Offset management configuration ***********/
       .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
       .define(OffsetsLoadBufferSizeProp, INT, Defaults.OffsetsLoadBufferSize, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc)
@@ -581,11 +596,16 @@ object KafkaConfig {
       uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
       interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
       interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
+      /** ********* Controlled shutdown configuration ***********/
       controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
       controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
       controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
+      /** ********* Consumer coordinator configuration ***********/
+      consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
+      consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
+      /** ********* Offset management configuration ***********/
       offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
       offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
@@ -729,6 +749,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
                   val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
 
+                  /** ********* Consumer coordinator configuration ***********/
+                  val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
+                  val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
+
                   /** ********* Offset management configuration ***********/
                   val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
                   val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
@@ -951,6 +975,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
     props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
 
+    /** ********* Consumer coordinator configuration ***********/
+    props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
+    props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
+
     /** ********* Offset management configuration ***********/
     props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
     props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 1b7d5d8..9973dad 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=TRACE, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
-log4j.logger.org.apache.kafka=ERROR
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE
 
 # zkclient can be verbose, during debugging it is common to adjust it separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 5c4cca6..9a993d9 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -42,12 +42,14 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   val tp = new TopicPartition(topic, part)
 
   // configure the servers and clients
-  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
-  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
-  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set a small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
   override def generateConfigs() = {
@@ -62,7 +64,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
   }
 
-  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
 
   /*
   * 1. Produce a bunch of messages
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index a1eed96..17b17b9 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -24,8 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
 
-import kafka.utils.{ShutdownableThread, TestUtils, Logging}
-import kafka.server.OffsetManager
+import kafka.utils.{TestUtils, Logging}
+import kafka.server.{KafkaConfig, OffsetManager}
 
 import java.util.ArrayList
 import org.junit.Assert._
@@ -47,9 +47,10 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   val tp = new TopicPartition(topic, part)
 
   // configure the servers and clients
-  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
-  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
-  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set a small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -146,8 +147,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
-  // TODO: fix test after fixing consumer-side Coordinator logic
-  def failingTestPartitionReassignmentCallback() {
+  def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
     val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 8014a5a..71f48c0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -133,6 +133,9 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
     Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
 
+    Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs)
+    Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs)
+
     Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
     Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
     Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
@@ -330,6 +333,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
       case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+      case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+      case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
      case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
       case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
-- 
1.7.12.4

From e2d1a1947820b2757ef41db1d69b1d40e343b788 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 20 May 2015 19:04:45 -0700
Subject: [PATCH 2/2] revert log4j

---
 core/src/test/resources/log4j.properties | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 9973dad..1b7d5d8 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=TRACE, stdout
+log4j.rootLogger=OFF, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=TRACE
-log4j.logger.org.apache.kafka=TRACE
+log4j.logger.kafka=ERROR
+log4j.logger.org.apache.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust it separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-- 
1.7.12.4
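Note: taken together, the new broker settings bound the session timeout a consumer may request, which is why the tests above lower consumer.min.session.timeout.ms before asking for sessions as short as 20 ms. A minimal sketch of how the settings interact; the property keys come from this patch, but the bounds check is restated here and is not Kafka API.

    import java.util.Properties;

    public class SessionTimeoutConfigExample {
        public static void main(String[] args) {
            // broker side: the coordinator accepts session timeouts within these bounds
            Properties broker = new Properties();
            broker.setProperty("consumer.min.session.timeout.ms", "10");    // default 6000
            broker.setProperty("consumer.max.session.timeout.ms", "30000"); // default 30000

            // consumer side: the requested session timeout must fall within the bounds,
            // otherwise the join-group request fails with INVALID_SESSION_TIMEOUT (26)
            Properties consumer = new Properties();
            consumer.setProperty("session.timeout.ms", "20");

            int min = Integer.parseInt(broker.getProperty("consumer.min.session.timeout.ms"));
            int max = Integer.parseInt(broker.getProperty("consumer.max.session.timeout.ms"));
            int requested = Integer.parseInt(consumer.getProperty("session.timeout.ms"));
            System.out.println(requested >= min && requested <= max
                    ? "join group accepted" : "INVALID_SESSION_TIMEOUT");
        }
    }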