Kafka / KAFKA-13840

KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.6.1, 2.7.2, 2.8.1, 3.0.0, 3.1.0
    • Fix Version/s: 3.2.1
    • Component/s: clients, consumer
    • Labels: None

    Description

      Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a timeout or any other retriable exception triggered during an async offset commit renders the client unable to recover its group coordinator and leaves it in a broken state.

       

      I first encountered this using v2.8.1 of the Java client. After going through the codebase for every version of the client, I found that it affects all versions from 2.6.1 onward.

      I also confirmed that the issue is not present after rolling back to 2.5.1.

       

      The issue stems from a change in FindCoordinatorResponseHandler. In 2.5.1, the handler called clearFindCoordinatorFuture() on both success and failure here:
      https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783
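      To make the 2.5.1 behavior concrete, here is a minimal sketch of the lookup-and-cache pattern, assuming a heavily simplified model: CompletableFuture stands in for the client's internal RequestFuture, a String stands in for the coordinator node, and the class ClearingCoordinatorLookup and its lookup() method are hypothetical. Only the names findCoordinatorFuture and clearFindCoordinatorFuture() come from this report; this is not the client's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified model (not Kafka's code): CompletableFuture stands in for
// the client's internal RequestFuture, and String stands in for the coordinator node.
class ClearingCoordinatorLookup {
    private final Supplier<CompletableFuture<String>> sendFindCoordinatorRequest;
    // Cached in-flight lookup; null means the next lookup() sends a new request.
    private CompletableFuture<String> findCoordinatorFuture;

    ClearingCoordinatorLookup(Supplier<CompletableFuture<String>> sendFindCoordinatorRequest) {
        this.sendFindCoordinatorRequest = sendFindCoordinatorRequest;
    }

    synchronized CompletableFuture<String> lookup() {
        if (findCoordinatorFuture != null) {
            return findCoordinatorFuture; // reuse the lookup that is already in flight
        }
        CompletableFuture<String> future = sendFindCoordinatorRequest.get();
        findCoordinatorFuture = future;
        // 2.5.1-style: clear the cache on BOTH success and failure,
        // so a failed lookup is never handed out again.
        future.whenComplete((node, error) -> clearFindCoordinatorFuture(future));
        return future;
    }

    private synchronized void clearFindCoordinatorFuture(CompletableFuture<String> completed) {
        if (findCoordinatorFuture == completed) { // don't clear a newer lookup
            findCoordinatorFuture = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger requestsSent = new AtomicInteger();
        // Hypothetical sender: the first request fails (e.g. a timeout), later ones succeed.
        ClearingCoordinatorLookup coordinator = new ClearingCoordinatorLookup(() ->
            requestsSent.incrementAndGet() == 1
                ? CompletableFuture.<String>failedFuture(new RuntimeException("timeout"))
                : CompletableFuture.completedFuture("broker-1"));

        System.out.println("first lookup failed: " + coordinator.lookup().isCompletedExceptionally());
        System.out.println("second lookup: " + coordinator.lookup().join());
        System.out.println("FindCoordinator requests sent: " + requestsSent.get());
    }
}
```

      In this model the failed first lookup is cleared as soon as it completes, so the second call sends a fresh FindCoordinator request and succeeds. For simplicity the hypothetical sender returns already-completed futures; in the real client the response arrives later, but the clear-on-completion logic is the same.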

       

      In all later versions of the client, this call is not made:
      https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838

       

      As a result, when the KafkaConsumer calls coordinator.commitOffsetsAsync(...) and an error has left the coordinator unavailable here:

      https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007

       

      the client then tries to call:

      https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017

      However, this call can never succeed, because it keeps returning a reference to the same failed future, findCoordinatorFuture, which is never cleared.
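      The failure mode described above can be sketched with a simplified, hypothetical model, assuming that nothing on the async-commit path ever resets the cached future. StuckCoordinatorLookup and commitAsync() are illustrative names, not client APIs; commitAsync() only imitates the "look up the coordinator, then commit" step and sends no offsets. CompletableFuture stands in for the client's internal RequestFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified model of the reported bug (not Kafka's code): the cached
// lookup future is only replaced when it is null, and nothing ever resets it.
class StuckCoordinatorLookup {
    private final Supplier<CompletableFuture<String>> sendFindCoordinatorRequest;
    private CompletableFuture<String> findCoordinatorFuture; // never cleared in this model

    StuckCoordinatorLookup(Supplier<CompletableFuture<String>> sendFindCoordinatorRequest) {
        this.sendFindCoordinatorRequest = sendFindCoordinatorRequest;
    }

    synchronized CompletableFuture<String> lookup() {
        if (findCoordinatorFuture == null) {
            findCoordinatorFuture = sendFindCoordinatorRequest.get();
        }
        return findCoordinatorFuture; // may be a future that already failed
    }

    // Imitates the async-commit path when the coordinator is unknown: look it up
    // first, and report "coordinator unavailable" if the lookup failed.
    CompletableFuture<Void> commitAsync() {
        return lookup().handle((node, error) -> {
            if (error != null) {
                throw new IllegalStateException("coordinator unavailable", error);
            }
            return null; // a real client would send the OffsetCommit request here
        });
    }

    public static void main(String[] args) {
        AtomicInteger requestsSent = new AtomicInteger();
        // Hypothetical sender: only the first request fails; any later one would succeed.
        StuckCoordinatorLookup coordinator = new StuckCoordinatorLookup(() ->
            requestsSent.incrementAndGet() == 1
                ? CompletableFuture.<String>failedFuture(new RuntimeException("timeout"))
                : CompletableFuture.completedFuture("broker-1"));

        for (int i = 0; i < 3; i++) {
            boolean failed = coordinator.commitAsync().isCompletedExceptionally();
            System.out.println("commit " + i + " failed: " + failed);
        }
        System.out.println("FindCoordinator requests sent: " + requestsSent.get());
    }
}
```

      Running main prints true for all three commits while only one FindCoordinator request is ever sent: every later commit is handed the same failed future, even though a new request would have succeeded.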

       

      This manifests as every subsequent call to commitOffsetsAsync() failing with a "coordinator unavailable" exception, indefinitely, once any retriable exception has caused the coordinator connection to be closed.

      Note: we discovered this when we upgraded the Kafka client in our Flink consumers from 2.4.1 to 2.8.1, and we subsequently had to downgrade it. We have also seen it occur in our non-Flink Java consumers running 3.x client versions.

       


            People

              • Assignee: Luke Chen (showuon)
              • Reporter: Kyle R Stehbens (kyle.stehbens)

            Votes: 4
            Watchers: 9
