Apologies in advance, this is going to be a bit of complex description, mainly because we've seen this issue several different ways and we're still tying them together in terms of root cause and analysis.
It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers.
I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot.
Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover.
We first noticed this case when enabling mirrormaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using mirrormaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us.
Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it.
Note that this is the fetch purgatories on the brokers that MM was producing to, which indicates fairly strongly that this is a replication issue, not a MM issue.
This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging.
The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each.
Around this time (we don't have detailed enough data to prove real correlation unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster.
The first thing we tried was to reduce the `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. This had no noticeable effect at all.
We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines:
The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker.
At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests:
- Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem.
- Doing an additional rebalance of the cluster in order to move a number of other topics with regular data to kafka06 appears to have solved the problem completely.
Current versions (0.8.2.1 and earlier) have issues with the replica fetcher not backing off correctly (
KAFKA-1461, KAFKA-2082 and others). I believe that in a very specific situation, the replica fetcher thread of one broker can spam another broker with requests that fill up its purgatory and do not get properly flushed. My best guess is that the necessary conditions are:
- broker A leads some partitions which receive regular traffic, and some partitions which do not
- broker B replicates some of each type of partition from broker A
- some producers are producing with RequiredAcks=-1 (wait for all ISR)
- broker B happens to divide its replicated partitions such that one of its replica threads consists only of partitions which receive no regular traffic
When the above conditions are met, and broker A receives a produce request (frequently, since it leads some high-volume partitions), it triggers broker B's replica manager, which causes all of broker B's replica fetcher threads to send fetch requests. This includes the thread which owns only the empty partitions, causing fetch requests for those partitions to build up quickly in broker A's purgatory, causing the issue.
Hopefully somebody with more kafka experience will be able to validate or disprove my hypothesis. The issue has been resolved for us, for now, by the rebalancing of broker 6, but I would like to fully understand and fix it before we run into again in another context.