Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
0.10.0.0
-
None
-
None
Description
We observed a case in which the leader crashed and lost its in-memory data and latest HW offsets. Normally Kafka should be safe and able to make progress after a single node failure. However, a few seconds before the crash the leader shrank its ISR to itself, which is safe since min-in-sync-replicas is 2 and the replication factor is 3, so the troubled leader cannot accept new produce messages. After the crash, however, the controller could not name any of the followers as the new leader, since as far as the controller knows they are not in the ISR and could be behind the last leader. Note that unclean-leader-election is disabled in this cluster since the cluster requires a very high degree of durability and cannot tolerate data loss.
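To make the stuck state concrete, here is a minimal sketch of the election rule as described above. It is a simplified model, not Kafka's controller code: the object name, the function `electLeader`, and its parameters are all hypothetical.

```scala
// Hypothetical, simplified model of the election rule described above.
// This is NOT Kafka's actual controller code.
object CleanElectionSketch {
  /**
   * Returns the first assigned replica that is both alive and in the ISR.
   * If there is none, falls back to any live replica only when unclean
   * election is enabled; otherwise returns None (partition stays offline).
   */
  def electLeader(assignedReplicas: Seq[Int],
                  isr: Set[Int],
                  liveBrokers: Set[Int],
                  uncleanElectionEnabled: Boolean): Option[Int] = {
    val liveInIsr = assignedReplicas.filter(r => isr.contains(r) && liveBrokers.contains(r))
    liveInIsr.headOption.orElse {
      if (uncleanElectionEnabled) assignedReplicas.find(r => liveBrokers.contains(r))
      else None
    }
  }
}
```

With replicas 1, 2, 3, an ISR of {1}, and broker 1 down, `CleanElectionSketch.electLeader(Seq(1, 2, 3), Set(1), Set(2, 3), uncleanElectionEnabled = false)` yields `None` in this model, which corresponds to the offline partition we observed.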
The impact could get worse if the admin brings up the crashed broker in an attempt to make such partitions available again; this would take down even more brokers as the followers panic when they find their offset larger than HW offset in the leader:
    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
        ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
        // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
        fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
          " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
        Runtime.getRuntime.halt(1)
      }
One hackish solution would be for the admin to investigate the logs, determine that unclean-leader-election would be safe in this particular case, temporarily enable it (while the crashed node is down) until new leaders are elected for the affected partitions, wait for the topics' LEOs to advance far enough, and then bring up the crashed node again. This manual process is, however, slow and error-prone, and the cluster suffers partial unavailability in the meantime.
We are thinking of having the controller make an exception for this case: if the ISR size is less than min-in-sync-replicas and the new leader would be -1, the controller sends an RPC to all the replicas to ask for their latest offsets, and if all the replicas respond, it chooses the one with the largest offset as the leader and as the new ISR. Note that the controller cannot do that if any of the non-leader replicas does not respond, since the responding replicas might not have been part of the last ISR and hence could be behind the others (and the controller could not know that since it does not keep track of previous ISRs).
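The proposed exception could look roughly like the sketch below. It only models the decision rule; the RPC itself is represented by a `replies` map (broker id to reported latest offset) that the caller is assumed to have collected. All names here (`OffsetBasedElectionSketch`, `electByLatestOffset`, and the parameters) are hypothetical and do not exist in Kafka.

```scala
// Hypothetical sketch of the proposed rule; names and types are illustrative only.
object OffsetBasedElectionSketch {
  /**
   * Assumes the normal election already failed (the new leader would be -1).
   *
   * @param replies latest offset reported by each replica that answered the RPC
   * @return the chosen leader and the new ISR (the leader alone), or None if
   *         the exception does not apply or some follower did not respond
   */
  def electByLatestOffset(assignedReplicas: Seq[Int],
                          crashedLeader: Int,
                          isr: Set[Int],
                          minInSyncReplicas: Int,
                          replies: Map[Int, Long]): Option[(Int, Set[Int])] = {
    val followers = assignedReplicas.filterNot(_ == crashedLeader)
    // The exception applies only when the ISR had shrunk below min-in-sync-replicas,
    // i.e. the old leader could not have accepted new produce messages.
    val exceptionApplies = isr.size < minInSyncReplicas
    // Every non-leader replica must answer; a silent replica might hold the longest log.
    val allResponded = followers.nonEmpty && followers.forall(replies.contains)
    if (exceptionApplies && allResponded) {
      val leader = followers.maxBy(r => replies(r))
      Some((leader, Set(leader)))
    } else None
  }
}
```

For example, with replicas 1, 2, 3, crashed leader 1, ISR {1}, min-in-sync-replicas 2, and replies `Map(2 -> 120L, 3 -> 150L)`, the sketch picks broker 3 as leader with ISR {3}. If broker 3 does not reply, it returns `None` and the partition stays offline, as required above.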
The pro would be that Kafka would remain safely available when such cases occur, without requiring any admin intervention. The con, however, is that having the controller talk to brokers inside the leader election function would break the existing pattern in the source code, since currently the leader is elected locally without any additional RPC.
Thoughts?
Issue Links
- is related to: KAFKA-3410 Unclean leader election and "Halting because log truncation is not allowed" (Resolved)
- is superseded by: KAFKA-12241 Partition offline when ISR shrinks to leader and LogDir goes offline (Open)