Details
- Type: Sub-task
- Status: Closed
- Priority: Major
- Resolution: Fixed
-
1.16.0
-
None
Description
While trying the SourceAlignment feature, we found that a duplicate announceCombinedWatermark task is scheduled after a JobManager failover when the job automatically recovers from a checkpoint.
I think the cause is that the announceCombinedWatermark task is scheduled in the SourceCoordinator constructor; it should be scheduled in SourceCoordinator::start instead (see https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149 ). When the JobManager fails over and the job recovers automatically, the SourceCoordinator is created twice:
- First, when the JobMaster is created, it creates the DefaultExecutionGraph, which initializes the first SourceCoordinator but does not start it.
- Second, the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one. Because the first SourceCoordinator was never started (a SourceCoordinator is only started later, before SchedulerBase::startScheduling), it is not fully closed, so the task it scheduled in its constructor remains scheduled.
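The recovery sequence described above can be reproduced with a small stand-alone sketch. This is not the Flink code: `Coordinator`, `activeTasksAfterRecovery`, and the task counter are hypothetical names, and the sketch assumes that `close()` only cancels the periodic task of a coordinator that was started. Under that assumption, it compares scheduling in the constructor with scheduling in `start()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CoordinatorLifecycleSketch {

    /** Hypothetical stand-in for SourceCoordinator; not the Flink class. */
    static class Coordinator {
        private final ScheduledExecutorService executor;
        private final AtomicInteger activeTasks;
        private final boolean scheduleInConstructor;
        private ScheduledFuture<?> announceTask;
        private boolean started;

        Coordinator(ScheduledExecutorService executor,
                    AtomicInteger activeTasks,
                    boolean scheduleInConstructor) {
            this.executor = executor;
            this.activeTasks = activeTasks;
            this.scheduleInConstructor = scheduleInConstructor;
            if (scheduleInConstructor) {
                schedule(); // problematic variant: task exists before start()
            }
        }

        void start() {
            started = true;
            if (!scheduleInConstructor) {
                schedule(); // proposed variant: task exists only once started
            }
        }

        private void schedule() {
            // A no-op periodic task standing in for announceCombinedWatermark.
            announceTask = executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
            activeTasks.incrementAndGet();
        }

        void close() {
            // Assumption: a coordinator that was never started skips this cleanup.
            if (started && announceTask != null) {
                announceTask.cancel(false);
                activeTasks.decrementAndGet();
            }
        }
    }

    /** Replays the failover sequence and returns how many periodic tasks remain active. */
    static int activeTasksAfterRecovery(boolean scheduleInConstructor) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger activeTasks = new AtomicInteger();
        try {
            // 1. Execution graph creation: coordinator is created but not started.
            Coordinator first = new Coordinator(executor, activeTasks, scheduleInConstructor);
            // 2. Checkpoint restore: the old coordinator is reset, a new one is created.
            first.close();
            Coordinator second = new Coordinator(executor, activeTasks, scheduleInConstructor);
            // 3. Only the second coordinator is started.
            second.start();
            return activeTasks.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("scheduled in constructor: " + activeTasksAfterRecovery(true));
        System.out.println("scheduled in start():     " + activeTasksAfterRecovery(false));
    }
}
```

In this sketch, constructor scheduling leaves two active tasks after recovery, while scheduling in `start()` leaves one, which matches the duplicate task reported here.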
We also found another problem; see https://issues.apache.org/jira/browse/FLINK-32362
Issue Links
- causes: FLINK-32478 SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails (Closed)
- relates to: FLINK-32411 SourceCoordinator thread leaks when job recovers from checkpoint (Resolved)