Details
-
Sub-task
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.16.2, 1.17.1
Description
SourceCoordinator thread leaks when job recovers from checkpoint, from the following figure, we can see:
- 2 SourceCoordinator thread for slow SlowNumberSequenceSource
- 2 SourceCoordinator thread for slow FastNumberSequenceSource
Root cause:
- When initialize the ExecutionJobVertex of source, RecreateOnResetOperatorCoordinator will create the SourceCoordinator. code link
- When job recovers from checkpoint, RecreateOnResetOperatorCoordinator#resetToCheckpoint will close the old coordinator, and create a new coordinator.
- The SourceCoordinator#close just close the SourceCoordinatorContext after coordinator is started, so the SourceCoordinatorContext of old coordinator won't be closed.
- The SourceCoordinatorContext create some threads in its constructor, so it should be closed even if the SourceCoordinator isn't started.
The call stack about creating SourceCoordinator:
// Create the first SourceCoordinator "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912) at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207) at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:355) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory$$Lambda$751.371794887.get(Unknown Source:-1) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) at org.apache.flink.util.function.FunctionUtils$$Lambda$752.1103993612.get(Unknown Source:-1) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:-1) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) // Create the second SourceCoordinator when recovers from checkpoint "Thread-7@8239" daemon prio=5 tid=0x4f nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339) - locked <0x2063> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$$Lambda$900.2091837632.accept(Unknown Source:-1) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77) at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils$$Lambda$895.1145152008.run(Unknown Source:-1) at java.lang.Thread.run(Thread.java:748)
Attachments
Attachments
Issue Links
- causes
-
FLINK-32478 SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails
- Closed
-
FLINK-32487 SourceCoordinatorAlignmentTest.testAnnounceCombinedWatermarkWithoutStart fails with RejectedExecution
- Closed
- is related to
-
FLINK-32316 Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
- Closed
- links to