Flink / FLINK-9153

TaskManagerRunner should support rpc port range

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0, 1.5.0
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      TaskManagerRunner currently supports only a single, fixed port:

      final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
      

      It should support a port range, as the documentation describes: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port
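      Parsing a port range definition of the kind the documentation describes can be sketched as follows. This is a minimal, hypothetical parser, not Flink's `NetUtils.getPortRangeFromString`; it assumes the accepted format is a comma-separated list of single ports and `start-end` ranges (for example `50100,50101,50110-50120`). It keeps `0` as an ordinary entry, which the caller would interpret as "let the system choose a free port", matching the old single-port behaviour.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only; Flink's real parser may differ in details.
final class PortRangeParser {

    /** Parses e.g. "50100,50101,50110-50120" into an iterator over the individual ports. */
    static Iterator<Integer> parse(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.trim().split(",")) {
            String token = part.trim();
            int dash = token.indexOf('-');
            if (dash < 0) {
                // single port, e.g. "50100" (or "0" for "any free port")
                ports.add(checkPort(Integer.parseInt(token)));
            } else {
                // inclusive range, e.g. "50110-50120"
                int start = checkPort(Integer.parseInt(token.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(token.substring(dash + 1).trim()));
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + token);
                }
                for (int p = start; p <= end; p++) {
                    ports.add(p);
                }
            }
        }
        return ports.iterator();
    }

    /** Same bounds the old checkState enforced for the single port. */
    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = parse("50100,50110-50112");
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            sb.append(it.next()).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "50100 50110 50111 50112"
    }
}
```

      Malformed numbers surface as `NumberFormatException`, a subclass of `IllegalArgumentException`, so a caller can wrap every parse failure the same way the patch does.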

       


          Activity

            githubbot ASF GitHub Bot added a comment -

            GitHub user yanghua opened a pull request:

            https://github.com/apache/flink/pull/5834

            FLINK-9153 TaskManagerRunner should support rpc port range

              What is the purpose of the change

            This pull request makes `TaskManagerRunner` (FLIP-6) support an RPC port range.

              Brief change log
            • *Fixed a bug in reading a config item and let the TaskManager runner support an RPC port range*
              Verifying this change

            This change is a trivial rework / code cleanup without any test coverage.

              Does this pull request potentially affect one of the following parts:
            • Dependencies (does it add or upgrade a dependency): (yes / *no*)
            • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*)
            • The serializers: (yes / *no* / don't know)
            • The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
            • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
            • The S3 file system connector: (yes / *no* / don't know)
              Documentation
            • Does this pull request introduce a new feature? (yes / *no*)
            • If yes, how is the feature documented? (not applicable / docs / JavaDocs / *not documented*)

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/yanghua/flink FLINK-9153

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/5834.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #5834


            commit 212884207844d9485eb63e5e1ba32118e9fa1567
            Author: yanghua <yanghua1127@...>
            Date: 2018-04-10T11:52:16Z

            FLINK-9153 TaskManagerRunner should support rpc port range


            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @tillrohrmann: in `new` mode (FLIP-6), the `TaskManagerRunner` cannot be configured with a port range. I suggest merging this PR as a hotfix and including it in the 1.5 release.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @tillrohrmann: if you have time, please review this PR. Thanks.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @zentol

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @zentol @tzulitai

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184030230

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
            +         portsIterator,
            +         new NetUtils.SocketFactory() {
            +             @Override
            +             public ServerSocket createSocket(int port) throws IOException {
            +                 return new ServerSocket(port);
            +             }
            +         });
            +
            +     int port;
            +     if (availableSocket == null) {
            +         throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
            +     } else {
            +         port = availableSocket.getLocalPort();
            +         try {
            +             availableSocket.close();
            +         } catch (IOException ignored) {}
            +     }
            +
            +     try {
            +         return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, port, configuration);
            +     } catch (Exception e) {
            +         // we can continue to try if this contains a netty channel exception
            +         Throwable cause = e.getCause();
            +         if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
            +                 cause instanceof java.net.BindException)) {
            +             throw e;
            +         } // else fall through the loop and try the next port
            +     }
            + }
            - return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
            + // if we come here, we have exhausted the port range
            + throw new BindException("Could not start actor system on any port in port range "
            --- End diff ---

            This should mention the TaskManager rather than the actor system.

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184029857

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            --- End diff ---

            We don't need to override the default here, since `0` is already the default.

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184030144

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            I guess you took this code from `BootstrapTools#startActorSystem()`, but I'm wondering why we don't pass the port directly to `createRpcService`.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184031845

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            @zentol the documentation has always stated that it supports a port range: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184032097

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            And we also have a requirement for this feature.

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184032815

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            that's...not my point.

            Currently:
            1. You take a port from the range,
            2. create a socket to check its availability,
            2.a if success, goto 3
            2.b if failure, goto 1
            3. close the socket,
            4. pass port to `createRpcService`
            4.a if success, exit
            4.b if failure, goto 1

            So, why do step 2 and 3 at all?
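            The steps listed above can be sketched without Flink as follows. This is a minimal illustration under stated assumptions: `probeFreePort` is a hypothetical helper playing the role of `NetUtils.createSocketFromPorts`, and binding a plain `ServerSocket` stands in for `createRpcService`. Because the probe socket is closed before the real bind, another process can take the port in between, so the bind step (4.b) still has to handle `BindException` on its own.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch of the probe-then-bind pattern; not Flink code.
final class ProbeThenBind {

    /**
     * Steps 1-3: take ports from the range until a probe ServerSocket can be opened,
     * then close the probe again. Returns the probed port, or -1 if the range is exhausted.
     */
    static int probeFreePort(Iterator<Integer> ports) {
        while (ports.hasNext()) {
            int candidate = ports.next();
            try (ServerSocket probe = new ServerSocket(candidate)) {
                // getLocalPort() resolves 0 to the ephemeral port actually chosen;
                // the probe is closed when this try block exits
                return probe.getLocalPort();
            } catch (IOException e) {
                // step 2.b: port not available, try the next one
            }
        }
        return -1;
    }

    /** Step 4: "start the service" on the probed port; a ServerSocket stands in for createRpcService. */
    static ServerSocket startOnRange(Iterator<Integer> ports) throws IOException {
        while (true) {
            int port = probeFreePort(ports);
            if (port < 0) {
                throw new BindException("Could not bind to any port in the configured range");
            }
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // step 4.b: the port was taken between probe and bind, go back to step 1
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket busy = new ServerSocket(0)) {
            int taken = busy.getLocalPort();
            try (ServerSocket server = startOnRange(Arrays.asList(taken, 0).iterator())) {
                System.out.println("skipped busy port " + taken + ", bound " + server.getLocalPort());
            }
        }
    }
}
```

            Step 4 already needs a retry on `BindException`, so the probe in steps 2 and 3 cannot remove that retry; it only adds an extra bind per port.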

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184034826

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            It picks an available port and checks that it is not already in use. This logic follows the existing implementation, and the JM and TM also support port ranges.

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184035989

            — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
            @@ -355,13 +359,53 @@ public static RpcService createRpcService(
            taskManagerHostname, taskManagerAddress.getHostAddress());
            }

            - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
            + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
            - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
            - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
            - "use 0 to let the system choose port automatically.",
            - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
            + // parse port range definition and create port iterator
            + Iterator<Integer> portsIterator;
            + try {
            +     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
            + } catch (Exception e) {
            +     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
            + }
            +
            + while (portsIterator.hasNext()) {
            +     // first, we check if the port is available by opening a socket
            +     // if the actor system fails to start on the port, we try further
            +     ServerSocket availableSocket = NetUtils.createSocketFromPorts(

            --- End diff ---

            I know you're re-using existing code. That doesn't mean the code is perfect and shouldn't be changed.

            The code explicitly says that the actor system initialization can also fail with a `BindException`:
            ```
            try {
                return startActorSystem(configuration, listeningAddress, port, logger);
            }
            catch (Exception e) {
                // we can continue to try if this contains a netty channel exception
                Throwable cause = e.getCause();
                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
                        cause instanceof java.net.BindException)) {
                    throw e;
                }
                // else fall through the loop and try the next port
            }
            ```

            This means that the check with the socket is redundant.
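To illustrate zentol's point, the sketch below uses plain java.net sockets as a stand-in for the actor system. That substitution is an assumption made only to keep the example self-contained. The loop attempts the real bind on each candidate port and moves on only when that bind fails with BindException. A probe socket opened beforehand adds nothing, because the real bind reports a busy port anyway, and the probe result can also go stale before the actual bind happens. BindRetrySketch and bindFirstAvailable are hypothetical names, not Flink APIs.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

public final class BindRetrySketch {

    // Hypothetical helper: tries the real bind on each candidate port and only
    // moves on when that bind fails with BindException. There is no separate
    // "is this port free?" probe; the bind attempt itself answers that question.
    static ServerSocket bindFirstAvailable(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress("127.0.0.1", port));
                return socket;
            } catch (BindException e) {
                // Port already in use: release this socket and try the next port.
                socket.close();
            } catch (IOException e) {
                // Any other failure is not a "port busy" case, so propagate it.
                socket.close();
                throw e;
            }
        }
        throw new BindException("No free port in the configured range");
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket occupied = new ServerSocket()) {
            occupied.bind(new InetSocketAddress("127.0.0.1", 0));
            int busyPort = occupied.getLocalPort();
            // Candidates: the busy port first, then 0 so the OS picks any free port.
            Iterator<Integer> candidates = Arrays.asList(busyPort, 0).iterator();
            try (ServerSocket bound = bindFirstAvailable(candidates)) {
                System.out.println("skipped " + busyPort + ", bound " + bound.getLocalPort());
            }
        }
    }
}
```

The demo occupies an ephemeral port, lists it first, and then falls back to 0, mirroring the "0" default in the diff. The printed port numbers vary from run to run.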

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on a diff in the pull request:

            https://github.com/apache/flink/pull/5834#discussion_r184037491


            It seems you are right. I will remove the socket check.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @zentol I refactored the code based on your suggestion. Please review it again.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            @zentol you also reviewed this PR, and it has been open for a long time. Could you review it again? Thanks~

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @zentol

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            @zentol you also reviewed this PR, and it has been open for a long time. Could you review it again? Thanks~

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            cc @zentol

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            Hi @StephanEwen, why don't the committers review these old PRs? I have several PRs that have been waiting for a long time.

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on the issue:

            https://github.com/apache/flink/pull/5834

            Committers usually have a lot of different responsibilities (releases, testing, helping users on mailing lists, working on roadmap features, etc.). All that takes a lot of time. Reviewing PRs is one important part, and we try to do this well, but with so many users now, it is not always perfect.

            One big problem is that very few committers actually take the time to look at external contributions.

            It might help to not always ping the same people (for example @zentol, @tillrohrmann, me, etc.) but other committers as well. Here is a list of committers; it is not quite complete, and some newer ones are not yet listed: http://flink.apache.org/community.html#people

            Hope that helps you understand...

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on the issue:

            https://github.com/apache/flink/pull/5834

            Generally I would refrain from pinging specific committers. This can be counter-productive as it discourages other committers, or even contributors, from taking a look.

            Similarly, pinging a committer in every PR because you saw them reviewing other PRs at the time (as has happened to me last week) isn't that helpful either. It just pushes even more work/pressure on the few committers that actually do reviews.

            (Note that frequent pinging inherently puts more work on me, as I actually monitor all PR updates!)

            Finally, please keep in mind that not all PRs have the same priority, especially when working towards the next release.
            Documentation changes (#5773) can be merged after a release (since the docs aren't part of the release!), while
            code cleanups (#5777, #5799) and minor fixes (#5798) are usually non-critical, and they always pose the risk of introducing new bugs, which is the last thing we want shortly before a release.

            githubbot ASF GitHub Bot added a comment -

            Github user yanghua commented on the issue:

            https://github.com/apache/flink/pull/5834

            @zentol thanks for the clarification. When I started contributing to the Flink community, most of my PRs were reviewed by you and Till. Both of you gave me a lot of suggestions and technical feedback. What's more, you appear most often on the issues mailing list. That is why I kept pinging you and Till; I didn't know it added to your burden. Perhaps there is an unfortunate pattern here: the more often someone is seen, the more often they get pinged.

            From my point of view (as a contributor like any other), I don't know the committers' review plans. The longer a PR stays open, the more it costs (especially for a PR like this one that is already under review), because both the contributor and the committers have to get back into its context. I sometimes ping you when I think you are reviewing other PRs, since at that point it may not interrupt your coding. And sometimes I don't need you to review immediately. You could give an explanation or a rough time frame for the review, or ask another committer you work with to review it. In general, what I am looking for is effective feedback.

            Now I understand your position and the difficulties you face. Sorry about my behavior. You and the others are doing a great job.

            cc @StephanEwen

            githubbot ASF GitHub Bot added a comment -

            Github user StefanRRichter commented on the issue:

            https://github.com/apache/flink/pull/5834

            LGTM 👍 Will rebase and merge once my Travis is green.

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/flink/pull/5834

            srichter Stefan Richter added a comment -

            Merged in:
            master: 7c90447849
            release-1.5: b57bf73e57


            People

              Assignee: yanghua vinoyang
              Reporter: yanghua vinoyang
              Votes: 0
              Watchers: 3