Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
2.7.0, 2.6.1
None
Description
There is a race condition in MirrorSourceTask where certain partition offsets may never be sent. The bug occurs when the outstandingOffsetSync semaphore has no permits left. In this case, sendOffsetSync silently fails.
This failure is normally acceptable, since offset syncs are retried frequently. However, maybeSyncOffsets has a bug: it mutates the partition state before confirming the result of sendOffsetSync. As a result, the partition state is updated prematurely, which prevents future offset syncs from recovering.
Since MAX_OUTSTANDING_OFFSET_SYNCS is 10, this bug happens when you assign more than 10 partitions to each task.
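The failure mode described above can be reproduced in isolation. The following is a simplified model, not the actual MirrorSourceTask code: the class name OffsetSyncRaceDemo, the single-offset PartitionState, and the syncsSent counter are assumptions made for illustration. It shows a sync being dropped while the semaphore is exhausted, and the next call deciding that no sync is needed because the state was already mutated.

```java
import java.util.concurrent.Semaphore;

// Simplified, hypothetical model of the bug; not the real MirrorSourceTask.
class OffsetSyncRaceDemo {

    // Stand-in for PartitionState: a sync is wanted on the first record, or once
    // the upstream offset has advanced maxOffsetLag past the last recorded sync.
    static final class PartitionState {
        long lastSyncUpstreamOffset = -1L;
        final long maxOffsetLag;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        // Decides AND mutates in one step, which is the root of the problem.
        boolean update(long upstreamOffset) {
            if (lastSyncUpstreamOffset == -1L
                    || upstreamOffset - lastSyncUpstreamOffset >= maxOffsetLag) {
                lastSyncUpstreamOffset = upstreamOffset;
                return true;
            }
            return false;
        }
    }

    final Semaphore outstandingOffsetSyncs;
    int syncsSent = 0;

    OffsetSyncRaceDemo(int permits) {
        this.outstandingOffsetSyncs = new Semaphore(permits);
    }

    // Returns false without sending anything when no permit is available.
    boolean sendOffsetSync(long upstreamOffset) {
        if (!outstandingOffsetSyncs.tryAcquire()) {
            return false;
        }
        syncsSent++;
        return true;
    }

    // Buggy ordering: state is mutated first, the send result is ignored.
    void buggyMaybeSyncOffsets(PartitionState state, long upstreamOffset) {
        if (state.update(upstreamOffset)) {
            sendOffsetSync(upstreamOffset);
        }
    }

    public static void main(String[] args) {
        OffsetSyncRaceDemo demo = new OffsetSyncRaceDemo(0); // semaphore already exhausted
        PartitionState state = new PartitionState(100L);
        demo.buggyMaybeSyncOffsets(state, 0L);   // sync is dropped, state still changes
        demo.outstandingOffsetSyncs.release();   // a permit becomes available again
        demo.buggyMaybeSyncOffsets(state, 1L);   // state claims a sync already happened
        System.out.println("syncs sent: " + demo.syncsSent);
    }
}
```

In this model the partition only gets another chance once the offset has moved a full maxOffsetLag past the sync that was never sent.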
In my test cases where I had over 100 partitions per task, the majority of the offsets were wrong. Here's an example of such a failure. https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17308308&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17308308
During my troubleshooting, I customized MirrorSourceTask to confirm that all partitions with wrong offsets were failing to acquire the initial semaphore. The condition can be trapped here.
Possible Fix:
A possible fix is to create a shouldUpdate method in PartitionState. This method should be read-only and return true if sendOffsetSync is needed. Only once sendOffsetSync has succeeded should update be called.
Here's some pseudocode:
private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
    PartitionState partitionState =
        partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
    // shouldUpdate is read-only: it only reports whether a sync is needed
    if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) {
        // mutate the partition state only after the sync was actually sent
        if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
            partitionState.update(upstreamOffset, downstreamOffset);
        }
    }
}
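To make the proposed ordering concrete, here is a self-contained sketch of the pseudocode above. It is an illustration under assumptions, not the patch that was merged: the class name OffsetSyncFixSketch is hypothetical, partitions are keyed by a plain String instead of TopicPartition, the lag check in shouldUpdate is a simplified guess, and sendOffsetSync only counts successful sends.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of the proposed fix; not the actual Kafka implementation.
class OffsetSyncFixSketch {

    static final class PartitionState {
        long lastSyncUpstreamOffset = -1L;
        long lastSyncDownstreamOffset = -1L;
        final long maxOffsetLag;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        // Read-only: reports whether a sync is needed without touching any field.
        boolean shouldUpdate(long upstreamOffset, long downstreamOffset) {
            if (lastSyncDownstreamOffset == -1L) {
                return true; // nothing has been synced successfully yet
            }
            long expectedDownstream =
                    lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);
            return downstreamOffset - expectedDownstream >= maxOffsetLag;
        }

        // Called only after the sync was actually sent.
        void update(long upstreamOffset, long downstreamOffset) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
        }
    }

    final Map<String, PartitionState> partitionStates = new HashMap<>();
    final Semaphore outstandingOffsetSyncs;
    final long maxOffsetLag;
    int syncsSent = 0;

    OffsetSyncFixSketch(int permits, long maxOffsetLag) {
        this.outstandingOffsetSyncs = new Semaphore(permits);
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns false when no permit is available, so the caller can skip update.
    boolean sendOffsetSync(String partition, long upstreamOffset, long downstreamOffset) {
        if (!outstandingOffsetSyncs.tryAcquire()) {
            return false;
        }
        syncsSent++;
        return true;
    }

    void maybeSyncOffsets(String partition, long upstreamOffset, long downstreamOffset) {
        PartitionState state =
                partitionStates.computeIfAbsent(partition, x -> new PartitionState(maxOffsetLag));
        if (state.shouldUpdate(upstreamOffset, downstreamOffset)
                && sendOffsetSync(partition, upstreamOffset, downstreamOffset)) {
            state.update(upstreamOffset, downstreamOffset);
        }
    }

    public static void main(String[] args) {
        OffsetSyncFixSketch task = new OffsetSyncFixSketch(0, 100L);
        task.maybeSyncOffsets("topic-0", 5L, 5L);  // no permit: state stays untouched
        task.outstandingOffsetSyncs.release();
        task.maybeSyncOffsets("topic-0", 6L, 6L);  // retried and sent this time
        System.out.println("syncs sent: " + task.syncsSent);
    }
}
```

Because a failed send leaves the state untouched, the next record for the same partition sees shouldUpdate return true again and retries the sync.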
Workaround:
For those experiencing this issue, the workaround is to make sure each task is assigned no more than 10 partitions. Set your `tasks.max` value accordingly.
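As a rough guide for choosing `tasks.max`, the arithmetic can be sketched as below. This assumes the connector spreads partitions evenly across tasks, which is not guaranteed by anything in this report; the class and method names are hypothetical.

```java
// Hypothetical helper: smallest tasks.max that keeps every task at or below
// 10 partitions, assuming partitions are distributed evenly across tasks.
class TasksMaxEstimate {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    static int minTasksMax(int totalPartitions) {
        if (totalPartitions <= 0) {
            throw new IllegalArgumentException("totalPartitions must be positive");
        }
        // Integer ceiling of totalPartitions / MAX_OUTSTANDING_OFFSET_SYNCS.
        return (totalPartitions + MAX_OUTSTANDING_OFFSET_SYNCS - 1) / MAX_OUTSTANDING_OFFSET_SYNCS;
    }

    public static void main(String[] args) {
        System.out.println(minTasksMax(101));
    }
}
```

For example, 101 replicated partitions would need `tasks.max` of at least 11 under this assumption.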