Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
3.9.0
None
Description
The MirrorCheckpointConnector has two operations that read the source consumer groups:
- loadInitialConsumerGroups
- refreshConsumerGroups
loadInitialConsumerGroups blocks the start() method of the connector, while refreshConsumerGroups is asynchronous and runs periodically while the connector is running.
loadInitialConsumerGroups may take a long time to execute and can exceed the configured "admin.timeout.ms" used by the Scheduler. When that happens, the timeout is logged and start() returns normally. The framework then generates task configs immediately after start(), before loadInitialConsumerGroups has finished, and therefore produces an empty set of task configs: https://github.com/apache/kafka/blob/e2494e6ffb89f8288ed2aeb9b5596c755210bffd/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L118-L121.
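The timeout race can be reproduced with a small standalone model. This is a minimal sketch, not MirrorCheckpointConnector itself: the class InitialLoadRaceSketch, its fields (adminTimeoutMs standing in for "admin.timeout.ms", loadDurationMs simulating a slow group listing), the hard-coded group names, and the round-robin assignment in taskConfigs are all hypothetical. It only shows the ordering problem: start() gives up waiting, and the task configs generated right afterwards are empty.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of the start()/taskConfigs() race.
// NOT the real MirrorCheckpointConnector; all names here are illustrative.
class InitialLoadRaceSketch {
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-scheduler");
        t.setDaemon(true); // do not keep the JVM alive if stop() is never called
        return t;
    });
    private final long adminTimeoutMs; // stands in for "admin.timeout.ms"
    private final long loadDurationMs; // simulated cost of listing source consumer groups
    private volatile Set<String> knownConsumerGroups = Set.of(); // empty until the initial load finishes
    private Future<?> initialLoad;

    InitialLoadRaceSketch(long adminTimeoutMs, long loadDurationMs) {
        this.adminTimeoutMs = adminTimeoutMs;
        this.loadDurationMs = loadDurationMs;
    }

    // Simulates a slow listing of consumer groups on a large source cluster.
    private void loadInitialConsumerGroups() {
        try {
            Thread.sleep(loadDurationMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        knownConsumerGroups = Set.of("group-a", "group-b", "group-c");
        // As described in the ticket, the initial load does not request task reconfiguration.
    }

    void start() {
        initialLoad = scheduler.submit(this::loadInitialConsumerGroups);
        try {
            initialLoad.get(adminTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Only logged: start() returns normally while the load keeps running in the background.
            System.out.println("initial consumer group load timed out after " + adminTimeoutMs + " ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Called by the framework right after start(): one list of groups per task, capped at maxTasks.
    List<List<String>> taskConfigs(int maxTasks) {
        List<String> groups = new ArrayList<>(knownConsumerGroups);
        groups.sort(Comparator.naturalOrder()); // deterministic order for the round-robin below
        int numTasks = Math.min(maxTasks, groups.size());
        List<List<String>> configs = new ArrayList<>();
        if (numTasks <= 0) {
            return configs; // no known groups (or no tasks allowed): no task configs at all
        }
        for (int i = 0; i < numTasks; i++) {
            configs.add(new ArrayList<>());
        }
        for (int i = 0; i < groups.size(); i++) {
            configs.get(i % numTasks).add(groups.get(i));
        }
        return configs;
    }

    // Waits up to waitMs for the background load; returns true if it finished.
    boolean awaitInitialLoad(long waitMs) {
        try {
            initialLoad.get(waitMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

With a 50 ms timeout and a 300 ms load, the first taskConfigs(2) call returns an empty list. Once the load finishes, a new call would return two task configs, but in the scenario described here nothing prompts the framework to make that call.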
Later, when loadInitialConsumerGroups completes, it does not request task reconfiguration, because it treats itself as the initial load operation, which normally finishes before any task configs are generated.
Later still, when refreshConsumerGroups completes, it does not request task reconfiguration either, because the set of consumer groups has not changed since the initial load: https://github.com/apache/kafka/blob/e2494e6ffb89f8288ed2aeb9b5596c755210bffd/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L173-L180
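The reconfiguration decision can be modelled on its own to show why the connector never recovers. This is a hypothetical sketch: ReconfigDecisionSketch, its counters, and the compareAgainstAssigned flag are illustrative names, and the request counter stands in for a call to the Connect framework's requestTaskReconfiguration(). With compareAgainstAssigned set to false it follows the behavior described above, where refresh compares against the loaded groups. With it set to true it compares against the groups actually handed to tasks; that guard is only one possible remedy, assumed here, and is not necessarily how the upstream fix works.

```java
import java.util.Set;

// Hypothetical model of when task reconfiguration is requested.
// NOT the real MirrorCheckpointConnector; names are illustrative.
class ReconfigDecisionSketch {
    private final boolean compareAgainstAssigned;          // false models the behavior in the ticket
    private Set<String> knownConsumerGroups = Set.of();    // groups the connector has loaded
    private Set<String> assignedConsumerGroups = Set.of(); // groups in the last generated task configs
    private int reconfigurationRequests = 0;               // stands in for requestTaskReconfiguration()

    ReconfigDecisionSketch(boolean compareAgainstAssigned) {
        this.compareAgainstAssigned = compareAgainstAssigned;
    }

    // The framework generates task configs from whatever groups are currently known.
    void taskConfigs() {
        assignedConsumerGroups = Set.copyOf(knownConsumerGroups);
    }

    // Initial load: records the groups but never requests reconfiguration.
    void onInitialLoadComplete(Set<String> groups) {
        knownConsumerGroups = Set.copyOf(groups);
    }

    // Periodic refresh: requests reconfiguration only if the groups look "changed".
    void onRefresh(Set<String> groups) {
        Set<String> baseline = compareAgainstAssigned ? assignedConsumerGroups : knownConsumerGroups;
        boolean changed = !baseline.equals(groups);
        knownConsumerGroups = Set.copyOf(groups);
        if (changed) {
            reconfigurationRequests++;
            taskConfigs(); // simulate the framework regenerating task configs
        }
    }

    int reconfigurationRequests() {
        return reconfigurationRequests;
    }

    Set<String> assignedConsumerGroups() {
        return assignedConsumerGroups;
    }
}
```

Replaying the sequence from this ticket (taskConfigs() after the timed-out start, then onInitialLoadComplete, then onRefresh with the same groups) leaves the default model with zero reconfiguration requests and no assigned groups. The guarded variant requests one reconfiguration and assigns every group.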
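As a result, the MirrorCheckpointConnector believes it has converged and has nothing to update, while some consumer groups are in fact not assigned to any task.
This particularly affects large, stable Kafka clusters with many consumer groups that are not being actively created or deleted: the size of the cluster makes the initial load more likely to exceed the timeout, and the stable set of groups means no later refresh detects a change that would trigger reconfiguration.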
Issue Links
- relates to: KAFKA-17233 MirrorCheckpointConnector should use batched listConsumerGroupOffsets (Resolved)
- links to