Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-3020

Local streaming execution: set number of task manager slots to the maximum parallelism

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.1, 1.0.0
    • Component/s: Runtime / Task
    • Labels:
      None

      Description

      Quite an inconvenience is the local execution configuration behavior. It sets the number of task slots of the mini cluster to the default parallelism. This causes problem if you use setParallelism(parallelism) on an operator and set a parallelism larger than the default parallelism.

      Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (Flat Map (9/100)) @ (unassigned) - [SCHEDULED] > with groupID < fa7240ee1fed08bd7e6278899db3e838 > in sharing group < SlotSharingGroup [f3d578e9819be9c39ceee86cf5eb8c08, 8fa330746efa1d034558146e4604d0b4, fa7240ee1fed08bd7e6278899db3e838] >. Resources available to scheduler: Number of instances=1, total number of slots=8, available slots=0
      	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
      	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
      	at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
      	at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
      	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	... 2 more
      

      I propose to change this behavior to setting the number of task slots to the maximum parallelism present in the user program.

      What do you think?

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                mxm Maximilian Michels
                Reporter:
                mxm Maximilian Michels
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: