Because our consumers' processing logic is sometimes heavy, we followed the Kafka upgrade notes (https://kafka.apache.org/documentation/#upgrade_10201_notable) and set max.poll.interval.ms to Integer.MAX_VALUE.
Our consumers subscribe with a pattern: consumer.subscribe(Pattern.compile(".riven."));
The reproduction steps are:
(1) Test environment Kafka cluster: three brokers
(2) Topics matching the pattern include rivenTest1, rivenTest2, and rivenTest88
(3) Only one consumer is needed; group.id is "rivenReassign", and it calls consumer.subscribe(Pattern.compile(".riven."));
(4) At first the group status is Stable and the consumer works normally; then I delete the topic rivenTest88
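For reference, the consumer configuration used in the reproduction can be sketched as below. This is a minimal sketch: the broker host names are placeholders, and the actual KafkaConsumer construction and pattern subscription are shown only as comments, since they require the kafka-clients library and a running cluster.

```java
import java.util.Properties;
import java.util.regex.Pattern;

public class ReproConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // Test cluster: three brokers (host names are placeholders)
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("group.id", "rivenReassign");
        // The setting from the upgrade notes: effectively never time out on slow processing
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        // Against a real cluster this would continue with:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, ...);
        //   consumer.subscribe(Pattern.compile(".riven."));
        //   while (true) consumer.poll(Duration.ofMillis(1000));
        System.out.println("max.poll.interval.ms = " + props.getProperty("max.poll.interval.ms"));
    }
}
```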
Observed behavior:
(1) The consumer blocks in the poll method, no longer consumes any messages, and its log keeps printing:
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed on partition rivenTest88-1 at offset 0: This server does not host this topic-partition.
(2) AdminClient's describeConsumerGroups call keeps timing out, and the group status is no longer Stable.
(3) CPU usage and network traffic on the brokers increase significantly.
Analysis of the KafkaConsumer code (version 2.8.1):
I found that a waitForJoinGroup variable was introduced in the updateAssignmentMetadataIfNeeded method. The reasoning is given in the comment attached to the method: "try to update assignment metadata BUT do not need to block on the timer for join group".
Tracing the code back layer by layer shows that the purpose of this variable is to construct a time.timer(0L) and pass it down to joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
However, the call stack of joinGroupIfNeeded contains a sub-method onJoinPrepare, and inside onJoinPrepare there is the line
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), where the value of rebalanceConfig.rebalanceTimeoutMs is actually max.poll.interval.ms.
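The interaction of the two timers can be illustrated with a minimal stand-in for Kafka's internal Timer class (a sketch only; the real class lives in org.apache.kafka.common.utils and tracks a deadline in a similar way):

```java
public class TimerDemo {
    /** Minimal stand-in for org.apache.kafka.common.utils.Timer. */
    static class Timer {
        private final long deadlineMs;
        Timer(long timeoutMs) { this.deadlineMs = saturatedAdd(System.currentTimeMillis(), timeoutMs); }
        // Guard against overflow for very large timeouts like Integer.MAX_VALUE
        static long saturatedAdd(long a, long b) {
            long r = a + b;
            return (b > 0 && r < a) ? Long.MAX_VALUE : r;
        }
        boolean isExpired() { return System.currentTimeMillis() >= deadlineMs; }
    }

    public static void main(String[] args) {
        // The timer joinGroupIfNeeded receives when waitForJoinGroup is false:
        Timer joinTimer = new Timer(0L);
        // The timer onJoinPrepare builds for maybeAutoCommitOffsetsSync, with
        // rebalanceConfig.rebalanceTimeoutMs == max.poll.interval.ms == Integer.MAX_VALUE:
        Timer commitTimer = new Timer(Integer.MAX_VALUE);

        // The zero timer is already expired, so poll() itself is meant not to block...
        System.out.println("joinTimer expired: " + joinTimer.isExpired());
        // ...but the sync commit is bounded by a different timer that will
        // effectively never expire:
        System.out.println("commitTimer expired: " + commitTimer.isExpired());
    }
}
```

This is the core of the problem: the caller's zero timer does not bound the commit, because onJoinPrepare constructs its own timer from the rebalance timeout.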
Finally, I tracked down ConsumerCoordinator's method commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
Its offsets parameter is subscriptions.allConsumed(). After I delete the topic rivenTest88, commitOffsetsSync falls into an infinite loop!
The endless loop has three causes:
(1) The timer's expiration time is far too long: it is max.poll.interval.ms.
(2) The offsets to be committed contain stale data, namely a TopicPartition that no longer exists.
(3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) always fails, and the exception in the future is UnknownTopicOrPartitionException, which is treated as retriable.
Since the loop's retry interval is timer.sleep(rebalanceConfig.retryBackoffMs), 100 ms by default, the retries are rapid.
If a large number of consumers hit this problem at the same time, they generate a flood of network requests to the Kafka brokers, causing broker CPU usage and traffic to rise sharply!
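The retry loop can be simulated as follows. This is a sketch with a fake clock (so the demo does not actually sleep) and a commit attempt that always fails retriably, standing in for the UnknownTopicOrPartitionException case; the real loop lives in ConsumerCoordinator.commitOffsetsSync.

```java
public class RetryLoopDemo {
    static long fakeNowMs = 0;                 // simulated clock instead of real sleeps
    static final long RETRY_BACKOFF_MS = 100;  // retry.backoff.ms default

    /** Stands in for sendOffsetCommitRequest: always fails retriably because
     *  the committed offsets include a deleted topic-partition. */
    static boolean commitAttemptSucceeds() { return false; }

    /** Simulates commitOffsetsSync's loop; returns the number of attempts
     *  made before the timer expires. */
    static long simulateCommitLoop(long timeoutMs) {
        long deadline = fakeNowMs + timeoutMs;
        long attempts = 0;
        while (fakeNowMs < deadline) {
            attempts++;
            if (commitAttemptSucceeds())
                return attempts;
            fakeNowMs += RETRY_BACKOFF_MS;     // timer.sleep(retryBackoffMs)
        }
        return attempts;
    }

    public static void main(String[] args) {
        // With a sane timeout the loop exits after timeout/backoff attempts:
        fakeNowMs = 0;
        System.out.println("attempts in 1s: " + simulateCommitLoop(1_000));
        // With a timer of Integer.MAX_VALUE ms, the loop would make roughly
        // Integer.MAX_VALUE / 100 attempts, spanning about 24.8 days:
        System.out.println("attempts until expiry: " + (Integer.MAX_VALUE / RETRY_BACKOFF_MS));
    }
}
```

Each of those millions of attempts is a network round trip to the broker, which is why many consumers hitting this at once drives up broker CPU and traffic.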
Suggestions:
1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)): I recommend that the timer for this method not be based on max.poll.interval.ms. That parameter is exposed for users to configure, and from its description on the official website I would never expect it to be used here. Worse, it blocks KafkaConsumer's poll(final Duration timeout) even when I call consumer.poll(Duration.ofMillis(1000)).
2. In fact, in ConsumerCoordinator's poll(Timer timer, boolean waitForJoinGroup) method, the consumer ensures its local metadata is up to date before calling ensureActiveGroup.
In other words, the consumer already knows which topics/partitions are valid before onJoinPrepare. Given that, when commitOffsetsSync hits the UnknownTopicOrPartitionException described above, why not check the offsets being committed against the latest local metadata, remove the topic-partitions that no longer exist, and then retry the commit? I believe that would break out of the infinite loop.
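The pruning idea could look roughly like this. It is a sketch only: plain strings stand in for TopicPartition and OffsetAndMetadata, and pruneAgainstMetadata is a hypothetical helper, not an existing Kafka method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PruneOffsetsDemo {
    /** Hypothetical helper: drop offsets whose topic is no longer present in
     *  the consumer's (already refreshed) local metadata. Keys are
     *  "topic-partition" strings standing in for TopicPartition. */
    static Map<String, Long> pruneAgainstMetadata(Map<String, Long> offsets, Set<String> knownTopics) {
        Map<String, Long> pruned = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            // Strip the trailing "-<partition>" to recover the topic name
            String topic = e.getKey().substring(0, e.getKey().lastIndexOf('-'));
            if (knownTopics.contains(topic))
                pruned.put(e.getKey(), e.getValue());
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 42L);
        offsets.put("rivenTest88-1", 0L);   // topic deleted, commit keeps failing

        // Metadata refreshed before ensureActiveGroup no longer lists rivenTest88:
        Set<String> knownTopics = Set.of("rivenTest1", "rivenTest2");

        // Retrying with only the live partitions would let the commit succeed:
        System.out.println(pruneAgainstMetadata(offsets, knownTopics));
    }
}
```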
3. Why must the offsets be committed synchronously in the onJoinPrepare method? Could they be committed asynchronously instead? Alternatively, expose a parameter letting the user choose between synchronous and asynchronous commit, or a new parameter controlling the maximum number of retries for the synchronous commit here, instead of a Timer built from max.poll.interval.ms.
And even if the offsets are not actually committed here, the impact is limited: some messages may be consumed again. I would still suggest providing a new parameter to control whether this offset commit is required.
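The capped-retry variant of suggestion 3 could be sketched like this, with `rebalance.commit.max.retries` as a purely hypothetical config name (no such setting exists in Kafka today):

```java
public class BoundedCommitDemo {
    /** Hypothetical config: cap the number of synchronous commit retries
     *  during onJoinPrepare instead of looping until the rebalance timeout. */
    static final int REBALANCE_COMMIT_MAX_RETRIES = 5;

    /** Stands in for a commit attempt that keeps failing retriably. */
    static boolean tryCommit() { return false; }

    /** Returns true if the commit succeeded within the retry budget. */
    static boolean commitWithBoundedRetries() {
        for (int attempt = 1; attempt <= REBALANCE_COMMIT_MAX_RETRIES; attempt++) {
            if (tryCommit())
                return true;
        }
        // Give up: at worst some messages are consumed again after the
        // rebalance, which the report argues is an acceptable trade-off.
        return false;
    }

    public static void main(String[] args) {
        System.out.println("committed: " + commitWithBoundedRetries());
    }
}
```

With a bound like this, a consumer stuck on a deleted topic-partition would abandon the commit after a handful of attempts instead of retrying for up to max.poll.interval.ms.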