Kafka / KAFKA-14729

The KafkaConsumer pollForFetches(timer) method takes up a lot of CPU due to the abnormal exit of the heartbeat thread


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.5.0
    • Component/s: consumer
    • Labels: None

    Description

      Scenario:

      1. The business application uses a large amount of memory, which causes the `run()` method of the KafkaConsumer's `HeartbeatThread` to exit abnormally with an `OutOfMemoryError`:

      2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=*****_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space 

      2. The `finally` block of the heartbeat thread's `run()` method only logs a message; it does not update the value of `AbstractCoordinator.state`.
      3. For a KafkaConsumer that uses group management (the group rebalance mechanism), `pollTimeout` in `KafkaConsumer#pollForFetches(timer)` may end up taking the value of `timeToNextHeartbeat(now)`.
      4. Because the heartbeat thread has exited, `heartbeatTimer.deadlineMs` is never updated again, while `AbstractCoordinator.state` stays STABLE forever.
      Once that stale deadline has passed, the `timeToNextHeartbeat(long now)` method therefore always returns 0,
      and this 0 is passed as the timeout to the underlying `networkClient#poll` method.
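      Steps 3 and 4 reduce to simple timeout arithmetic. The sketch below is a simplified, hypothetical model of it, not Kafka's code: the class `PollTimeoutModel`, its fields, and the two-value `MemberState` are illustrative assumptions. It assumes the time to the next heartbeat is the remaining time to a deadline clamped at 0, and that the poll timeout is the smaller of that value and the user's remaining poll time.

```java
// Hypothetical, simplified model of steps 3 and 4 above.
// Class, field, and enum names are illustrative; they are not Kafka's real classes.
public class PollTimeoutModel {

    // Assumption: only two states are modeled; Kafka's MemberState has more.
    enum MemberState {
        UNJOINED, STABLE;

        boolean hasNotJoinedGroup() { return this == UNJOINED; }
    }

    MemberState state = MemberState.STABLE; // never reset after the heartbeat thread dies
    long heartbeatDeadlineMs;                // only the heartbeat thread advances this

    // Mirrors the current check: only an unjoined member escapes the heartbeat bound.
    long timeToNextHeartbeat(long nowMs) {
        if (state.hasNotJoinedGroup()) {
            return Long.MAX_VALUE;
        }
        return Math.max(0L, heartbeatDeadlineMs - nowMs); // a stale deadline clamps to 0
    }

    // pollForFetches-style bound: the smaller of the user's remaining poll time
    // and the time until the next heartbeat is due.
    long pollTimeout(long nowMs, long userRemainingMs) {
        return Math.min(userRemainingMs, timeToNextHeartbeat(nowMs));
    }

    public static void main(String[] args) {
        PollTimeoutModel m = new PollTimeoutModel();
        m.heartbeatDeadlineMs = 4_000L;
        System.out.println(m.pollTimeout(1_000L, 5_000L));  // 3000: waits until the heartbeat is due
        // The heartbeat thread has died: the deadline is frozen at 4000 and state stays STABLE.
        System.out.println(m.pollTimeout(11_000L, 5_000L)); // 0: the network poll returns at once
    }
}
```

      Because nothing moves the deadline forward any more, every later call sees a negative remaining time, which the clamp turns into a 0 ms timeout.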


      As a result, when the user calls the `poll(duration)` method in an endless loop, `KafkaConsumer#pollForFetches(timer)` always returns almost immediately, so the loop spins without blocking and takes up a lot of CPU.
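      The effect on CPU can be illustrated with a small timing sketch. It is an assumption-laden stand-in, not Kafka code: `fakeNetworkPoll` is a hypothetical method that sleeps for the full timeout (as a network poll would when no data arrives) and returns at once for a 0 ms timeout. A user-style polling loop then runs for a fixed wall-clock window, and the sketch counts its iterations.

```java
// Hypothetical illustration of a "while (true) consumer.poll(...)" loop.
// fakeNetworkPoll is a stand-in for the real network poll, not a Kafka API.
public class BusyLoopDemo {

    // Blocks for timeoutMs as if no data ever arrives; returns at once for 0.
    static void fakeNetworkPoll(long timeoutMs) {
        if (timeoutMs <= 0) {
            return; // zero timeout: non-blocking, returns immediately
        }
        try {
            Thread.sleep(timeoutMs); // wait out the full timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    // Runs the polling loop for windowMs of wall-clock time and counts iterations.
    static long countIterations(long pollTimeoutMs, long windowMs) {
        long end = System.nanoTime() + windowMs * 1_000_000L;
        long iterations = 0;
        while (System.nanoTime() < end) {
            fakeNetworkPoll(pollTimeoutMs);
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println("50 ms timeout: " + countIterations(50L, 200L) + " iterations");
        System.out.println("0 ms timeout:  " + countIterations(0L, 200L) + " iterations");
    }
}
```

      With a positive timeout the loop runs only a handful of times in the window, while a 0 ms timeout turns it into a busy-spin whose iteration count is bounded only by CPU speed.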


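      Solution:

      1. The comment on `MemberState.STABLE` reads:

      the client has joined and is sending heartbeats.

      A member whose heartbeat thread has died is no longer sending heartbeats, so it should not remain STABLE. When the heartbeat thread exits, the `finally` block should therefore also run: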

      state = MemberState.UNJOINED;
      closed = true;

      2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new condition, `heartbeatThread.hasFailed()`:

      if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
          return Long.MAX_VALUE;
      return heartbeat.timeToNextHeartbeat(now);
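      Both proposed changes can be exercised together in a simplified, hypothetical model. `HeartbeatFixSketch` is not `AbstractCoordinator`: it assumes a two-value `MemberState`, a plain `heartbeatDeadlineMs` field instead of Kafka's heartbeat timer, and a `RuntimeException` thrown on purpose in place of the reported `OutOfMemoryError`. The `failed`/`hasFailed()` pair is modeled loosely on the heartbeat thread's failure tracking that the issue text refers to.

```java
// Hypothetical, simplified model of the two proposed fixes; not AbstractCoordinator itself.
public class HeartbeatFixSketch {

    // Assumption: only two states are modeled.
    enum MemberState {
        UNJOINED, STABLE;

        boolean hasNotJoinedGroup() { return this == UNJOINED; }
    }

    volatile MemberState state = MemberState.STABLE;
    volatile long heartbeatDeadlineMs;
    final HeartbeatThread heartbeatThread = new HeartbeatThread();

    class HeartbeatThread extends Thread {
        volatile boolean closed = false;
        volatile Throwable failed = null;

        boolean hasFailed() { return failed != null; }

        @Override
        public void run() {
            try {
                // Stand-in for the unexpected error from the report (an OOM in the real case).
                throw new RuntimeException("simulated heartbeat failure");
            } catch (Throwable t) {
                failed = t; // record the failure so the coordinator can see it
            } finally {
                // Fix 1: a dead heartbeat thread must not leave the member STABLE.
                HeartbeatFixSketch.this.state = MemberState.UNJOINED;
                closed = true;
            }
        }
    }

    // Fix 2: do not bound the poll timeout by a deadline that nobody will advance.
    long timeToNextHeartbeat(long nowMs) {
        if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed()) {
            return Long.MAX_VALUE;
        }
        return Math.max(0L, heartbeatDeadlineMs - nowMs);
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatFixSketch coordinator = new HeartbeatFixSketch();
        coordinator.heartbeatDeadlineMs = 0L; // deadline already in the past
        coordinator.heartbeatThread.start();
        coordinator.heartbeatThread.join();
        System.out.println(coordinator.timeToNextHeartbeat(10_000L) == Long.MAX_VALUE); // true
    }
}
```

      Either change alone already removes the 0 ms timeout in this model. Fix 1 works through `hasNotJoinedGroup()`, and fix 2 still returns `Long.MAX_VALUE` even if some other code path leaves the state at STABLE, so the user's own poll timer bounds the wait again.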


      Attachments

        1. image-2023-02-17-13-15-50-362.png (74 kB, RivenSun)
        2. jstack_highCpu.txt (4 kB, RivenSun)

      People: RivenSun (assignee and reporter)
