Kafka / KAFKA-14106

Fetcher thread was shut down and all of its fetched partitions are lost.



    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.2, 3.0.0
    • Fix Version/s: None
    • Component/s: replication


      Dynamically changing the listeners causes partitions to fall out of sync. Our steps were as follows:

      1. The broker is started and listens on an IP address.
      2. Create some topics.
      3. For our own operational reasons, change the listener to a domain name via dynamic configuration.
      4. Create some new topics.
      5. Produce messages to any of the older topics (those created in step 2).
      6. All topics produced to in step 5 fall out of sync.

      The relevant log lines are:

      [2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host= for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
      [2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
      [2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
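      Both "Added fetcher" lines refer to the same leader, id=2, but the second one reaches it through a different host. The minimal Scala sketch below shows why that matters, assuming (as in the AbstractFetcherManager excerpt in this report) that the fetcher-thread map is keyed by broker id plus fetcher id, while the sourceBroker check is a full case-class equality over id, host and port. `Endpoint` and `FetcherKey` are hypothetical stand-ins for Kafka's BrokerEndPoint and BrokerIdAndFetcherId. The IP address and port of the first endpoint are placeholders, because that host is truncated in the first log line.

```scala
// Hypothetical stand-in for Kafka's BrokerEndPoint. As a case class, == compares
// every field (id, host and port), not only the broker id.
final case class Endpoint(id: Int, host: String, port: Int)

// Hypothetical stand-in for BrokerIdAndFetcherId, the fetcher-map key. It is
// built from the broker id only, so a host change does not change the key.
final case class FetcherKey(brokerId: Int, fetcherId: Int)

object EndpointIdentity {
  // Before step 3: leader 2 reached through an IP address (placeholder value,
  // because the host is truncated in the first log line).
  val before: Endpoint = Endpoint(2, "192.0.2.10", 9092)
  // After step 3: the same leader reached through a domain name (second log line).
  val after: Endpoint = Endpoint(2, "kafka-server-1", 9092)

  def main(args: Array[String]): Unit = {
    // Fetcher id 0 is assumed for both lookups.
    val sameKey = FetcherKey(before.id, 0) == FetcherKey(after.id, 0)
    val sameSourceBroker = before == after
    println(s"same fetcher-map key: $sameKey")          // ...: true
    println(s"same source broker:   $sameSourceBroker") // ...: false
  }
}
```

      So the map lookup finds the existing thread, but the sourceBroker comparison fails, which sends execution into the `case Some(f)` branch that shuts the thread down.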

      After reading the source code, we found the following code in AbstractFetcherManager:

      def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
            // ... partitionsPerFetcher groups partitionAndOffsets by (leader broker, fetcher id); elided here
            for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
              val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
              val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
                case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
                  currentFetcherThread // same endpoint: reuse the existing thread
                case Some(f) =>
                  f.shutdown() // ----------------- marked
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
                case None =>
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
              }
              // ... initialFetchOffsets are then added to fetcherThread (elided)
            }
      }
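      As the marked line shows, if the sourceBroker has changed (in our case, the leader's host changed from an IP address to a domain name while its broker id stayed the same), the old fetcher thread is shut down and a new fetcher thread is created for the new sourceBroker. The new thread is only given the partitions passed in the current call, so all partitions that the old fetcher thread was fetching are lost. Nothing fetches them anymore, which is why the older topics fall out of sync.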
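      The loss can be reproduced with a small, self-contained Scala model of the bookkeeping in the excerpt above. This is a hypothetical simplification, not Kafka's code: a "thread" is only a set of partition names, nothing is actually fetched, and every partition is assumed to map to fetcher id 0 (as with num.replica.fetchers=1). Partition names and the domain-name host come from the logs above; the IP address is a placeholder. The `migratePartitions` flag is a hypothetical mitigation added only for contrast (carry the old thread's partitions over to its replacement); it is not something Kafka is known to do.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model; names mirror the excerpt, but this is NOT Kafka's implementation.
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

// A "thread" here is just the set of partitions it fetches from sourceBroker.
final class FetcherThread(val sourceBroker: BrokerEndPoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty
  var running: Boolean = true
  def shutdown(): Unit = { running = false }
}

// migratePartitions = false mirrors the behaviour reported in this issue;
// migratePartitions = true is a hypothetical mitigation, not Kafka code.
final class FetcherManager(migratePartitions: Boolean) {
  val fetcherThreadMap: mutable.Map[BrokerIdAndFetcherId, FetcherThread] = mutable.Map.empty

  def addFetcherForPartitions(leader: BrokerEndPoint, fetcherId: Int, newPartitions: Set[String]): Unit = {
    val key = BrokerIdAndFetcherId(leader.id, fetcherId)
    val fetcherThread = fetcherThreadMap.get(key) match {
      case Some(current) if current.sourceBroker == leader =>
        current // same endpoint: reuse the thread
      case Some(old) =>
        old.shutdown() // endpoint changed: the old thread stops, taking its partitions with it
        val replacement = new FetcherThread(leader)
        if (migratePartitions) replacement.partitions ++= old.partitions
        fetcherThreadMap(key) = replacement
        replacement
      case None =>
        val created = new FetcherThread(leader)
        fetcherThreadMap(key) = created
        created
    }
    // Only the partitions passed in this call are added to the chosen thread.
    fetcherThread.partitions ++= newPartitions
  }

  // Partitions that some running fetcher thread is still replicating.
  def fetchedPartitions: Set[String] =
    fetcherThreadMap.values.filter(_.running).flatMap(_.partitions).toSet
}

object FetcherLossDemo {
  // Replays the leader-2 part of steps 1 to 4; returns the partitions still being fetched.
  def replay(migratePartitions: Boolean): Set[String] = {
    val manager = new FetcherManager(migratePartitions)
    val byIp = BrokerEndPoint(2, "192.0.2.10", 9092)       // placeholder IP (truncated in the log)
    val byName = BrokerEndPoint(2, "kafka-server-1", 9092) // host from the second log line
    manager.addFetcherForPartitions(byIp, 0, Set("test-2", "test-5", "test-8", "test-11"))
    manager.addFetcherForPartitions(byName, 0, Set("test2-0", "test2-3", "test2-6", "test2-9"))
    manager.fetchedPartitions
  }

  def main(args: Array[String]): Unit = {
    println(s"reported behaviour: ${replay(migratePartitions = false).toSeq.sorted}")
    println(s"hypothetical fix:   ${replay(migratePartitions = true).toSeq.sorted}")
  }
}
```

      With `migratePartitions = false`, `replay` returns only the test2-* partitions, matching the second log line: the test-* partitions from the shut-down thread are no longer fetched by any thread.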




            Assignee: Unassigned
            Reporter: Yang Ling (lucasrx)