Flink / FLINK-33170

HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.15.4, 1.16.2, 1.17.1, 1.18.1, 1.19.0

    Description

      Possibly related to FLINK-27916.

      Priority labeled critical because this issue can cause major data loss; in our experience, on the order of GBs to TBs.

       

      In all versions since 1.15.x there's a subtle bug in HybridSourceSplitEnumerator when it determines whether it's time to move on to the next source:

      finishedReaders.add(subtaskId);
      if (finishedReaders.size() == context.currentParallelism()) {
        // move on to the next source if it exists
      }

      This snippet is correct on its own, but when changing to the next source, finishedReaders is never cleared. So while processing the second source, the finishedReaders.size() check passes as soon as the first subtask finishes.** The hybrid source then moves on to the next source if one exists, so any records still waiting to be read and sent by the other numSubtasks - 1 subtasks get dropped.

       

      ** This assumes each of the sources in the hybrid source has the same parallelism. If any source except the last has lower parallelism, then I suspect that source will never move on: finishedReaders.size() can never shrink, and it never reaches the required count.
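A toy illustration of the suspected stall (a standalone sketch, not Flink code; it assumes the check compares the set's size against the hybrid source's overall parallelism and that only subtasks holding splits ever report finished):

```java
import java.util.HashSet;
import java.util.Set;

public class StallSketch {
    public static void main(String[] args) {
        int hybridParallelism = 2;             // stand-in for context.currentParallelism()
        Set<Integer> finishedReaders = new HashSet<>();

        // Suppose the current source only produced splits for one subtask,
        // so only subtask 0 ever reports finished.
        finishedReaders.add(0);
        finishedReaders.add(0); // re-adding the same subtask never grows the set

        // size stays at 1 and never equals 2: the enumerator never switches.
        System.out.println(finishedReaders.size() == hybridParallelism); // false
    }
}
```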

       

      Concrete example with three sources, two subtasks each:

      1. subtask 0 finishes with the first source. finishedReaders has size 1.
      2. subtask 1 finishes with the first source. finishedReaders now has size 2, and the enumerator moves on to the second source.
      3. subtask 1 finishes with the second source. finishedReaders.add(1) doesn't change the set; finishedReaders still has size 2, so the hybrid source moves on to the third source.
      4. subtask 0 wasn't finished with the second source, but it receives the notification to move on. Any unsent records are lost. Data loss!
      5. This repeats up to the last source. The source doesn't change over at the last source, so the race condition in step 3 can't happen there.
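The walkthrough above can be reproduced with a few lines of plain Java (a standalone sketch, not the actual enumerator code; the set mirrors finishedReaders with a hybrid parallelism of 2):

```java
import java.util.HashSet;
import java.util.Set;

public class StaleFinishedReadersDemo {
    public static void main(String[] args) {
        int parallelism = 2;
        Set<Integer> finishedReaders = new HashSet<>(); // never cleared, as in the bug

        // First source: both subtasks must finish before the switch fires.
        finishedReaders.add(0);
        System.out.println("switch after source 1? "
                + (finishedReaders.size() == parallelism)); // false
        finishedReaders.add(1);
        System.out.println("switch after source 1? "
                + (finishedReaders.size() == parallelism)); // true

        // Second source: the set still holds {0, 1}, so the FIRST subtask
        // to finish already satisfies the check and triggers the switch.
        finishedReaders.add(1);
        System.out.println("switch after source 2? "
                + (finishedReaders.size() == parallelism)); // true (premature)
    }
}
```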

       

      So step 3 is the race condition that drops records nondeterministically for every source except the first and the last.

      In production this issue caused the loss of GBs to TBs of data when a hybrid source had the following:

      • 3-5 underlying sources, each of which should emit 100 GB to 10 TB worth of records
      • all sources had the same number of splits, around 64-256

      We fixed it in a private fork by clearing the finishedReaders set when changing to the next source.
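For reference, a minimal sketch of that fix (standalone Java, not the actual enumerator code; onReaderFinished is a hypothetical stand-in for the enumerator's handling of a reader-finished event):

```java
import java.util.HashSet;
import java.util.Set;

public class FixSketch {
    static final int PARALLELISM = 2;           // stand-in for context.currentParallelism()
    static final Set<Integer> finishedReaders = new HashSet<>();
    static int currentSourceIndex = 0;

    static void onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == PARALLELISM) {
            // The fix: reset the set so the next source again waits
            // for every subtask to finish before switching.
            finishedReaders.clear();
            currentSourceIndex++;
        }
    }

    public static void main(String[] args) {
        onReaderFinished(0);
        onReaderFinished(1); // all subtasks done: switch to source index 1, set cleared
        onReaderFinished(1); // only one subtask done with source 1: no premature switch
        System.out.println("currentSourceIndex = " + currentSourceIndex); // 1
    }
}
```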

      Existing tests don't catch this data race because, as far as I understand them, they:

      • use two mock sources, whereas this bug manifests with 3+ sources
      • run sources with parallelism 1, whereas this bug manifests when the sources have parallelism > 1


          People

            Assignee: Unassigned
            Reporter: Robert Hoyt (robertjira)