Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.2.2, 3.0.0
Fix Version/s: None
Description
Dynamically changing a broker's listeners causes partitions to fall out of sync. Steps to reproduce:
1. Start the broker, listening on an IP address.
2. Create some topics.
3. Change the listener to a domain name via dynamic configuration.
4. Create some new topics.
5. Produce messages to any of the older topics.
6. Observe that all topics written to in step 5 are out of sync.
The relevant logs follow:
[2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
[2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0]Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
After reading the source code, we found the following in AbstractFetcherManager:
    def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
      ...
        for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
          val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
          val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
            case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
              currentFetcherThread
            case Some(f) =>
              f.shutdown() // ----------------- marked
              addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
            case None =>
              addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
          }
        }
      }
      ...
    }
As the marked line shows, when sourceBroker changes (in our case, because the listener moved from an IP address to a domain name), the old fetcher thread is shut down and a new fetcher thread is created with the new sourceBroker. As a result, all partitions that the old fetcher thread was fetching are lost: nothing fetches them any more, so they fall out of sync.
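The behaviour described above can be reproduced in isolation with a minimal, self-contained sketch. Everything below is a simplified stand-in written for illustration, not Kafka's real code: `FetcherThread`, `FetcherManager`, and `FetcherSketch` are hypothetical, `BrokerEndPoint` and `BrokerIdAndFetcherId` only mimic the shape of the Kafka classes of the same name, and partitions are plain strings. The `migrateOnReplace` flag is likewise an assumption, added only to contrast the reported behaviour with one possible way of carrying partitions over; it is not the project's fix.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Kafka classes (assumption: shapes only, no real networking).
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

// Hypothetical fetcher thread: it only remembers which partitions it is responsible for.
final class FetcherThread(val sourceBroker: BrokerEndPoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty[String]
  def addPartitions(ps: Iterable[String]): Unit = partitions ++= ps
  def shutdown(): Unit = partitions.clear() // a stopped fetcher no longer fetches anything
}

// Hypothetical manager mirroring the match expression quoted in this report.
final class FetcherManager(migrateOnReplace: Boolean) {
  private val fetcherThreadMap = mutable.Map.empty[BrokerIdAndFetcherId, FetcherThread]

  def addFetcherForPartitions(broker: BrokerEndPoint, fetcherId: Int, newPartitions: Set[String]): Unit = {
    val key = BrokerIdAndFetcherId(broker.id, fetcherId)
    val thread = fetcherThreadMap.get(key) match {
      // Same endpoint: reuse the existing fetcher thread.
      case Some(current) if current.sourceBroker == broker => current
      // Same broker id but a different endpoint (for example IP address -> domain name).
      case Some(old) =>
        val orphaned = old.partitions.toSet // snapshot before shutdown clears them
        old.shutdown()                      // corresponds to the marked line
        val replacement = new FetcherThread(broker)
        // Assumption: carrying the old partitions over is one possible mitigation.
        if (migrateOnReplace) replacement.addPartitions(orphaned)
        fetcherThreadMap(key) = replacement
        replacement
      case None =>
        val created = new FetcherThread(broker)
        fetcherThreadMap(key) = created
        created
    }
    thread.addPartitions(newPartitions)
  }

  // All partitions currently covered by some fetcher thread.
  def fetchedPartitions: Set[String] = fetcherThreadMap.values.flatMap(_.partitions).toSet
}

object FetcherSketch {
  // Replays the scenario from the logs: same broker id 2, endpoint changes between calls.
  def run(migrateOnReplace: Boolean): Set[String] = {
    val manager = new FetcherManager(migrateOnReplace)
    manager.addFetcherForPartitions(BrokerEndPoint(2, "168.1.3.88", 9092), 0, Set("test-2", "test-5"))
    manager.addFetcherForPartitions(BrokerEndPoint(2, "kafka-server-1", 9092), 0, Set("test2-0", "test2-3"))
    manager.fetchedPartitions
  }
}
```

With `FetcherSketch.run(migrateOnReplace = false)` only the `test2-*` partitions remain covered by a fetcher, which matches the symptom reported here. With `migrateOnReplace = true` the `test-*` partitions are handed to the replacement thread as well.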