KAFKA-7395 added a FENCED_LEADER_EPOCH error response to an OffsetsForLeaderEpoch request when the epoch in the request is lower than the broker's leader epoch. ReplicaFetcherThread builds an OffsetsForLeaderEpoch request under partitionMapLock, sends the request outside the lock, and then processes the response under partitionMapLock. While the OffsetsForLeaderEpoch request (with the old leader epoch) is still outstanding, or while the fetcher is waiting for the lock to process the response, the broker may receive a LeaderAndIsr request with the same leader but the next leader epoch, and remove and re-add the partition to the fetcher thread (with partition state reflecting the updated leader epoch). As a result, when the stale FENCED_LEADER_EPOCH response is processed, the partition gets removed from partitionStates and this broker will not fetch for this partition until the next LeaderAndIsr, which may take a while. We will see a log message like this:
[2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current leader. Will await the new LeaderAndIsr state before resuming fetching. (kafka.server.ReplicaFetcherThread)
We saw this happen with kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True. This test does partition re-assignment while bouncing 2 out of 4 total brokers. When the failure happened, each bounced broker was also a controller. Because of re-assignment, the controller updates the leader epoch without updating the leader on controller change or on broker startup, so we see several leader epoch changes without a leader change, which increases the likelihood of the race condition described above.
Here are the exact events that happen in this test (around the failure):
We have 4 brokers: 1, 2, 3, and 4. Partition re-assignment is started for test_topic-17 [2, 4, 1] -> [3, 1, 2]. At time t0, the leader of test_topic-17 is broker 2.
- clean shutdown of broker 3, which is also a controller
- broker 4 becomes controller, continues re-assignment and updates leader epoch for test_topic-17 to 6 (with same leader)
- broker 2 (leader of test_topic-17) receives new leader epoch: “test_topic-17 starts at Leader Epoch 6 from offset 1388. Previous Leader Epoch was: 5”
- broker 3 is started again after clean shutdown
- controller sees broker 3 startup, and sends LeaderAndIsr(leader epoch 6) to broker 3
- controller updates leader epoch to 7
- broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 7: “test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader Epoch was: 6”
- broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6” and sends OffsetsForLeaderEpoch request to broker 2
- broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from controller; removes and re-adds the fetcher for the partition, executing AbstractFetcherThread.addPartitions(), which updates partition state with leader epoch 7
- broker 3 receives FENCED_LEADER_EPOCH in response to OffsetsForLeaderEpoch(leader epoch 6), because the leader received LeaderAndIsr for leader epoch 7 before it got OffsetsForLeaderEpoch(leader epoch 6) from broker 3. As a result, broker 3 removes the partition from partitionStates and does not fetch until the controller updates the leader epoch and sends LeaderAndIsr for this partition to broker 3. The test fails because re-assignment does not finish on time (due to broker 3 not fetching).
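The interleaving above can be replayed deterministically with a small model. This is only a sketch under simplifying assumptions: FetcherModel, LeaderModel and RaceReplay are hypothetical stand-ins, not Kafka classes, and the map of partition to leader epoch only approximates partitionStates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the follower-side fetcher state (not Kafka's actual class).
class FetcherModel {
    // partition -> leader epoch tracked by the fetcher; stands in for partitionStates
    private final Map<String, Integer> partitionStates = new HashMap<>();
    private final Object partitionMapLock = new Object();

    // Models LeaderAndIsr handling: the partition is removed and re-added with the new epoch.
    void onLeaderAndIsr(String partition, int leaderEpoch) {
        synchronized (partitionMapLock) {
            partitionStates.remove(partition);
            partitionStates.put(partition, leaderEpoch);
        }
    }

    // Builds the OffsetsForLeaderEpoch request under the lock, capturing the epoch known now.
    // Assumes the partition is present.
    int buildEpochRequest(String partition) {
        synchronized (partitionMapLock) {
            return partitionStates.get(partition);
        }
    }

    // Processes a FENCED_LEADER_EPOCH response under the lock.
    // As described in this report, the partition is dropped without checking its current epoch.
    void onFencedLeaderEpoch(String partition) {
        synchronized (partitionMapLock) {
            partitionStates.remove(partition);
        }
    }

    boolean isFetching(String partition) {
        synchronized (partitionMapLock) {
            return partitionStates.containsKey(partition);
        }
    }
}

// Hypothetical model of the leader: fences requests that carry an older epoch.
class LeaderModel {
    private int leaderEpoch;

    LeaderModel(int leaderEpoch) {
        this.leaderEpoch = leaderEpoch;
    }

    void onLeaderAndIsr(int newEpoch) {
        leaderEpoch = newEpoch;
    }

    boolean isFenced(int requestEpoch) {
        return requestEpoch < leaderEpoch;
    }
}

class RaceReplay {
    // Replays the sequence from the report; returns whether broker 3 is still fetching.
    static boolean replay() {
        String tp = "test_topic-17";
        LeaderModel broker2 = new LeaderModel(6);
        FetcherModel broker3 = new FetcherModel();

        broker2.onLeaderAndIsr(7);                        // leader learns epoch 7 first
        broker3.onLeaderAndIsr(tp, 6);                    // follower adds fetcher for epoch 6
        int requestEpoch = broker3.buildEpochRequest(tp); // request built with epoch 6
        broker3.onLeaderAndIsr(tp, 7);                    // follower re-adds partition with epoch 7
        if (broker2.isFenced(requestEpoch)) {             // 6 < 7, so the request is fenced
            broker3.onFencedLeaderEpoch(tp);              // stale error removes the fresh state
        }
        return broker3.isFetching(tp);
    }
}
```

In this model RaceReplay.replay() returns false: the partition state for epoch 7 is discarded because of an error that refers to the epoch 6 request.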
One way to address this is possibly to add more state to PartitionFetchState. However, that may introduce other race conditions. A cleaner way, I think, is to return the leader epoch in the OffsetsForLeaderEpoch response with the FENCED_LEADER_EPOCH error, and then ignore the error if the partition state contains a higher leader epoch. The advantage is less state maintenance, but the disadvantage is that it requires bumping the inter-broker protocol.
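A minimal sketch of the second option follows. It assumes the epoch returned with the error is the leader epoch that the fenced request carried. EpochAwareFetcherModel and the fencedRequestEpoch parameter are hypothetical names used for illustration only; they are not existing Kafka code or protocol fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a fetcher that ignores stale FENCED_LEADER_EPOCH errors.
class EpochAwareFetcherModel {
    // partition -> leader epoch tracked by the fetcher; stands in for partitionStates
    private final Map<String, Integer> partitionStates = new HashMap<>();
    private final Object partitionMapLock = new Object();

    void addPartition(String partition, int leaderEpoch) {
        synchronized (partitionMapLock) {
            partitionStates.put(partition, leaderEpoch);
        }
    }

    // fencedRequestEpoch is the (assumed) epoch returned alongside the error.
    // Returns true if the partition was removed, false if the error was ignored.
    boolean onFencedLeaderEpoch(String partition, int fencedRequestEpoch) {
        synchronized (partitionMapLock) {
            Integer currentEpoch = partitionStates.get(partition);
            if (currentEpoch == null) {
                return false; // partition is no longer tracked; nothing to do
            }
            if (currentEpoch > fencedRequestEpoch) {
                return false; // state was already updated to a higher epoch: stale error
            }
            partitionStates.remove(partition); // wait for the next LeaderAndIsr
            return true;
        }
    }

    boolean isFetching(String partition) {
        synchronized (partitionMapLock) {
            return partitionStates.containsKey(partition);
        }
    }
}
```

With partition state at epoch 7, an error for the epoch 6 request is ignored and fetching continues. An error for the epoch the partition state currently holds still removes the partition.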