XMLWordPrintableJSON

Details

    Description

      Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale)

      Log trace:

      [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
      [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
      java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
      at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
      at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
      at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
      at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
      at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
      at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
      at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
      at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
      at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
      at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
      at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)

      We should review the poll expiration logic that triggers a leave group operation. That is currently applied in the HB Manager poll, without any validation, and given it depends on the consumer poll timer, it could happen at any time, regardless of the state of the member. Ex. poll timer could expire when the member is leaving, leading to this leaving->stale invalid transition. We should probably consider that this pro-active leave should only apply when the consumer is not leaving (prepare leaving or leaving)

      Attachments

        Issue Links

          Activity

            People

              lianetm Lianet Magrans
              lianetm Lianet Magrans
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: