KAFKA-10123: Regression resetting offsets in consumer when fetching from old broker

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0, 2.5.1
    • Component/s: None
    • Labels: None

      Description

      We saw this error in system tests:

      java.lang.NullPointerException
              at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
              at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
              at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
              at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
              at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
              at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
              at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
      

      The logs showed that the consumer was in the middle of an offset reset when this happened. We changed the validation logic in KAFKA-9724 to include the following check with the intent of skipping validation for old brokers:

                  NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
                  if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                      return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
                  } else {
                      // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
                      completeValidation(tp);
                      return false;
                  }
      

      The problem seems to be the shortcut call to `completeValidation`, which executes the following logic:

                  if (hasPosition()) {
                      transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
                  }
      

      The call to `hasPosition` should protect us here, but in the `AWAIT_RESET` state it incorrectly returns true. As a result, the consumer enters the `FETCHING` state without a position, which ultimately leads to the NPE.
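
      To make the failure mode concrete, here is a minimal, self-contained sketch of the state transition described above. It is a simplified model, not the Kafka source: the class `PartitionStateSketch`, the `awaitResetClaimsPosition` flag, the methods `requestOffsetReset` and `nextFetchOffset`, and the `INITIALIZING` and `AWAIT_VALIDATION` states are all assumptions introduced for illustration. Passing `true` reproduces the reported behaviour; passing `false` shows one possible guard, which is not necessarily the change that was merged.

```java
// Hypothetical, simplified model of a partition's fetch state.
// None of these types are the real Kafka classes.
final class PartitionStateSketch {

    enum FetchState { INITIALIZING, AWAIT_RESET, AWAIT_VALIDATION, FETCHING }

    FetchState state = FetchState.INITIALIZING;
    Long position = null; // null models "no known fetch position"

    // true  -> AWAIT_RESET reports a position (the behaviour described in this issue)
    // false -> AWAIT_RESET reports no position (one possible guard)
    private final boolean awaitResetClaimsPosition;

    PartitionStateSketch(boolean awaitResetClaimsPosition) {
        this.awaitResetClaimsPosition = awaitResetClaimsPosition;
    }

    boolean hasPosition() {
        switch (state) {
            case FETCHING:
            case AWAIT_VALIDATION:
                return true;
            case AWAIT_RESET:
                return awaitResetClaimsPosition;
            default:
                return false;
        }
    }

    // An offset reset discards the current position until the reset completes.
    void requestOffsetReset() {
        state = FetchState.AWAIT_RESET;
        position = null;
    }

    // Mirrors the shortcut quoted above: move to FETCHING if hasPosition() says so.
    void completeValidation() {
        if (hasPosition()) {
            state = FetchState.FETCHING;
        }
    }

    // Models building a fetch request: unboxing a null position throws an NPE.
    long nextFetchOffset() {
        return position;
    }

    public static void main(String[] args) {
        PartitionStateSketch buggy = new PartitionStateSketch(true);
        buggy.requestOffsetReset();
        buggy.completeValidation(); // skipping validation for an old broker
        System.out.println("buggy state: " + buggy.state);
        try {
            buggy.nextFetchOffset();
        } catch (NullPointerException e) {
            System.out.println("NPE while preparing fetch: no position in FETCHING");
        }

        PartitionStateSketch guarded = new PartitionStateSketch(false);
        guarded.requestOffsetReset();
        guarded.completeValidation(); // no-op: the reset is still pending
        System.out.println("guarded state: " + guarded.state);
    }
}
```

      In the guarded variant the partition stays in `AWAIT_RESET` until the reset supplies a position, so a fetch request is never built from a null offset.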

              People

              • Assignee: David Arthur (mumrah)
              • Reporter: Jason Gustafson (hachikuji)