Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
Description
If you enable auto.leader.rebalance.enable (which is on by default) and you have a cluster with many partitions, there is significant replication downtime following a restart. This causes `UnderReplicatedPartitions` to fire, and replication is paused.
This is because the current automated leader rebalance mechanism changes leaders for all imbalanced partitions at once, instead of doing it gradually. This effectively stops all replica fetchers in the cluster (assuming there are enough imbalanced partitions), and restarts them. This can take minutes on busy clusters, during which no replication is happening and user data is at risk. Clients with acks=-1 also see issues at this time, because replication is effectively stalled.
To quote Todd Palino from the mailing list:
There is an admin CLI command to trigger the preferred replica election manually. There is also a broker configuration “auto.leader.rebalance.enable” which you can set to have the broker automatically perform the PLE when needed. DO NOT USE THIS OPTION. There are serious performance issues when doing so, especially on larger clusters. It needs some development work that has not been fully identified yet.
This setting is extremely useful for smaller clusters, but with high partition counts it causes the serious issues described above.
One potential fix could be a new configuration that limits how many partitions are rebalanced at once: stop issuing leader changes once that many rebalances are in flight, and resume as they complete. There may be better mechanisms, and I'd love to hear if anybody has any ideas.
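Purely as an illustration of that idea (the second key below does not exist in Kafka; it is invented here only to show the shape such a setting might take):

auto.leader.rebalance.enable=true
# hypothetical cap on how many leader moves the controller keeps in flight at once
auto.leader.rebalance.max.in.flight=100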
Attachments
Activity
tcrayford-heroku Any answer to the above question? Moving this out of the next target release until we get a follow-up response, since the issue has gone unanswered across previous releases.
The auto leader balancing logic now uses the async ZK api (https://issues.apache.org/jira/browse/KAFKA-5642) and batches the requests from the controller to the brokers. So, the process should be much faster with many partitions. Closing this for now.
junrao any ballpark quantification to "much faster"?
Are we talking 2x, 10x, or 100x faster?
When you say "batches the requests", I'm not sure what the batch size is... if it does all changes as a single batch or if there's multiple batches... so it's hard to guesstimate the expected performance impact.
jeffwidman, the detailed perf results can be found in https://github.com/apache/kafka/pull/3427. One of the results is the following.
5 brokers, 25K topics, 1 partition, RF=2.
Controlled shutdown time of a broker went down from 22 secs to 3 secs because of async ZK accesses and batching requests between the controller and the brokers.
Is this issue really resolved? I am using Kafka 1.1.0 and my cluster with 97 hosts and 22,500 partitions faces the same problem. Here is what I see in the log for one partition.
1. Broker A went offline for some reason (deployment, network glitch, etc.).
2. Controller elected broker B as the leader for that partition and let B become the leader.
3. Broker A went online again.
4. Controller let A become the follower of broker B and A is in sync with broker B.
5. Controller triggered auto rebalance, elected A as the new leader, and informed B to become the follower.
6. Broker B became the follower and then Controller informed A to be the leader.
7. Broker A became the leader.
The problem is that broker A has 100% CPU utilization at step 6 and takes a long time to handle the controller request to become a leader. There is significant latency between steps 6 and 7, which leaves a partition without a leader for around 1-30 minutes.
jiaxinye: You are probably hitting https://issues.apache.org/jira/browse/KAFKA-7299 . We saw the same behavior, and our brokers were taking 4 hours to get back into sync. We applied that patch, and it dropped to 2.5 minutes.
jiaxinye does the patch from https://issues.apache.org/jira/browse/KAFKA-7299 help?
Any idea why step 6 has 100% CPU utilization? Is it heavy ReplicaFetcher traffic pulling data to catch up the previously offline broker? If so, KIP-491 might help by putting this broker on a temporary preferred-leader "blacklist"; once it's up and running stably with low CPU usage, remove it from the blacklist and let the auto leader rebalance proceed.
Hi jiaxinye,
Does https://issues.apache.org/jira/browse/KAFKA-7299 solve the 100% CPU at step 6 for you?
We're running into exactly the same issues with a 50-broker cluster and 35k partitions.
We are seeing the same (or at least similar) issues when terminating a broker in AWS and assigning the same broker ID to its replacement (which has an empty data dir).
We have auto.leader.rebalance.enable=true, and when the broker gets assigned as leader we see 100% CPU usage (ReplicaFetcher threads), and many clients (streams) fail to connect, as the broker tries to fetch and serve data at the same time.
Just thinking outside the box a bit for a workaround: until KIP-491 is implemented (if at all), is it possible to add some logic/automation to set auto.leader.rebalance.enable=false on the replacement node, so that as soon as the Kafka service is started it is prevented from becoming leader until under-replicated partitions = 0, and then reset auto.leader.rebalance.enable back to true and restart the Kafka service? Or must auto.leader.rebalance.enable be set on all brokers (or all other brokers) for it to be effective?
Or perhaps just script kafka-preferred-replica-election.sh to run when CPU is lower on the new node, or when under-replicated partitions = 0?
junrao any interest in KIP-491? We are using this in production and it has been a critical feature in helping us meet our producer latency SLA.
As blodsbror pointed out, it can help with auto leader rebalance as well.
blodsbror We have implemented KIP-491 internally.
We use a dynamic config, leader.deprioritized.list, set at the cluster global level ('<default>'), so that in case of controller failover the new controller inherits this dynamic config setting.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/'<default>'
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, dataLength=59, numChildren=0, pzxid=25790129667))
This puts broker 10001 at the lowest priority when the controller is considering leadership for a partition, even if this broker is in the first position of the assignment (i.e. the preferred leader). If it is currently serving leadership, a preferred leader election will move leadership to another broker in the ISR.
We have also implemented another feature, separate from KIP-491, for when an empty broker starts up: a per-broker dynamic config called "replica.start.offset.strategy" can be set to "latest" (the default is "earliest", matching current upstream behavior). Just like a consumer, the broker then fetches from the current leaders' latest offsets instead of the earliest (start) offsets, so a failed, empty broker comes up very fast. This feature is used together with the KIP-491 leader.deprioritized.list to keep the broker from serving traffic (because it does not yet have enough data). After it has been replicating for some time (the retention at the broker/topic level), the broker is completely caught up, the leader.deprioritized.list entry is removed, and when preferred leader election is run the broker can serve traffic again. We haven't proposed this in a KIP yet, but I think it is also a good feature.
maybe I will restart the KIP-491 discussion in the dev mailing list.
sql_consulting Many thanks for the explanation.
Is there any logic I can implement now to work around this in the meantime, until KIP-491 is merged? Currently we are on Confluent 5.4 (open source) with auto.leader.rebalance.enable=true on all brokers. I'm assuming that as long as the controller broker has this enabled, setting auto.leader.rebalance.enable=false on the new broker before its service starts won't have any effect in stopping it from being re-assigned as leader?
Could bandwidth quotas at the broker level help ease the load on connected clients?
leader.replication.throttled.rate
follower.replication.throttled.rate
Or, for those to have any effect, do the bandwidth quotas also need to be set at the topic level?
leader.replication.throttled.replicas
follower.replication.throttled.replicas
One solution we have in-house is tooling to do an "offline rebuild" of the failed, empty broker. The idea is to keep Kafka on the new empty broker down, not serving any traffic, and copy the log segment/index files it is supposed to host from other brokers in the cluster, distributing and throttling the copy so it does not affect the brokers serving production traffic. Once the copy is done, we modify the replication offset meta file and start Kafka on the new node; since the historical logs are already copied, the "delta" catch-up from the leaders is small and it catches up faster.
This is just the main idea. It is an internal tooling that is tied to our infrastructure. Hope this helps.
evanjpw : Yes, for now, you can potentially set up replication quota. You will need to set leader.replication.throttled.replicas and follower.replication.throttled.replicas for each topic residing on that broker. You can use the following command to do that.
bin/kafka-configs … --alter
--add-config 'leader.replication.throttled.replicas=*'
--entity-type topic
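For completeness, a possible end-to-end sketch: mark the replicas as throttled at the topic level, then cap the actual byte rate on the recovering broker. The topic name, broker id 1001, and the 50 MB/s value are placeholders; depending on the Kafka version, --bootstrap-server can be used in place of --zookeeper.

# Topic level: mark all replicas of each topic hosted on the recovering broker as throttled
bin/kafka-configs.sh --zookeeper zk_host:2181 --alter \
  --entity-type topics --entity-name my_topic \
  --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*'

# Broker level: cap replication traffic on the recovering broker to ~50 MB/s
bin/kafka-configs.sh --zookeeper zk_host:2181 --alter \
  --entity-type brokers --entity-name 1001 \
  --add-config 'leader.replication.throttled.rate=52428800,follower.replication.throttled.rate=52428800'

# Once the broker has caught up, remove the throttles again
bin/kafka-configs.sh --zookeeper zk_host:2181 --alter \
  --entity-type brokers --entity-name 1001 \
  --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate'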
sriharsha : I am not sure if KIP-491 is necessarily the best approach to address this particular issue (in general, one probably shouldn't have any broker overloaded at any time). However, if there are other convincing use cases, we could consider it.
junrao I am not sure if we can avoid a broker overload problem even with replication throttle.
For example, each broker in a 10-node cluster has around 4000 topic-partitions with a 3-day retention period. If a broker goes down, it needs to rebuild the data for the past 3 days for all of those 4000 partitions. If we apply a throttle of 200MB/sec for the new broker, it's pulling from 9 other brokers, each at 200MB/sec.
If auto.leader.rebalance.enable is enabled, this broker becomes a leader for topics as soon as it catches up and joins the ISR. So it is still trying to pull data from other brokers at 200MB/sec while also becoming the leader for some topic partitions, which it then serves for both producers and consumers, increasing the load on this broker.
This may not be an issue in AWS with EBS volumes, as one could mount the volumes onto the new host, but it is still an issue with on-prem hosts.
sriharsha, hmm, if one sets follower.replication.throttled.rate to 200MB/sec on the new broker, the total incoming replication traffic is capped at 200MB/sec, independent of the number of brokers it fetches from. With that, it seems that one can reserve some bandwidth for regular clients during the catchup phase.
Thanks a lot guys. sql_consulting I have actually engineered an automatic EBS re-attach solution in AWS, to attach the EBS back to the new instance after instance termination. And this indeed does solve the issue. However we have decided to move over to I3 instance type (local SSD), where data is removed upon termination/stop. This is because you get much more disk performance for the price.
- Will that exact command set a throttle on all topics in one shot? Or must you script something up, first enumerating all topic names that the broker is either leader or follower for, and then iterate through each using --entity-name as well? I'm assuming something needs to be scripted to apply that config (via cron) on a regular basis to watch for any new topics?
- Just to clarify: if leader.replication.throttled.rate and follower.replication.throttled.rate are set only on the new broker, will its total incoming and outgoing replication bandwidth be throttled to those exact limits, even as it joins the ISR over time, and regardless of whether the other brokers have that config? Can those configs be set before the service on the new broker has started?
- I totally agree that a clean, dynamic, cluster-wide way to enforce that a broker should not become a leader before it comes online (if it can only be done after service start, that window may be enough for clients to have issues) would be very handy in this situation. Depending on producer/consumer config, they may still time out even with throttling set on the broker. More importantly, depending on the incoming messages per second, the new broker may struggle to get into the ISR, or at least take longer than it would without any client traffic. Removing all leadership (and ideally automatically removing the config once it has zero replication lag) would help dramatically, I suspect.
- For this case, you do have to script up things yourself. Basically, you need to set up the per topic config for throttled replicas and then set the broker level throttling value.
- We monitor the incoming/outgoing replication traffic (in-sync and out-of-sync) per broker. If that's higher than the throttling value, we throttle the out-of-sync replicas.
- To solve this particular problem, another way is to change a bit how auto leader balancing works. For example, we can choose to only move the leaders to a broker when all replicas on it are in-sync.
Thanks junrao
Totally agree.
- Set dynamic cluster property (for example) 'auto.leader.rebalance.broker.delay=brokerid', before the new broker service starts.
- Start the broker.
- When it's in ISR for all partitions it's following, execute leader rebalance back to the broker.
evanjpw: I am not sure if you need the new config. The controller already knows which replicas are in-sync.
junrao I've implemented throttling now. Even with quite a high throttle and num.replica.fetchers=1, it seems the fetcher threads are still killing my CPU (4 vCPU / 32GB / 6GB Java heap). Any ideas why that may be? There are approximately 1,500 partitions for the broker to replicate, but still, I wonder why this is not being restricted. Unfortunately it's still causing issues for clients, as I can see the total incoming message rate drop quite a bit.
num.network.threads=20
num.io.threads=40 (average idle request handlers is around 20%. Edit: Now it's 0%)
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7134 kafka 20 0 38.9g 6.0g 79628 R 99.9 20.0 4:53.34 ReplicaFetcherT
7149 kafka 20 0 38.9g 6.0g 79628 R 93.8 20.0 4:54.10 ReplicaFetcherT
7145 kafka 20 0 38.9g 6.0g 79628 R 50.0 20.0 4:03.96 ReplicaFetcherT
7135 kafka 20 0 38.9g 6.0g 79628 R 43.8 20.0 4:50.38 ReplicaFetcherT
After a 2nd look, it's looking more like this:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15031 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.47 data-plane-kafk
15037 kafka 20 0 52.0g 1.8g 44940 S 13.3 6.0 0:24.53 data-plane-kafk
15038 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.34 data-plane-kafk
15044 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.23 data-plane-kafk
15047 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.73 data-plane-kafk
15050 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.13 data-plane-kafk
15052 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.38 data-plane-kafk
15054 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:23.72 data-plane-kafk
15060 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.62 data-plane-kafk
15063 kafka 20 0 52.0g 1.8g 44940 R 13.3 6.0 0:24.04 data-plane-kafk
15086 kafka 20 0 53.4g 1.9g 45560 S 11.1 6.3 1:18.21 data-plane-kafk
15090 kafka 20 0 53.4g 1.9g 45560 S 11.1 6.3 1:15.55 data-plane-kafk
15094 kafka 20 0 53.4g 1.9g 45560 R 11.1 6.3 3:46.03 ReplicaFetcherT
15099 kafka 20 0 53.4g 1.9g 45560 R 11.1 6.3 4:00.63 ReplicaFetcherT
15100 kafka 20 0 53.4g 1.9g 45560 R 11.1 6.3 3:45.42 ReplicaFetcherT
sql_consulting Any ideas? This is where a way to just not let this broker become a leader while it's replicating would really help.
evanjpw: There are metrics related to replication throttling (https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas#KIP-73ReplicationQuotas-Metrics). Perhaps you can check to see if the replication quota is actually engaged.
My monitoring (datadog) isn't set up for that metric at all, so it might take a while to get some data on it. But as far as I can see:
- Throttle is set on all topics.
- Throttle is set on brokers (--entity-default): configs for default brokers are follower.replication.throttled.rate=10000000 (or does this only work if done via --entity-name brokerId)?
I'm really stumped here. Even with a 10MB follower throttle, and clients that don't generate more than 10MB of traffic - on a 150MB/s network link and a single local SSD - it's still killing the CPU.
Yes, I can see that it's been throttled to exactly 10MB/sec, so it is working. I'm not sure how the fetcher threads work, or whether num.replica.fetchers=1 can still spawn multiple child threads that take over the CPU (and how that interacts with the configured number of IO/network threads). But yes, 10MB/s on a 4 vCPU server shouldn't cause this.
Anyway, I didn't mean to turn this into a support thread, but it shows that I could very much use a 'blacklist' feature to stop a broker from becoming leader while it replicates. Then it could hit higher CPU limits without affecting any clients. It seems it's not easy to control load even when applying throttles. I'm open to any other ideas to try, but thanks for all your help.
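(For reference, one way to double-check at which level a throttle actually landed, cluster-wide default versus a specific broker id, is to describe both levels with the standard kafka-configs tool; the broker id 1001 below is a placeholder:)

# Dynamic configs applied to all brokers by default
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

# Dynamic configs applied to one specific broker
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1001 --describe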
With `auto.leader.rebalance.enable=true`, even if we check that the broker is in the ISR before rebalancing leadership to it, it might still impact the partitions this broker leads, because the broker is still trying to catch up.
blodsbror
Does your cluster have a lossless setting like min.insync.replicas > 1? For lossless setups, we experience high producer latency during catch-up or reassignments. The leader deprioritized list feature is convenient in this case: it simply puts the broker that is catching up at the lowest priority when considering leaders. Another useful case is when the current controller is very busy with metadata requests; "blacklisting" it so that it only serves as a follower can give 10-15% CPU back to the controller (without bouncing it).
In a cluster without any down brokers and no URP (under-replicated partitions), there is a workaround: run reassignments to move that broker to the end of the partition assignment. For example, if broker 100 is the one to demote, change the partition assignment (100, 101, 102) => (101, 102, 100); the reassignment should complete fast because all replicas are in the ISR, and then running a preferred leader election will change the leader from 100 => 101 (see the sketch below). The downside is that it's more work to roll back or rebalance again, whereas KIP-491 is very easy to roll back: just unset the dynamic config.
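As a concrete sketch of that workaround (topic name, partition, and broker ids are placeholders; the JSON layout is the standard kafka-reassign-partitions.sh format, and on newer tool versions --bootstrap-server can be used in place of --zookeeper):

# Move broker 100 from the head to the tail of the assignment for one partition
cat > move_100_last.json <<'EOF'
{"version":1,"partitions":[{"topic":"my_topic","partition":0,"replicas":[101,102,100]}]}
EOF

bin/kafka-reassign-partitions.sh --zookeeper zk_host:2181 \
  --reassignment-json-file move_100_last.json --execute

# Once the (data-free) reassignment completes, shift leadership off broker 100
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --topic my_topic --partition 0 --election-type preferred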
In a cluster with URP, the reassignment approach of moving the broker to the end of the assignment might not work, because the broker is not in the ISR; the reassignment will stay pending until it catches up and rejoins the ISR.
So maybe we do have a good use-case for KIP-491 ?
We are using min.insync.replicas=1. And have replication.factor=3 or above for most topics (6 brokers).
As a side note, one interesting thing now reported by the owners of the clients (streams) is that for certain topics/partitions there was no leader elected, even though the bootstrapping broker had a clean shutdown. So something is quite weird there. What might cause that?
But yes, I think there is a clear case for KIP-491 in this scenario, to just blacklist a broker from becoming leader until x factor is satisfied, or it's manually removed.
Which Kafka version are you running? Maybe I can provide a diff of the KIP-491 changes for you to apply on your end to try it out.
That would be great! We are running the latest Confluent 5.4. However Zookeeper is whatever is bundled with 5.3 currently.
Another option I was thinking about is separating client, replication, and controller traffic via multiple NICs. At the moment we have just one listener/advertised listener and one NIC per broker. I guess this would also guarantee that even if threads get exhausted by replication traffic, client traffic would be unaffected?
FYI, I implemented separation of replication/controller/client traffic, and this helped somewhat. However, I think in the end the only real solution is KIP-491.
blodsbror, if you are interested in trying out our patch, we are happy to provide it on top of whichever Kafka version you are testing.
If you can share your test results with this patch, it gives further evidence for this KIP. Let us know.
That would be great. We are using Confluent 5.4 (open source). If you could guide me a little on how to actually patch correctly, that would be great too.
On another note, I have managed to successfully work around the issue by simply setting auto.leader.rebalance.enable=false and running kafka-leader-election manually after all partitions have been synced. It's not a true solution, but it at least shows that without massive amounts of replication + leader traffic our clients are not affected.
Hi
Just wanted to chime in with some of our experience on this front.
We've also encountered issues like this.
We run our brokers with roughly 10k partitions per broker.
We have auto.leader.rebalance.enable=true. It sometimes happens to us that a normal restart of a broker will cause our cluster to get into a bad state for 1-2 hours.
During that time, what we've seen is:
- Auto-leader rebalance triggers and moves a large number of leaders to/from the various brokers.
- The broker that is receiving the leaders takes a long time to assume leadership. If you do a metadata request to it, it will say it is not the leader for those partitions.
- The brokers that are giving up leadership will quickly give up leadership. If you do a metadata request to them, they will say that they are not the leader for those partitions.
- Clients will receive notification that leadership has transitioned. But since the clients will get their metadata from an arbitrary broker in the cluster, they will receive different metadata depending on which broker they contact. Regardless, they will trust that metadata and will attempt to fetch from the broker that they believe is leader. But at this point, all leaders are claiming that they are not the leader for partitions. So they will attempt a fetch, get an error, refetch metadata, and then try again. You get a thundering herd problem, where all clients are hammering the cluster.
- Broker request queues will be busy attempting to assume leadership and do replication, as well as answering and rejecting all these requests.
- During this time, each broker will be reporting UnderReplicatedPartitions=0. The reason for that is, brokers will report an UnderReplicatedPartition if they are the leader for a partition, and there are followers that are not present. In this case, the brokers do not believe they are leaders for these partitions, and so will not report it as an UnderReplicatedPartition. And similarly, I believe that OfflinePartitions=0 as well. But from a user's point of view, these partitions are effectively inaccessible, because no one will serve traffic for them.
The metrics we've seen during this are:
- ControllerQueueSize goes to maximum
- RequestHandlerIdlePercent drops to 0%
- (maybe) RequestQueueSize goes to maximum
- TotalTimeMs for Produce request goes into 20000ms range
- TotalTimeMs for FetchFollower goes into 1000ms range (which is odd, because the default setting is 500ms)
- TotalTimeMs for FetchConsumer goes into 1000ms range (which is odd, because the default setting is 500ms)
We've also seen similar behavior when we replace the hard drive on a broker and it has to re-replicate its entire contents. We don't definitively understand that one yet.
We've worked with Confluent on this. The advice from Confluent seems to be that our brokers are simply underpowered for our situation. From the metrics point of view, the evidence that points to this is:
- Normal host CPU utilization is 45-60%
- During these times, high CPU utilization of 60-70%
- High ResponseSendTimeMs during these times
- NetworkProcessorIdlePercent 0%
From our side, we're not sure what specifically is the thing that we're underpowered for. Is it number of partitions? Number of clients + number of retries? Amount of network data? All The Things? We're not sure.
One idea on how to mitigate it is to set auto.leader.rebalance.enable=false, and then use kafka-reassign-partitions.sh to move leaders just a few at a time, until leadership is rebalanced.
You can do something similar when re-populating a broker from scratch. You can remove the (empty) broker from all partitions, and then use kafka-reassign-partitions.sh to add the broker as a follower to a small number of partitions at a time. That lets you control how many partitions move at a time. kafka-reassign-partitions.sh also lets you specify throttling, so you can also use that. That lets you throttle network bandwidth. So at this point, you are in control of how many partitions are moving, as well as how much network they use. And then lastly, you can also control not just "become follower" but you can also decide when they become leaders, and how quickly.
All of this is unfortunately more operational work, but it at least, puts you in control of things.
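A sketch of what that incremental approach can look like in practice; topic names, partitions, broker ids, and the 50 MB/s throttle are placeholders, and the flags are the standard kafka-reassign-partitions.sh ones:

# Add the rebuilt broker (1006) as a follower for a small batch of partitions,
# with a replication throttle in bytes/sec
cat > batch1.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"topic_a","partition":0,"replicas":[1001,1002,1006]},
  {"topic":"topic_a","partition":1,"replicas":[1002,1003,1006]}
]}
EOF

bin/kafka-reassign-partitions.sh --zookeeper zk_host:2181 \
  --reassignment-json-file batch1.json --execute --throttle 50000000

# Poll --verify until the batch completes (it also clears the throttle),
# then move on to the next batch
bin/kafka-reassign-partitions.sh --zookeeper zk_host:2181 \
  --reassignment-json-file batch1.json --verify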
Our cluster specs are:
- 4 vCPU, 32 GiB memory, and we use st1 type EBS volumes.
- 10k partitions per broker
- unknown number of clients
- low network bytes in/out (most partitions not receiving active traffic)
- num.network.threads=8
- num.io.threads=16
https://issues.apache.org/jira/browse/KAFKA-9594 identifies an issue that can delay the processing of LeaderAndIsrRequest in the follower.
Yes, I've now decided to set auto.leader.rebalance.enable=false and have implemented logic in our config management (AWS CodeDeploy / bash) to poll for under-replicated partitions and, when URP=0, trigger a leader rebalance (we re-use broker IDs). In addition to topic throttling, this gives back some control over resource usage on a replicating broker.
So this actually solves our issues, as clients continue connecting to the other brokers without any problems while the replaced broker replicates. Still, I would absolutely love this feature/patch, so auto.leader.rebalance.enable can be turned on again.
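A minimal sketch of that kind of automation, assuming auto.leader.rebalance.enable=false cluster-wide; the tool names and flags are the standard Apache Kafka ones, everything else (hosts, sleep interval) is a placeholder:

#!/usr/bin/env bash
# Wait until the cluster reports no under-replicated partitions, then trigger
# a preferred leader election for all partitions.
BOOTSTRAP="localhost:9092"

while true; do
  urp=$(bin/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --under-replicated-partitions | wc -l)
  if [ "$urp" -eq 0 ]; then
    break
  fi
  echo "still $urp under-replicated partitions, waiting..."
  sleep 30
done

bin/kafka-leader-election.sh --bootstrap-server "$BOOTSTRAP" \
  --all-topic-partitions --election-type preferred

On very large clusters the single --all-topic-partitions election can itself time out (as discussed further down), so batching the election per topic may be necessary.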
Hi Evan, I am so sorry, I have been busy with work and forgot about providing the KIP-491 patch for Confluent 5.4; I just checked the backlog of emails. Let me work on it this weekend.
No problem! Thanks for your help here. I'm new to applying a 'patch', so if you could guide me on that as well (or point me to instructions), including how to roll back if possible, that would be super.
Sure. I am working on porting the patch from 1.1.x to 2.4. It looks like there were some changes separating PartitionStateMachine.scala into Election.scala. I will need some time to modify that and the corresponding unit/integration/system tests, and will post the patch once it's ready.
Very busy with work and this COVID-19 work-from-home situation. Last time I worked on it, I had issues getting some unit tests to pass after porting from 1.1.x to 2.4. Let me look into them again. Sorry for the delay.
I had some free time this weekend to troubleshoot and found that after 1.1, at least in 2.4, the controller code has an optimization for PLE: it does not run PLE at all if current_leader == head of the replica list. That had caused my unit/integration tests to fail. I have patched that as well and landed my code change on my repo's feature branch 2.4-leader-deprioritized-list (based on the 2.4 branch).
Detailed installation and testing steps are in this Google doc. Please let me know if you have issues with the patch/testing. If you cannot view the doc, please click the request access button, or send me your email to add to the share. My email: sqlconsulting@gmail.com
Please keep us posted with your testing results.
Thanks,
George
Much appreciated, sql_consulting. I've requested access to the Google doc. I will implement it and let you know how things go! If this turns out to work well, what are the chances that it gets merged into an official release?
Edit: Answered in the doc, thanks!
One question though: is it reasonable to still set auto.leader.rebalance.enable=true on all brokers, with this new functionality?
I think it's still OK to set auto.leader.rebalance.enable=true, except for the case of a broker that has failed, been replaced, and is coming up empty.
There should be some kind of automation that first sets the `leader.deprioritized.list=<broker_Id>` dynamic config at the '<default>' cluster global level, so that the current controller (and any failover controller) picks it up immediately, and then starts the new replaced broker. While the broker is busy catching up, because it sits at lower priority when leaders are being chosen, auto.leader.rebalance.enable=true is effectively disabled for this broker.
After this broker catches up (e.g. URP => 0 and CPU/disk back to normal), the dynamic config above can be removed by the automation script, and with auto.leader.rebalance.enable=true leadership will automatically go back to the partitions for which this broker is the preferred leader (first/head of the partition assignment).
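Putting those steps together, here is a sketch of what that automation could look like. It assumes a build carrying the KIP-491 patch (leader.deprioritized.list is not an upstream Kafka config); the broker id 10001 and bootstrap host are placeholders.

# 1. Before starting the replaced (empty) broker, deprioritize it at the cluster-default level
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default \
  --alter --add-config leader.deprioritized.list=10001

# 2. Start the broker and wait for it to catch up (under-replicated partitions drain to 0)

# 3. Remove the dynamic config; with auto.leader.rebalance.enable=true the controller
#    will then move leadership back to this broker's preferred partitions
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default \
  --alter --delete-config leader.deprioritized.list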
sql_consulting Thanks. Yes, I already have that logic scripted up, which is good. However, for me it's much better to have auto.leader.rebalance.enable=true cluster-wide if possible, so that it can take care of instance reboots or service (planned/unplanned) restarts automatically. That's the real benefit of this patch: being able to leave it enabled, yet not killing a replaced broker with traffic until URP=0.
If this turns out to work well - what are the chances that it get's merged into an official release ?
If the testing turns out to be positive, I can restart the discussion on the dev mailing list for KIP-491; at the very least it works/helps with auto.leader.rebalance.enable=true.
There are other use cases listed in KIP-491, e.g. when the controller is busy with metadata requests, you can set this dynamic config for the controller and run PLE; the controller then gives up all its leadership and acts only as a follower, dropping CPU usage by 10-15% and making it lighter-weight for its controller work, without needing to bounce it. I know some companies are working on separating the controller onto another set of machines.
Our primary use case for this `leader.deprioritized.list=<broker_Id>` feature is bundled with another feature called replica.start.offset.strategy=latest, which I have not filed a KIP for (the default is "earliest", like current Kafka behavior); it is also a dynamic config and can be set at the broker level (or cluster-wide). When a broker fails, loses all its local disks, and is replaced with an empty broker, the empty broker by default needs to start replication from the earliest offset; for us that can be 20TB+ of data over a few hours and can cause outages if not throttled properly. So, just like a Kafka consumer, the dynamic config replica.start.offset.strategy=latest makes it replicate from each partition leader's latest offset. Once it is caught up (URP => 0 for this broker), usually in 5-10 minutes or sooner, the dynamic config is removed. Because this broker does not have all the historical data, it should not be serving leadership; that's where the KIP-491 `leader.deprioritized.list=<broker_Id>` comes into play. The automation software calculates the retention time at the broker and topic level, takes the max, and once the broker has been replicating for that amount of time (e.g. 6 hours, 1 day, 3 days, whatever), it removes the leader.deprioritized.list dynamic config for the broker and runs PLE to move leadership back to it.
Many thanks for the detailed info. Sounds like some great work there, and very handy features.
On another somewhat related note (and I can open a bug ticket for this if need be): I've noticed that on a cluster (5.4) with 6,000 topics, manual leader election times out. Is there a way to increase the timeout? If not, then our only option is auto.leader.rebalance.enable=true. I guess it's important that PLE works for all of this functionality to work properly.
kafka-leader-election --bootstrap-server $(grep advertised.listeners= /etc/kafka/server.properties |cut -d: -f4 |cut -d/ -f3):9092 --all-topic-partitions --election-type preferred
Timeout waiting for election results
Exception in thread "main" kafka.common.AdminCommandFailedException: Timeout waiting for election results
at kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)
at kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)
at kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)
at kafka.admin.LeaderElectionCommand.main(LeaderElectionCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
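(One thing that might be worth trying for the CLI timeout, though I have not verified that it governs this particular wait, is passing longer admin-client timeouts to the tool via its --admin.config option; the file name and values below are placeholders:)

cat > admin-timeout.properties <<'EOF'
# admin client timeouts for the leader election tool
request.timeout.ms=120000
default.api.timeout.ms=120000
EOF

kafka-leader-election --bootstrap-server broker:9092 \
  --admin.config admin-timeout.properties \
  --all-topic-partitions --election-type preferred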
Doing PLE on too many partitions at once is probably not good. We have a script that takes all partitions with a preferred leader imbalance (i.e. current leader != first replica) where the first replica is in the ISR.
Then we divide them into batches (e.g. 100 partitions per batch, with a throttle sleep of about 5-10 seconds between batches). We also verify each batch after submitting it for PLE, e.g. that the ZK node /<cluster_name>/admin/preferred_replica_election is gone.
For the KIP-491 patch, maybe I should write a wrapper for doing PLE, because now the logic is not just current_leader != first replica, but current_leader != <preferred_replica_after_deprioritized_logic>.
The batch logic basically writes the topic/partitions into a JSON file (e.g. 100 per batch) and then submits that batch using the open source script `kafka-preferred-replica-election.sh`. Below is a shell script to do PLE for one topic (all partitions). It still uses ZK to submit the JSON; this can be changed to --bootstrap-server.
$ cat topic_preferred_leader_election.sh
.....
name=$1
topic=$2
kafka_cluster_name="${name}"
zk=$(kafka_zk_lookup ${kafka_cluster_name})
json_filename="${name}_${topic}_leader_election.json"
touch ${json_filename}
echo "{\"partitions\":[" >${json_filename}
IFS=$'\n'
for partition in `/usr/lib/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper $zk --describe --topic $topic 2>/dev/null |grep Partition:|awk -F "Partition:" '{print $2}'|awk '{print $1}'`
do
  if [ "$partition" == "0" ]
  then
    echo " {\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  else
    echo ",{\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  fi
done
echo "]}" >>${json_filename}
/usr/lib/kafka/bin/kafka-preferred-replica-election.sh --zookeeper $zk --path-to-json-file ${json_filename} 2>/dev/null
#rm ${json_filename}
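For reference, given the positional parameters at the top of the script ($1 = cluster name, $2 = topic), invoking it for a single topic looks like this (names are placeholders, and kafka_zk_lookup is the author's internal helper for resolving the ZK connect string):

./topic_preferred_leader_election.sh my_cluster my_topic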
For troubleshooting the timeout, maybe check the ZK node /<cluster_name>/admin/preferred_replica_election and see if any PLE is pending there. Could it be because of the KIP-491 preferred leader deprioritized/black list? I doubt it, because I have tested that it works. Did this PLE work before applying the KIP-491 patch?
I think a ZooKeeper node has a size limit of 1MB, so doing PLE for 5,000-6,000 partitions all together in one batch might not work. How about trying one topic first, then 100 in a batch?
Many thanks for that!
I've just started trying to implement the patch, and just wanted to confirm a few things:
- Do I replace the entire contents of /usr/share/java/kafka (where my Kafka JARs are) with the 96 JARs from the tar file, or do I just copy them in addition to the existing JARs? I tried replacing them all, but Kafka just crashed on start.
- Regarding the patched JARs, I see that they have "2.4.1" in the filenames, i.e. kafka-clients-2.4.1-SNAPSHOT.jar,
whereas my existing JARs have 5.4.1. Will that affect things when I do the rolling restarts, i.e. mismatching versions?
I am not very familiar with 5.4 setup.
Do you have the error message of the crash in the log? Is it missing the zkclient jar, like below?
$ ls -l zk*.jar
-rw-r--r-- 1 georgeli engineering 74589 Nov 18 18:21 zkclient-0.11.jar

$ jar tvf zkclient-0.11.jar
     0 Mon Nov 18 18:11:58 UTC 2019 META-INF/
  1135 Mon Nov 18 18:11:58 UTC 2019 META-INF/MANIFEST.MF
     0 Mon Nov 18 18:11:58 UTC 2019 org/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/
     0 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/
  3486 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/ContentWatcher.class
   263 Mon Nov 18 18:11:58 UTC 2019 org/I0Itec/zkclient/DataUpdater.class
If this jar file was there before, please copy it back. I need to find out why it was missing after the build; maybe some dependency setup in Gradle. I have also updated the install doc to use `./gradlew clean build -x test`.
Also make sure the startup script for Kafka is not hard-coding the 5.4 jars, but takes the jars from the lib classpath, e.g.
/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dlog4j.configuration=file:/etc/kafka/log4j.xml -Xms22G -Xmx22G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:NewSize=16G -XX:MaxNewSize=16G -XX:InitiatingHeapOccupancyPercent=3 -XX:G1MixedGCCountTarget=1 -XX:G1HeapWastePercent=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -verbose:gc -Xloggc:/var/log/kafka/gc-kafka.log -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=29010 -Djava.rmi.server.hostname=kafka12345-dca4 -cp '.:/usr/share/kafka/lib/*' kafka.Kafka /etc/kafka/server.properties
If you give us more details, we can help more.
Thanks
Actually, I just patched and added back zkclient libs for the gradle build. Please "git clone https://github.com/sql888/kafka.git" (or git pull) and try to build again. I suspect that was the issue. Otherwise, we need to see the errors of the crash from the kafka logs.
Apologies for the delayed response sql_consulting. Have been a bit unwell (not corona).
Thanks for doing that. Hopefully I'll have some time asap to do more testing. It's a bit chaotic at the moment.
On another small note, have you ever seen auto.leader.rebalance.enable=false not take effect at all? I have it set on all nodes in a cluster (after restarting them all), yet when bringing a node back online (a service restart, for example), leaders are still moved back automatically to the broker for the partitions it is preferred leader for.
Is there a znode in ZK that might be stale that I could check?
Hi guys! I'm trying to learn about the alternatives for managing preferred leader election. There's extensive discussion here about the built-in option for Kafka. I wonder though if using Cruise Control is a practical alternative? Thank you!
hmm...That's weird.
auto.leader.rebalance.enable seems to be functioning as intended for us. You need to make sure the controller has it set correctly.
I wonder what the running config is. Could you try this:
~/confluent-kafka-go$ git remote -v
origin  https://github.com/confluentinc/confluent-kafka-go.git (fetch)
origin  https://github.com/confluentinc/confluent-kafka-go.git (push)
~/confluent-kafka-go$ go run examples/admin_describe_config/admin_describe_config.go <broker_host>:9092 broker <broker_id> | grep auto
auto.leader.rebalance.enable = false  STATIC_BROKER_CONFIG  Read-only:true  Sensitive:false
auto.create.topics.enable = true  STATIC_BROKER_CONFIG  Read-only:true  Sensitive:false
The auto.leader.rebalance.enable = false above is the real/actual running config. Some configs can be changed dynamically while the process is running, so I just want to make sure; do this for all brokers.
Another cause might be some cluster management software (like Cruise Control) that is doing PLE periodically; that would make the current leader = first replica whenever the first replica is in the ISR.
Thanks sql_consulting for pointing me to this ticket. junrao, I just want to hear a bit more about why KIP-491 is not under consideration, based on your comment above.
I am not sure if KIP-491 is necessarily the best approach to address this particular issue (in general, one probably shouldn't have any broker overloaded at any time). However, if there are other convincing use cases, we could consider it.
I feel it's a very useful feature for a lot of operational cases:
1. High replication rate when a broker boots up:
To me, uneven partition sizes in production are very common; with throttling, some big partitions will take much longer to get fully replicated. Sometimes we just want a fully replicated broker quickly (say, in 10 minutes rather than hours). A long period with an under-replicated broker in the system adds operational complexity; for example, we need to be careful that no other broker goes offline during the replication process.
2. Other situations, like an outlier broker:
This happens pretty often if the cluster is big; most of the time it's not easy (or at least it's time-consuming) to replace a broker, even with EBS. We would like to disable a broker as leader but not take it offline, so the on-call has time to investigate the problem without terminating it right away. With KIP-491 we can add a lot of automation to the system that handles a network partition affecting a single broker without actually replacing it.
3. Potential:
If we can manipulate the view of leadership in a cluster, we can do a bit more, such as introducing different leaders for producers and consumers (consumers can now consume from replicas, but I think there are still ways we can control it). Then we can add priority at the client level and isolate clients to talk to only some of the brokers.
This is more about KIP-491; we can surely move it back to the original ticket if we feel there is more to discuss.
hai_lin : (1) If we just need a tool to disallow a broker from taking over as the leader for some partitions, it seems the existing partitionReassignment tool can do that already. One just needs to move that broker away from the first replica in the assigned replica list. So, I am not sure if we need a separate tool proposed by KIP-491 for this. (2) For the common case when a broker is restarted, currently, we move the leader when some, but not all replicas are caught up. It could be a better behavior to wait until all replicas are caught up before changing the leaders. If this is the more desired behavior, maybe this can just be added in the auto leader balancing logic, instead of a separate tool.
junrao ,
I don't want to keep adding examples of why this is a problem in production clusters, but I also agree with your sentence: "It could be a better behaviour to wait until all replicas are caught up before changing the leaders. If this is the more desired behaviour, maybe this can just be added in the auto leader balancing logic, instead of a separate tool."
So maybe we can find an agreement in between the options.
I think this could be optional, so we could have a parameter called "auto.leader.rebalance.paused.if.underreplicated.replicas" (default false) which would basically skip the leader rebalance when the cluster has under-replicated partitions.
The advantage is that no administrator intervention is needed before or after the cluster operation, which removes another step in cluster administration.
I wanted to add some extra information about the metrics and issues you have reported:
I had similar issues, and I found out the root cause of those failures is the OS (I use Linux) reaching its capacity to hold dirty memory pages.
The kernel has a mechanism to check how "quickly" processes write data. In order to protect itself, it throttles a process that is generating a lot of data when it calculates that, if the process continues at the same rate, it will end up filling OS memory with dirty pages. So after the throttle kicks in, your Kafka process's writes move from "async" to "sync": basically the OS makes the process wait on each write to allow the flusher to write out the existing dirty pages before accepting new ones.
I still consider demoting the broker the best option when we are replacing a volume, but we have built a tool to monitor when the OS is throttling the Kafka process, so at least we have visibility.
We saw that when the OS throttles the process, produce request time goes from 30 ms to 3 seconds, which of course creates a lot of problems for the clients.
Here is the script we use to monitor this:
#!/usr/bin/env python3
#
# Detects if the kernel is throttling a process due to high write operations.
# Gets the throttle time in jiffies, more info here: https://www.yugabyte.com/blog/linux-performance-tuning-memory-disk-io/
# and here: https://msreekan.com/tag/dirty-pages/
# For Linux, uses BCC, eBPF.
#
# USAGE: monitor-writeback-throttle
#
# Copyright 2023 Adevinta.
#
# 28-August-2023 Sergio Troiano Created this.

import argparse
from time import sleep

from bcc import BPF
from datadog import initialize, statsd


def argsparser():
    parser = argparse.ArgumentParser(
        description="Detects if the kernel is throttling a process due to high write operations.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-f",
        "--probes_source_file",
        default="monitor-writeback-throttle.c",
        type=str,
        help="C source code of the probe to be attached to the kernel trace function",
    )
    parser.add_argument(
        "-i", "--interval", default=3, type=int, help="output interval, in seconds"
    )
    parser.add_argument(
        "-d",
        "--report-to-datadog",
        default=False,
        help="Report metrics to Datadog.",
        action="store_true",
    )
    return parser.parse_args()


def load_probes(args):
    # Read the C probe source and compile/load it via BCC
    with open(args.probes_source_file, "r") as file:
        bpf_text = file.read()
    b = BPF(text=bpf_text)
    return b


def print_event(ctx, data, size):
    event = b["events"].event(data)
    paused_events.append(event.pause)


args = argsparser()
if args.report_to_datadog:
    options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
    initialize(**options)
    PREFIX = "ebpf."

b = load_probes(args)
b["events"].open_ring_buffer(print_event)

while True:
    paused_events = []
    sleep(int(args.interval))
    b.ring_buffer_poll(timeout=int(args.interval))
    if args.report_to_datadog:
        statsd.gauge("{}writeback_throttle".format(PREFIX), int(any(paused_events)))
    else:
        print({"paused": any(paused_events)})
And of course you will need the probe in C for eBPF:
#include <linux/ptrace.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/kprobes.h>

struct event {
    unsigned long pause;
};

BPF_RINGBUF_OUTPUT(events, 8);

// This probe traces the kernel function balance_dirty_pages() from writeback.c
TRACEPOINT_PROBE(writeback, balance_dirty_pages) {
    struct event event = {};
    event.pause = args->pause;
    events.ringbuf_output(&event, sizeof(event), 0);
    return 0;
};
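Assuming the Python script above is saved as monitor-writeback-throttle (and the C probe as monitor-writeback-throttle.c next to it, the default for --probes_source_file), it would be run as root, since BCC/eBPF requires it:

# print the throttle state locally every 3 seconds
sudo ./monitor-writeback-throttle -i 3

# or report the gauge to a local Datadog agent instead
sudo ./monitor-writeback-throttle -i 3 -d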
tcrayford-heroku, if you check KafkaController.checkAndTriggerPartitionRebalance(), the auto leader balancing logic actually moves the leader one partition at a time. In contrast, the manual preferred leader election moves the leader for all partitions. So, do you see the issue with auto or manual leader election?