Kafka / KAFKA-12558

MM2 may not sync partition offsets correctly

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.7.0, 2.6.1
    • Fix Version/s: 3.5.0, 3.4.1, 3.3.3
    • Component/s: mirrormaker
    • Labels: None

    Description

      There is a race condition in MirrorSourceTask that can cause the offsets of certain partitions never to be synced. The bug occurs when the outstandingOffsetSync semaphore has no permits left. In that case, sendOffsetSync fails silently.

      This failure is normally acceptable, since offset syncs are retried frequently. However, maybeSyncOffsets has a bug: it mutates the partition state before confirming the result of sendOffsetSync. As a result, the partition state is updated prematurely, which prevents later offset syncs from recovering.

      Since MAX_OUTSTANDING_OFFSET_SYNCS is 10, this bug can occur whenever more than 10 partitions are assigned to a task.
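
      To illustrate why 10 is the threshold, here is a minimal sketch of the permit exhaustion. It is not the MirrorSourceTask code: the class and method names are hypothetical, it assumes the semaphore is acquired with the non-blocking Semaphore.tryAcquire(), and it models the worst case where no in-flight sync completes (and releases its permit) during one pass over the partitions.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the behavior described above; not the actual MirrorSourceTask.
class SemaphoreDropDemo {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    // Simulates one pass over `partitions` partitions during which no permit is released.
    // Returns the number of sync attempts that were silently skipped.
    static int countDroppedSyncs(int partitions) {
        Semaphore outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        int dropped = 0;
        for (int p = 0; p < partitions; p++) {
            // tryAcquire() never blocks: it returns false immediately when no permit is free.
            if (!outstandingOffsetSyncs.tryAcquire()) {
                dropped++; // nothing is sent and no error is reported
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println(countDroppedSyncs(10));  // prints 0
        System.out.println(countDroppedSyncs(100)); // prints 90
    }
}
```

      Under these assumptions, every partition beyond the tenth loses its sync attempt in that pass, which is consistent with most offsets being wrong at 100 partitions per task.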

      In my test cases where I had over 100 partitions per task, the majority of the offsets were wrong. Here's an example of such a failure. https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17308308&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17308308

      During my troubleshooting, I customized the MirrorSourceTask to confirm that all partitions that have the wrong offset were failing to acquire the initial semaphore. The condition can be trapped here.

      Possible Fix:

      A possible fix is to add a shouldUpdate method to PartitionState. This method should be read-only and return true if sendOffsetSync is needed. update should be called only after sendOffsetSync has succeeded.

      Here's some pseudocode:

      private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
              long downstreamOffset) {
          PartitionState partitionState =
              partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
          // Read-only check: does this partition need an offset sync?
          if (partitionState.shouldUpdate(upstreamOffset, downstreamOffset)) {
              // Assumes sendOffsetSync returns true only if the sync was actually sent.
              if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
                  // Mutate the partition state only after a successful send.
                  partitionState.update(upstreamOffset, downstreamOffset);
              }
          }
      }
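
      The same idea as a small runnable sketch. Everything here is a simplified assumption for illustration: this PartitionState tracks only the last synced offsets, sendOffsetSync returns a boolean and merely counts sends, and the class name OffsetSyncFixSketch is hypothetical. It is not the actual Kafka code or the merged fix.

```java
import java.util.concurrent.Semaphore;

class OffsetSyncFixSketch {
    // Simplified, hypothetical stand-in for PartitionState.
    static class PartitionState {
        long lastSyncUpstreamOffset = -1L;
        long lastSyncDownstreamOffset = -1L;
        final long maxOffsetLag;

        PartitionState(long maxOffsetLag) { this.maxOffsetLag = maxOffsetLag; }

        // Read-only: reports whether a sync is needed, without changing any field.
        boolean shouldUpdate(long upstreamOffset, long downstreamOffset) {
            if (lastSyncDownstreamOffset == -1L) return true; // never synced yet
            long expectedDownstream =
                lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);
            return downstreamOffset - expectedDownstream >= maxOffsetLag;
        }

        // Mutating: records the offsets of a sync that was actually sent.
        void update(long upstreamOffset, long downstreamOffset) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
        }
    }

    final Semaphore outstandingOffsetSyncs;
    final PartitionState state = new PartitionState(100L);
    int sent = 0;

    OffsetSyncFixSketch(int permits) { outstandingOffsetSyncs = new Semaphore(permits); }

    // Assumed boolean-returning sender: false means the semaphore was exhausted.
    boolean sendOffsetSync(long upstreamOffset, long downstreamOffset) {
        if (!outstandingOffsetSyncs.tryAcquire()) return false;
        sent++; // a real sender would produce a record and release the permit in its callback
        return true;
    }

    // Returns true only if a sync was sent and the state was updated.
    boolean maybeSyncOffsets(long upstreamOffset, long downstreamOffset) {
        if (state.shouldUpdate(upstreamOffset, downstreamOffset)
                && sendOffsetSync(upstreamOffset, downstreamOffset)) {
            state.update(upstreamOffset, downstreamOffset);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        OffsetSyncFixSketch task = new OffsetSyncFixSketch(0); // semaphore already exhausted
        System.out.println(task.maybeSyncOffsets(5L, 5L));     // false: send skipped, state untouched
        task.outstandingOffsetSyncs.release();                 // an earlier sync completes
        System.out.println(task.maybeSyncOffsets(6L, 6L));     // true: the retry succeeds
    }
}
```

      Because a failed send leaves the state untouched, shouldUpdate keeps returning true until a sync actually goes out, so the partition recovers on a later attempt.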
      

       

      Workaround:

      For those experiencing this issue, the workaround is to make sure each task is assigned no more than 10 partitions. Set your `tasks.max` value accordingly.
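
      As a rough guide to picking the value, the sketch below computes the smallest `tasks.max` that keeps each task at 10 partitions or fewer. It assumes the replicated partitions are spread evenly across tasks and that the connector actually starts that many tasks. The class and method names are hypothetical.

```java
// Hypothetical helper for estimating tasks.max under the workaround.
class TasksMaxEstimate {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    // Smallest task count such that ceil(totalPartitions / tasks) <= 10,
    // assuming an even spread of partitions across tasks.
    static int minTasksMax(int totalPartitions) {
        if (totalPartitions <= 0) {
            throw new IllegalArgumentException("totalPartitions must be positive");
        }
        // Integer ceiling division.
        return (totalPartitions + MAX_OUTSTANDING_OFFSET_SYNCS - 1) / MAX_OUTSTANDING_OFFSET_SYNCS;
    }

    public static void main(String[] args) {
        System.out.println(minTasksMax(250)); // prints 25
        System.out.println(minTasksMax(101)); // prints 11
    }
}
```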

People

    • Assignee: Unassigned
    • Reporter: Alan Ning (askldjd)