Flink / FLINK-33360

HybridSource fails to clear the previous round's state when switching sources, leading to data loss

Details

    Description

      org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:

                  // track readers that have finished processing for current enumerator
                  finishedReaders.add(subtaskId);
                  if (finishedReaders.size() == context.currentParallelism()) {
                      LOG.debug("All readers finished, ready to switch enumerator!");
                      if (currentSourceIndex + 1 < sources.size()) {
                          switchEnumerator();
                          // switch all readers prior to sending split assignments
                          for (int i = 0; i < context.currentParallelism(); i++) {
                              sendSwitchSourceEvent(i, currentSourceIndex);
                          }
                      }
                  } 

      As I understand it, finishedReaders tracks the subtask IDs that have finished reading the current source. It should therefore be cleared in switchEnumerator(), before the next source starts.

      If it is not cleared, finishedReaders still holds every subtask ID from the previous source when the next source starts. As soon as any single SourceReader reports a SourceReaderFinishedEvent for the new source (while the other SourceReaders are still reading in parallel), the condition finishedReaders.size() == context.currentParallelism() is already satisfied, so the enumerator calls sendSwitchSourceEvent(i, currentSourceIndex) and sends a SwitchSourceEvent to all SourceReaders.
      A SourceReader that receives a SwitchSourceEvent before it has finished reading the source it is currently on executes currentReader.close(), so its remaining data is never read and part of that source's data is lost.
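
      The behaviour described above can be reproduced with a small stand-alone model. This is only a sketch, not Flink code: the class FinishedReadersDemo, its fields, and the clearOnSwitch toggle are hypothetical names introduced for illustration, and the real fix in HybridSourceSplitEnumerator may look different. It assumes two readers and three chained sources, and checks whether a single "finished" report on the second source triggers a switch.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the enumerator's finished-reader bookkeeping (hypothetical, not Flink code). */
class FinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    // Hypothetical toggle: true models the proposed fix, false models the reported behaviour.
    private final boolean clearOnSwitch;
    private int currentSourceIndex = 0;

    FinishedReadersDemo(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Records a finished reader; returns true if this event caused a switch to the next source. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            currentSourceIndex++;
            if (clearOnSwitch) {
                // Proposed fix: forget the previous round before the next source starts.
                finishedReaders.clear();
            }
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            FinishedReadersDemo enumerator = new FinishedReadersDemo(2, 3, clear);
            enumerator.onReaderFinished(0);
            enumerator.onReaderFinished(1); // both readers finished source 0, switch to source 1
            // Only reader 0 has finished source 1; reader 1 is still reading.
            boolean premature = enumerator.onReaderFinished(0);
            System.out.println("clearOnSwitch=" + clear + " prematureSwitch=" + premature);
        }
    }
}
```

      Without clearing, the stale set from the first source makes the size check pass on the very first report for the second source, so the model switches while reader 1 is still reading. With clearing, the same report does not trigger a switch.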

          People

            fengjiajie Feng Jiajie
