Flink / FLINK-3020

Local streaming execution: set number of task manager slots to the maximum parallelism

Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.1, 1.0.0
    • Component/s: Runtime / Task
    • Labels: None

    Description

      The local execution configuration behaves inconveniently: it sets the number of task slots of the mini cluster to the default parallelism. This causes a problem if you call setParallelism(parallelism) on an operator with a parallelism larger than the default parallelism. The job then fails to schedule, as in the trace below, where an operator with parallelism 100 is submitted to a mini cluster with only 8 slots:

      Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (Flat Map (9/100)) @ (unassigned) - [SCHEDULED] > with groupID < fa7240ee1fed08bd7e6278899db3e838 > in sharing group < SlotSharingGroup [f3d578e9819be9c39ceee86cf5eb8c08, 8fa330746efa1d034558146e4604d0b4, fa7240ee1fed08bd7e6278899db3e838] >. Resources available to scheduler: Number of instances=1, total number of slots=8, available slots=0
      	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
      	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
      	at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
      	at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
      	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	... 2 more
      

      I propose changing this behavior so that the number of task slots is set to the maximum parallelism present in the user program.
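
To make the proposal concrete, here is a minimal sketch of the sizing rule in plain Java. It does not use Flink's classes: `LocalSlotSizing`, `Operator`, and `requiredSlots` are hypothetical names made up for illustration, and the numbers (default parallelism 8, a Flat Map with parallelism 100) are taken from the trace above. The sketch assumes all operators share one slot sharing group, so a single slot can hold one subtask of every operator, and it treats the default parallelism as a lower bound on the slot count.

```java
import java.util.Arrays;
import java.util.List;

public class LocalSlotSizing {

    /** Hypothetical stand-in for an operator in the user program (not a Flink class). */
    static final class Operator {
        final String name;
        final int parallelism; // -1 means "not set, use the default parallelism"

        Operator(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Parallelism the operator actually runs with. */
    static int effectiveParallelism(Operator op, int defaultParallelism) {
        return op.parallelism > 0 ? op.parallelism : defaultParallelism;
    }

    /**
     * Proposed rule: size the mini cluster to the largest parallelism in the program.
     * Assumption: the default parallelism is used as a floor.
     */
    static int requiredSlots(List<Operator> operators, int defaultParallelism) {
        int max = defaultParallelism;
        for (Operator op : operators) {
            max = Math.max(max, effectiveParallelism(op, defaultParallelism));
        }
        return max;
    }

    public static void main(String[] args) {
        int defaultParallelism = 8; // "total number of slots=8" in the trace

        List<Operator> program = Arrays.asList(
                new Operator("Source", -1),
                new Operator("Flat Map", 100), // setParallelism(100) on one operator
                new Operator("Sink", -1));

        int currentSlots = defaultParallelism;                           // reported behavior
        int proposedSlots = requiredSlots(program, defaultParallelism);  // proposed behavior

        System.out.println("current slots:  " + currentSlots);
        System.out.println("proposed slots: " + proposedSlots);
    }
}
```

With the reported behavior the Flat Map needs 100 slots but only 8 exist, which matches the NoResourceAvailableException above. With the proposed rule the mini cluster would start with enough slots for the widest operator.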

      What do you think?

            People

              Assignee: Maximilian Michels (mxm)
              Reporter: Maximilian Michels (mxm)