Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-11078

Capability to define the numerical range for running TaskExecutors




      In pre-FLIP-6 context, we start a yarn session with fixed number of running TaskManagers. This is good for user to run a series of small jobs on a specific cluster and reduce the cost of deploying a cluster per job. In current code base, we start a yarn session with none pre-allocated TMs, but allocates them on a certain job submitted and running.

      To get benefits from both mode, I propose introducing a pair (min, max) represents the minimum and maximum for the number of running TaskExecutors.

      With such option, when setting minimum = maximum = n we effectively have the same behaviour as before with the pre-Flip-6 code; and when setting minimum = 0, maximum = inf we effectively have the same behaviour as current code path.

      Most of the implementation area would be passing such option in FlinkYarnSessionCli and respecting it.

      Hopefully the changes are not that big and I would try to draft more details. Glad to see your suggestions and ideas.

      UPDATE: For how to respect the options, it is a choice that we

      1. Build SlotManager respecting the options.

      That is, passing options via Cli and set configuration, the code path would be env -> YarnEntrypointUtils#loadConfiguration -> YarnSessionClusterEntrypoint -> ... -> ClusterEntrypoint#runCluster -> ... -> SlotManagerConfiguration#fromConfiguration. And for the new options, we introduce two new keys ResourceManagerOptions#TASK_MANAGER_MAXIMUM and ResourceManagerOptions#TASK_MANAGER_MINIMUM

      2. Start SlotManager will also start new workers with number ResourceManagerOptions#TASK_MANAGER_MINIMUM

      3.1. When SlotManager#allocateResource, we ensure the number of TMs does not exceed

      3.2 When resourceActions#releaseResource, we ensure the number of TMs does exceed ResourceManagerOptions#TASK_MANAGER_MINIMUM. By an analysis of code path, currently we stopWorker only when SlotManager#checkTaskManagerTimeouts. Thus add a guardian before #releaseResource in #checkTaskManagerTimeouts should work.

      I give this a roughly attempt at https://github.com/tisonkun/flink/tree/FLINK-11078. More corner/conflict scenarios should be taken into consideration.


          Issue Links



              • Assignee:
                tison Zili Chen
                tison Zili Chen
              • Votes:
                0 Vote for this issue
                6 Start watching this issue


                • Created: