  1. Flink
  2. FLINK-32548 Make watermark alignment ready for production use
  3. FLINK-32411

SourceCoordinator thread leaks when job recovers from checkpoint

Details

    Description

      SourceCoordinator threads leak when a job recovers from a checkpoint. The following figure shows:

      • 2 SourceCoordinator threads for SlowNumberSequenceSource
      • 2 SourceCoordinator threads for FastNumberSequenceSource

      Root cause:

      1. When the ExecutionJobVertex of the source is initialized, RecreateOnResetOperatorCoordinator creates the SourceCoordinator.
      2. When the job recovers from a checkpoint, RecreateOnResetOperatorCoordinator#resetToCheckpoint closes the old coordinator and creates a new one.
      3. SourceCoordinator#close closes the SourceCoordinatorContext only if the coordinator has been started, so the SourceCoordinatorContext of the old coordinator is never closed.
      4. The SourceCoordinatorContext creates threads in its constructor, so it should be closed even if the SourceCoordinator was never started.
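
      The pattern behind steps 3 and 4 can be illustrated with a minimal sketch. This is not Flink code: SketchContext, SketchCoordinator, LeakSketch, and the closeOnlyIfStarted flag are hypothetical names invented for illustration. The sketch only assumes what the root cause states, namely that the context acquires a thread in its constructor and that the coordinator's close skips the context when the coordinator was never started.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a context that creates a thread in its constructor.
class SketchContext implements AutoCloseable {
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "sketch-context-thread");
                t.setDaemon(true); // daemon, so a leaked thread cannot block JVM exit
                return t;
            });

    SketchContext() {
        // Submit a no-op so the worker thread is actually started here.
        executor.execute(() -> {});
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}

// Hypothetical stand-in for a coordinator that owns the context.
class SketchCoordinator implements AutoCloseable {
    private final SketchContext context;
    private final boolean closeOnlyIfStarted; // true reproduces the leaky behavior
    private boolean started;

    SketchCoordinator(SketchContext context, boolean closeOnlyIfStarted) {
        this.context = context;
        this.closeOnlyIfStarted = closeOnlyIfStarted;
    }

    void start() {
        started = true;
    }

    @Override
    public void close() {
        if (closeOnlyIfStarted && !started) {
            return; // leak: the context's thread is never released
        }
        context.close(); // fix: release the context regardless of start state
    }
}

class LeakSketch {
    // Simulates a reset: the old coordinator is closed without ever being started.
    // Returns whether the old context was released by that close call.
    static boolean contextClosedAfterReset(boolean closeOnlyIfStarted) {
        SketchContext context = new SketchContext();
        SketchCoordinator oldCoordinator = new SketchCoordinator(context, closeOnlyIfStarted);
        oldCoordinator.close();
        boolean closed = context.isShutdown();
        context.close(); // clean up so the sketch itself does not leak
        return closed;
    }

    public static void main(String[] args) {
        System.out.println("leaky close released context: " + contextClosedAfterReset(true));
        System.out.println("fixed close released context: " + contextClosedAfterReset(false));
    }
}
```

      In this sketch the unconditional context.close() corresponds to the suggestion in step 4: resources acquired in a constructor are released in close, whether or not start was ever called.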

       

      The call stacks for creating the SourceCoordinator:

      // Create the first SourceCoordinator
      "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
            at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
            - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
            at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
            at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
            at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
            at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
            at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
            at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
            at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
            at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
            at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
            at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
            at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
            at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378)
            at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:355)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory$$Lambda$751.371794887.get(Unknown Source:-1)
            at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
            at org.apache.flink.util.function.FunctionUtils$$Lambda$752.1103993612.get(Unknown Source:-1)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:-1)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
      
      // Create the second SourceCoordinator when the job recovers from the checkpoint
      "Thread-7@8239" daemon prio=5 tid=0x4f nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
            at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
            - locked <0x2063> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$$Lambda$900.2091837632.accept(Unknown Source:-1)
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
            at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
            at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils$$Lambda$895.1145152008.run(Unknown Source:-1)
            at java.lang.Thread.run(Thread.java:748)

       

            People

              fanrui Rui Fan