Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-13266

`InitialFetchState` should be created after partition is removed from the fetchers

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • None
    • 3.0.0
    • None
    • None

    Description

       `ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:

      [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728. at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
      

       The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated when the fetcher threads are re-started because the fetcher threads could have incremented the log end offset in between.

      The partitions must be removed from the fetcher threads before the `InitialFetchStates` are created.

      Attachments

        Issue Links

          Activity

            People

              dajac David Jacot
              dajac David Jacot
              Jason Gustafson Jason Gustafson
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: