  1. Kafka
  2. KAFKA-14106

Fetcher thread is shut down and all of its fetched partitions are lost

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.2, 3.0.0
    • Fix Version/s: None
    • Component/s: replication

    Description

      Dynamically changing a broker's listeners causes replicas to fall out of sync. Steps to reproduce:

      1. Start a broker that listens on an IP address.
      2. Create some topics.
      3. Change the listener to a domain name through dynamic configuration.
      4. Create some new topics.
      5. Produce messages to any of the older topics.
      6. Every topic produced to in step 5 falls out of sync.

      The relevant log entries are:

      [2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
      [2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
      [2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0]Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)

      After reading the source code, we found the following in AbstractFetcherManager:

      def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
      ...
            for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
              val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
              val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
                case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
                  currentFetcherThread
                case Some(f) =>
                  f.shutdown() // ----------------- marked
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
                case None =>
                  addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
              }
            }
      ...
      }
      

      As the marked line shows, when sourceBroker changes (in our case, from an IP address to a domain name), the old fetcher thread is shut down and a new fetcher thread is created with the new sourceBroker. The partitions that the old fetcher thread was fetching are not handed over to the new thread, so all of them are lost.
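
      The effect of the marked branch can be sketched with a small, self-contained model. This is only an illustration under stated assumptions: BrokerEndPoint, BrokerIdAndFetcherId, FetcherThread and FetcherManager below are simplified, hypothetical stand-ins rather than the real Kafka classes, partitions are plain strings, and the migrateOnReplace flag is invented here to contrast the reported behaviour with one possible direction for a fix (carrying the old thread's partitions over to its replacement). It is not a proposed patch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Kafka's internal types (NOT the real classes).
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

final class FetcherThread(val sourceBroker: BrokerEndPoint) {
  private val assigned = mutable.Set.empty[String]
  @volatile private var running = true

  def addPartitions(ps: Iterable[String]): Unit = assigned ++= ps
  def partitions: Set[String] = assigned.toSet
  def isRunning: Boolean = running
  def shutdown(): Unit = running = false
}

// migrateOnReplace is an assumption made for this sketch, not a Kafka setting.
final class FetcherManager(migrateOnReplace: Boolean) {
  private val fetcherThreadMap = mutable.Map.empty[BrokerIdAndFetcherId, FetcherThread]

  def addFetcherForPartitions(broker: BrokerEndPoint, fetcherId: Int, newPartitions: Set[String]): Unit = {
    // The key uses only the broker id, so a host change maps to the same entry.
    val key = BrokerIdAndFetcherId(broker.id, fetcherId)
    val thread = fetcherThreadMap.get(key) match {
      case Some(current) if current.sourceBroker == broker =>
        current
      case Some(old) =>
        // Same broker id, different endpoint: this mirrors the marked branch.
        val carried = if (migrateOnReplace) old.partitions else Set.empty[String]
        old.shutdown()
        val replacement = new FetcherThread(broker)
        replacement.addPartitions(carried)
        fetcherThreadMap.put(key, replacement) // the old thread is no longer tracked
        replacement
      case None =>
        val created = new FetcherThread(broker)
        fetcherThreadMap.put(key, created)
        created
    }
    thread.addPartitions(newPartitions)
  }

  // Partitions that some running fetcher thread is still responsible for.
  def fetchedPartitions: Set[String] =
    fetcherThreadMap.values.filter(_.isRunning).flatMap(_.partitions).toSet
}

object FetcherReplacementDemo {
  def run(migrateOnReplace: Boolean): Set[String] = {
    val manager = new FetcherManager(migrateOnReplace)
    // Older topics are added while the leader is known by its IP address.
    manager.addFetcherForPartitions(BrokerEndPoint(2, "168.1.3.88", 9092), 0, Set("test-2", "test-5"))
    // Newer topics arrive after the listener changed to a domain name.
    manager.addFetcherForPartitions(BrokerEndPoint(2, "kafka-server-1", 9092), 0, Set("test2-0", "test2-3"))
    manager.fetchedPartitions
  }

  def main(args: Array[String]): Unit = {
    println(s"without migration: ${run(migrateOnReplace = false).toSeq.sorted}")
    println(s"with migration:    ${run(migrateOnReplace = true).toSeq.sorted}")
  }
}
```

      In this model, run(migrateOnReplace = false) leaves only the test2 partitions assigned to a fetcher, which matches the symptom in the logs above. With migrateOnReplace = true the older test partitions are still being fetched after the endpoint changes.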

          People

            Assignee: Unassigned
            Reporter: Yang Ling (lucasrx)
