Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-7216

ExecutionGraph can perform concurrent global restarts to scheduling

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.1, 1.3.1
    • Fix Version/s: 1.3.2, 1.4.0
    • Labels:
      None

      Description

      Because ExecutionGraph restarts happen asynchronously and possibly delayed, it can happen in rare corner cases that two restarts are attempted concurrently, in which case some structures on the Execution Graph undergo a concurrent access:

      Sample stack trace:

      WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Failed to restart the job.
      java.lang.IllegalStateException: SlotSharingGroup cannot clear task assignment, group still has allocated resources.
          at org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup.clearTaskAssignment(SlotSharingGroup.java:78)
          at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:535)
          at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1151)
          at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter$1.call(ExecutionGraphRestarter.java:40)
          at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:748)
      

      The solution is to strictly guard against "subsumed" restarts via the globalModVersion in a similar way as we fence local restarts against global restarts.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                StephanEwen Stephan Ewen
                Reporter:
                StephanEwen Stephan Ewen
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: