Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.3.0
    • Component/s: Startup Shell Scripts
    • Labels: None

      Description

      On NUMA systems, Flink can be pinned to a single physical processor ("node") using numactl --membind=$node --cpunodebind=$node <command>. Commonly available NUMA systems include the largest AWS and Google Compute instances.
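
      For example (a minimal sketch, assuming the target is node 0 and using the standard distribution script):

          # pin a TaskManager's CPU scheduling and memory allocation to NUMA node 0
          numactl --membind=0 --cpunodebind=0 -- ./bin/taskmanager.sh start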

      For example, on an AWS c4.8xlarge system with 36 hyperthreads, the user could configure a single TaskManager with 36 slots, or have Flink create two TaskManagers, each bound to one of the two NUMA nodes and each with 18 slots.

      There may be some extra overhead in transferring network buffers between TaskManagers on the same system, though the fraction of data shuffled in this manner decreases with the size of the cluster. The performance improvement from accessing only local memory appears significant, though difficult to benchmark.

      JobManagers may fit within a single NUMA node rather than requiring a full system.


          Activity

          Greg Hogan added a comment -

          Implemented in 11c868f91db773af626ac6ac4dcba9820c13fa8a

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3249

          ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/3249

          @StephanEwen thanks for the review. I'll verify, test, and merge.

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3249

          Looks good.

          +1 from my side!

          ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/3249

          Added a note specifying that NUMA support applies to standalone mode only.

          This is a much harder feature to support in a multi-application environment, which is likely why none of these cluster managers have added support.

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3249

          We can have this at least in YARN as well, because YARN starts its TaskManagers in each container via a bash command. We can also merge this one first, but then it would be good to add to the docs that this applies only to standalone mode at the moment.
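
          A minimal sketch of the idea (hypothetical; this is not the actual Flink-on-YARN launch path): the bash command that starts the TaskManager JVM inside a container could be prefixed with the same binding used in the standalone scripts:

              # hypothetical container launch command with a NUMA binding prefix
              numactl --membind=$NODE --cpunodebind=$NODE -- "$JAVA_HOME/bin/java" $JVM_ARGS \
                  org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR"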

          ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/3249

          @StephanEwen from the discussion on FLINK-3163 I also had the idea of a `taskmanager.compute.fraction` option, where the number of slots would be a multiple of the number of cores / vcores. Since Flink processes configuration keys as opaque strings, the naming only serves to organize the [config page](https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html).

          I have found YARN-5764, MESOS-5342, and MESOS-314 discussing NUMA support for containers, but all are works in progress. I see that Docker supports `--cpuset-cpus` and `--cpuset-mems` in [docker run](https://docs.docker.com/engine/reference/run/) and in [docker compose](https://docs.docker.com/compose/compose-file) config version 2 (using `cpuset`). It's not clear how to dynamically bind Flink to NUMA nodes without scripting Flink's docker commands.
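
          For reference, a sketch of such a binding with docker run (assuming a two-node machine where CPUs 0-17 sit on node 0; the image name is a placeholder):

              # pin the container's CPUs and memory allocation to NUMA node 0
              docker run --cpuset-cpus="0-17" --cpuset-mems="0" <flink-taskmanager-image>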

          ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3249

          I think this is a great idea. Can we also get this integrated with the Yarn / Mesos / Docker setup scripts and code? Keeping all these different deployment options on par would be nice.

          Minor comment: I think you can also name the parameter `taskmanager.numa`, rather than `taskmanager.compute.numa`, unless we plan to have further options under the `taskmanager.compute.` namespace.

          ASF GitHub Bot added a comment -

          GitHub user greghogan opened a pull request:

          https://github.com/apache/flink/pull/3249

          FLINK-3163 [scripts] Configure Flink for NUMA systems

          Start a TaskManager on each NUMA node on each worker when the new configuration option 'taskmanager.compute.numa' is enabled.

          This does not affect the runtime process for the JobManager (or future ResourceManager) as the startup scripts do not provide a simple means of disambiguating masters and slaves. I expect most large clusters to run these master processes on separate machines, and for small clusters the JobManager can run alongside a TaskManager.
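
          A minimal sketch of enabling the option in flink-conf.yaml (key name as introduced by this PR):

              # start a TaskManager on each NUMA node on each worker
              taskmanager.compute.numa: true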

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/greghogan/flink 3163_configure_flink_for_numa_systems

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3249.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3249


          commit 57767e67dc7306d18df07d5224c81a8d359df620
          Author: Greg Hogan <code@greghogan.com>
          Date: 2017-02-01T17:13:49Z

          FLINK-3163 [scripts] Configure Flink for NUMA systems

          Start a TaskManager on each NUMA node on each worker when the new
          configuration option 'taskmanager.compute.numa' is enabled.


          Greg Hogan added a comment -

          I think we can achieve "good enough" without changing the format of masters and slaves. Mesos and YARN provide cluster management, and it might be best to keep the Flink configuration simple.

          What if we added

          • a configuration parameter to enable NUMA which would result in a TaskManager started on each NUMA node for each IP in slaves
          • a configuration parameter (one or two?) for the JobManager and ResourceManager to run in their own NUMA node, not shared with a TaskManager (would the JM and RM share a NUMA node if on the same IP?)

          These could be taskmanager.compute.numa, jobmanager.compute.numa, and resourcemanager.compute.numa.

          We could also add, as a related idea, taskmanager.compute.fraction. This would be to taskmanager.numberOfTaskSlots what taskmanager.memory.fraction is to taskmanager.memory.size: a relative rather than an absolute setting. If set to 1.0 you would get one slot per (hyper-threaded) processor.
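
          A sketch of how the two styles would compare in flink-conf.yaml on a 36-hyperthread machine (taskmanager.compute.fraction is hypothetical and was never implemented):

              # absolute: exactly 36 slots
              taskmanager.numberOfTaskSlots: 36

              # relative (hypothetical): 1.0 * 36 hyperthreads = 36 slots
              taskmanager.compute.fraction: 1.0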

          As Saliya Ekanayake noted, binding processes is quite easy. Since I have only dealt with dual-socket systems, I have temporarily hard-coded the following in my build:

          diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
          index e579c0c..5f076d5 100755
          --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
          +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
          @@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then
               args=("--configDir" "${FLINK_CONF_DIR}")
           fi
           
          -"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
          +command -v numactl >/dev/null 2>&1
          +if [[ $? -ne 0 ]]; then
          +    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
          +else
          +    numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
          +    numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
          +fi
          
          Saliya Ekanayake added a comment -

          In the standalone cluster setup, doing this with the start scripts would be relatively easy. The slaves file could support a format like "IP N TM-bind-to-resource Slot-bind-to-resource", where N is the number of TaskManagers to spawn on the particular host. I haven't looked into the workings of slots, but TaskManagers are JVM processes, so it's possible to prefix the start command with numactl or taskset for pinning.
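
          For illustration (a sketch; the java invocation is abbreviated rather than the actual TaskManager command line):

              # pin the JVM to the first 18 logical CPUs with taskset
              taskset -c 0-17 java <taskmanager args>

              # or pin both CPU scheduling and memory allocation to NUMA node 0 with numactl
              numactl --cpunodebind=0 --membind=0 -- java <taskmanager args>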

          Stephan Ewen added a comment -

          What steps would be involved in adding this to Flink?

          Saliya Ekanayake added a comment -

          This is very relevant, especially when launching Flink on HPC clusters, which have multi-socket configurations. Running a single process (TaskManager) across NUMA boundaries is very inefficient due to memory transfers across sockets, for example over Intel QPI links.

          Stephan Ewen added a comment -

          Very interesting idea. Can this logic be generically added to the start and stop scripts?

          Does it make sense to think about integrating this with the YARN node launching as well?


            People

            • Assignee:
              Greg Hogan
            • Reporter:
              Greg Hogan
            • Votes:
              1
            • Watchers:
              8

