Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32548 Make watermark alignment ready for production use
  3. FLINK-32411

SourceCoordinator thread leaks when job recovers from checkpoint

Attach filesAttach ScreenshotVotersWatch issueWatchersLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      SourceCoordinator thread leaks when job recovers from checkpoint, from the following figure, we can see:

      • 2 SourceCoordinator thread for slow SlowNumberSequenceSource
      • 2 SourceCoordinator thread for slow FastNumberSequenceSource 

      Root cause:

      1. When initialize the ExecutionJobVertex of source, RecreateOnResetOperatorCoordinator will create the SourceCoordinator. code link
      2. When job recovers from checkpoint,  RecreateOnResetOperatorCoordinator#resetToCheckpoint will close the old coordinator, and create a new coordinator. 
      3. The SourceCoordinator#close just close the SourceCoordinatorContext after coordinator is started, so the SourceCoordinatorContext of old coordinator won't be closed.
      4. The SourceCoordinatorContext create some threads in its constructor, so it should be closed even if the SourceCoordinator isn't started.

       

      The call stack about creating SourceCoordinator:

      // Create the first SourceCoordinator
      "jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
            at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
            - locked <0x1f02> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:60)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:43)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
            at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
            at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
            at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
            at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
            at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
            at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
            at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
            at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
            at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
            at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
            at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156)
            at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
            at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:378)
            at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:355)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory$$Lambda$751.371794887.get(Unknown Source:-1)
            at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
            at org.apache.flink.util.function.FunctionUtils$$Lambda$752.1103993612.get(Unknown Source:-1)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:-1)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
      
      
      
      
      
      
      // Create the second SourceCoordinator when recovers from checkpoint
      "Thread-7@8239" daemon prio=5 tid=0x4f nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:142)
            at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
            - locked <0x2063> (a org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
            at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$$Lambda$900.2091837632.accept(Unknown Source:-1)
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
            at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
            at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils$$Lambda$895.1145152008.run(Unknown Source:-1)
            at java.lang.Thread.run(Thread.java:748)

       

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            fanrui Rui Fan
            fanrui Rui Fan
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment