Description
case situation:
1. The business application uses a large amount of memory, which causes the `run()` method of the KafkaConsumer's HeartbeatThread to exit abnormally with an `OutOfMemoryError`:
2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=*****_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space
2. The `finally` block of the heartbeat thread's `run()` method only prints a log message; it does not update the value of `AbstractCoordinator.state`.
3. For a KafkaConsumer that uses the group rebalance mechanism, the pollTimeout in the `kafkaConsumer#pollForFetches(timer)` method may end up taking the value `timeToNextHeartbeat(now)`.
4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` is never updated again,
and the `AbstractCoordinator.state` field stays STABLE.
Once the deadline has passed, the `timeToNextHeartbeat(long now)` method therefore always returns 0,
and that 0 is passed as the timeout to the underlying `networkClient#poll` method.
As a result, when the user calls `poll(duration)` in an endless loop, the `kafkaConsumer#pollForFetches(timer)` method returns almost immediately every time, and the loop consumes a lot of CPU.
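The effect described in step 4 can be reproduced with a small stand-alone model. This is only a sketch and not Kafka's actual code: the class `StalledHeartbeatModel` and its `pollTimeout` helper are hypothetical names, and the model assumes that the poll timeout is the minimum of the user's timeout and the time to the next heartbeat.

```java
// Simplified model of the stalled heartbeat timer.
// NOT Kafka's real classes: all names here are illustrative assumptions.
final class StalledHeartbeatModel {
    // Heartbeat deadline that is never reset, because the heartbeat thread is gone.
    private final long deadlineMs;

    StalledHeartbeatModel(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    // Remaining time until the heartbeat deadline, clamped at zero.
    long timeToNextHeartbeat(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    // Assumption: the poll timeout is bounded by the time to the next heartbeat.
    long pollTimeout(long nowMs, long userTimeoutMs) {
        return Math.min(userTimeoutMs, timeToNextHeartbeat(nowMs));
    }

    public static void main(String[] args) {
        StalledHeartbeatModel model = new StalledHeartbeatModel(3_000L);
        // Once "now" passes the deadline, every poll gets a zero timeout.
        for (long now = 2_000L; now <= 5_000L; now += 1_000L) {
            System.out.println("now=" + now + " pollTimeout=" + model.pollTimeout(now, 500L));
        }
    }
}
```

In this model, every call made after the deadline gets a timeout of 0 regardless of the duration the caller asked for, which matches the busy loop observed in the consumer.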
solution:
1. Refer to the code comment on `MemberState.STABLE`:
"the client has joined and is sending heartbeats."
Once the heartbeat thread has exited, this no longer holds, so the `finally` block should also contain:
state = MemberState.UNJOINED;
closed = true;
2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new condition, `heartbeatThread.hasFailed()`:
if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
    return Long.MAX_VALUE;
return heartbeat.timeToNextHeartbeat(now);
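Both proposed changes can be tried out together in a stand-alone sketch. It is not a patch against Kafka: `CoordinatorFixSketch`, `onHeartbeatThreadExit` and `heartbeatThreadHasFailed` are hypothetical stand-ins for `AbstractCoordinator`, the `finally` block of the heartbeat thread, and `heartbeatThread.hasFailed()`, and the enum only models the two states that matter here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the coordinator; NOT Kafka's actual class.
final class CoordinatorFixSketch {
    enum MemberState { UNJOINED, STABLE }

    private MemberState state = MemberState.STABLE;
    private boolean closed = false;
    private final AtomicReference<Throwable> heartbeatFailure = new AtomicReference<>();
    private final long heartbeatDeadlineMs;

    CoordinatorFixSketch(long heartbeatDeadlineMs) {
        this.heartbeatDeadlineMs = heartbeatDeadlineMs;
    }

    // Models the exit path of the heartbeat thread with change 1 applied:
    // record the failure, then reset the state and mark the thread closed.
    synchronized void onHeartbeatThreadExit(Throwable cause) {
        heartbeatFailure.set(cause);
        state = MemberState.UNJOINED;
        closed = true;
    }

    // Hypothetical equivalent of heartbeatThread.hasFailed().
    boolean heartbeatThreadHasFailed() {
        return heartbeatFailure.get() != null;
    }

    synchronized boolean isClosed() {
        return closed;
    }

    // Change 2: no heartbeat is due when the member has not joined
    // or the heartbeat thread has failed.
    synchronized long timeToNextHeartbeat(long nowMs) {
        if (state == MemberState.UNJOINED || heartbeatThreadHasFailed())
            return Long.MAX_VALUE;
        return Math.max(0L, heartbeatDeadlineMs - nowMs);
    }

    public static void main(String[] args) {
        CoordinatorFixSketch coordinator = new CoordinatorFixSketch(3_000L);
        System.out.println("before failure: " + coordinator.timeToNextHeartbeat(4_000L));
        coordinator.onHeartbeatThreadExit(new OutOfMemoryError("Java heap space"));
        System.out.println("after failure:  " + coordinator.timeToNextHeartbeat(4_000L));
    }
}
```

With `Long.MAX_VALUE` returned after the failure, the heartbeat no longer forces the poll timeout to 0, so under the assumption that the timeout is the minimum of the two values, the duration requested by the caller applies again.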