Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
1.16.2, 1.18.0, 1.17.1
Description
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
    // track readers that have finished processing for current enumerator
    finishedReaders.add(subtaskId);
    if (finishedReaders.size() == context.currentParallelism()) {
        LOG.debug("All readers finished, ready to switch enumerator!");
        if (currentSourceIndex + 1 < sources.size()) {
            switchEnumerator();
            // switch all readers prior to sending split assignments
            for (int i = 0; i < context.currentParallelism(); i++) {
                sendSwitchSourceEvent(i, currentSourceIndex);
            }
        }
    }
I think finishedReaders is meant to track the subtask IDs that have finished reading the current source. Therefore, finishedReaders should be cleared in the switchEnumerator function.
If it is not cleared, it still contains every subtask ID from the previous source. While the next source is being read, as soon as any single SourceReader reports a SourceReaderFinishedEvent (even though the other SourceReaders may still be processing in parallel), the condition finishedReaders.size() == context.currentParallelism() is already satisfied. This triggers sendSwitchSourceEvent(i, currentSourceIndex), which sends a SwitchSourceEvent to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it has finished reading the previous source, it executes currentReader.close(), so some data may never be read, resulting in partial data loss from that source.
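
To make the failure mode concrete, here is a minimal standalone sketch of the tracking logic. It is a toy model, not the Flink code: the class FinishedReadersDemo, the clearOnSwitch flag, and the onReaderFinished method are hypothetical names introduced only for illustration, and the parallelism and source count are passed in directly rather than read from a context.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the finished-reader tracking described above; not Flink code. */
class FinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private final boolean clearOnSwitch; // hypothetical flag: true models the proposed fix
    private int currentSourceIndex = 0;

    public FinishedReadersDemo(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    /** Records a finished reader; returns true if this event triggered a source switch. */
    public boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            currentSourceIndex++;
            if (clearOnSwitch) {
                // Start the next source with an empty set of finished readers.
                finishedReaders.clear();
            }
            return true;
        }
        return false;
    }

    public int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        for (boolean clear : new boolean[] {false, true}) {
            FinishedReadersDemo enumerator = new FinishedReadersDemo(2, 3, clear);
            enumerator.onReaderFinished(0);
            enumerator.onReaderFinished(1); // both readers done: switch to source 1
            // Only reader 0 has finished source 1; reader 1 is still reading.
            boolean premature = enumerator.onReaderFinished(0);
            System.out.println("clearOnSwitch=" + clear + " prematureSwitch=" + premature);
        }
    }
}
```

With two readers and three sources, the run without clearing switches to the third source as soon as reader 0 finishes the second one, while the run that clears the set waits for reader 1 as well.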