Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
0.9.0.1
-
None
-
None
Description
The replicas are not evenly distributed among the fetcher threads. This has caused some fetcher threads get overloaded and hence their requests time out frequently. This is especially a big issue when a new node is added to the cluster and the fetch traffic is high.
Here is an example run in a test cluster with 10 brokers and 6 fetcher threads (per source broker). A single topic consisting of 500+ partitions was assigned to have a replica for each parition on the newly added broker.
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done 85 83 85 83 85 85 [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done 15 1 13 1 14 1
The problem is that AbstractFetcherManager::getFetcherId method does not take the broker id into account:
private def getFetcherId(topic: String, partitionId: Int) : Int = { Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers }
Hence although the replicas are evenly distributed among the fetcher ids across all source brokers, this is not necessarily the case for each broker separately.
I think a random function would do a much better job in distributing the load over the fetcher threads from each source broker.
Thoughts?