  1. Kafka
  2. KAFKA-14106

Fetcher thread is shut down and all of its fetched partitions are lost.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.2, 3.0.0
    • Fix Version/s: None
    • Component/s: replication

    Description

      Dynamically changing a broker's listeners causes replicas to fall out of sync. We reproduced it as follows:

      1. Start a broker that listens on an IP address.
      2. Create some topics.
      3. Change the listener to a domain name via dynamic configuration.
      4. Create some new topics.
      5. Produce messages to any of the older topics.
      6. All topics produced to in step 5 are out of sync.

      The relevant log lines are:

      [2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
      [2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
      [2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0]Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)

      After reading the source code, we found the following in AbstractFetcherManager:

      def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
      ...
            for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
              val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
              val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
                case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
                  currentFetcherThread
                case Some(f) =>
                  f.shutdown() // ----------------- marked
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
                case None =>
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
              }
              }
            }
      ...
      }
      

      As the marked line shows, when sourceBroker changes (in our case from the IP address to the domain name), the old fetcher thread is shut down and a new fetcher thread is created for the new sourceBroker. The partitions that the old thread was fetching are not handed over to the new thread, so they are no longer fetched and fall out of sync.
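
The behaviour described above can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions, not Kafka code: ToyFetcherThread, ToyFetcherManager, FetcherLossDemo and the carryOver switch are hypothetical names introduced for illustration, a "thread" is reduced to the set of partitions it would fetch, and the partition names are taken from the log lines. With carryOver = false the match has the same shape as the excerpt. With carryOver = true it shows one conceivable mitigation (handing the old thread's partitions to its replacement), which is not claimed to be the actual fix.

```scala
import scala.collection.mutable

// Toy stand-ins for the types named in the excerpt; these are NOT the real Kafka classes.
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

// A "thread" here is only a holder for the partitions it would be fetching.
final class ToyFetcherThread(val sourceBroker: BrokerEndPoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty
  var running: Boolean = true
  def shutdown(): Unit = { running = false }
}

final class ToyFetcherManager {
  val fetcherThreadMap: mutable.Map[BrokerIdAndFetcherId, ToyFetcherThread] =
    mutable.Map.empty

  // Same three cases as the excerpt. `carryOver` is a hypothetical switch
  // that copies the old thread's partitions into its replacement.
  def addFetcherForPartitions(broker: BrokerEndPoint,
                              fetcherId: Int,
                              newPartitions: Set[String],
                              carryOver: Boolean = false): Unit = {
    val key = BrokerIdAndFetcherId(broker.id, fetcherId)
    val thread = fetcherThreadMap.get(key) match {
      case Some(current) if current.sourceBroker == broker =>
        current // same endpoint: reuse the existing thread
      case Some(old) =>
        old.shutdown() // the marked line: the old thread's partitions go with it
        val replacement = new ToyFetcherThread(broker)
        if (carryOver) replacement.partitions ++= old.partitions
        fetcherThreadMap.put(key, replacement)
        replacement
      case None =>
        val created = new ToyFetcherThread(broker)
        fetcherThreadMap.put(key, created)
        created
    }
    thread.partitions ++= newPartitions
  }

  // Partitions that some running thread is still fetching.
  def fetchedPartitions: Set[String] =
    fetcherThreadMap.values.filter(_.running).flatMap(_.partitions).toSet
}

object FetcherLossDemo {
  def run(carryOver: Boolean): Set[String] = {
    val manager = new ToyFetcherManager
    // Steps 1-2: the leader is known by IP address, old topics are added.
    manager.addFetcherForPartitions(
      BrokerEndPoint(2, "168.1.3.88", 9092), 0, Set("test-2", "test-5"))
    // Steps 3-4: same broker id, new host name, only the new topics are added.
    manager.addFetcherForPartitions(
      BrokerEndPoint(2, "kafka-server-1", 9092), 0, Set("test2-0", "test2-3"), carryOver)
    manager.fetchedPartitions
  }

  def main(args: Array[String]): Unit = {
    println(run(carryOver = false))
    println(run(carryOver = true))
  }
}
```

In this model the fetcher map is keyed by broker id and fetcher id only, so a changed host name for the same broker id hits the second case and replaces the thread. Without carryOver, the partitions added in the first call are no longer held by any running thread, which matches the out-of-sync symptom in step 6.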

          People

            Assignee: Unassigned
            Reporter: Yang Ling (lucasrx)
            Votes: 0
            Watchers: 2
