Details
-
New Feature
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.4.0, 1.5.0
-
None
Description
TaskManagerRunner currently supports only one specific port:
final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
It should support a port range, as the documentation describes: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port
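To illustrate what a port range definition might expand to, here is a minimal, self-contained sketch. It assumes a definition is a comma-separated list of single ports and `start-end` ranges (for example `50100-50200`). The class `PortRangeParser` and its `parse` method are hypothetical names used only for this example; this is not the `NetUtils.getPortRangeFromString` implementation, and unlike a lazy iterator it materializes the whole list up front.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only; not Flink code.
public class PortRangeParser {

    /** Expands a definition such as "50100-50102,6000" into an iterator over ports. */
    static Iterator<Integer> parse(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.split(",")) {
            String range = part.trim();
            int dash = range.indexOf('-');
            // A part without a dash is a single port, so start and end coincide.
            int start = Integer.parseInt((dash < 0 ? range : range.substring(0, dash)).trim());
            int end = dash < 0 ? start : Integer.parseInt(range.substring(dash + 1).trim());
            if (start < 0 || end > 65535 || start > end) {
                throw new IllegalArgumentException("Invalid port range definition: " + definition);
            }
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports.iterator();
    }

    public static void main(String[] args) {
        Iterator<Integer> it = parse("50100-50102,6000");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

With a definition of `0`, the iterator yields the single value 0, which conventionally asks the operating system to choose a free port.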
Activity
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
cc @tillrohrmann: in `new` mode (FLIP-6), the `TaskManagerRunner` cannot specify a port range. I suggest we merge this PR as a hotfix and include it in the 1.5 release.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
cc @tillrohrmann : If you have time, please review this PR, thanks.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
cc @zentol @tzulitai
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184030230
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
+         portsIterator,
+         new NetUtils.SocketFactory() {
+             @Override
+             public ServerSocket createSocket(int port) throws IOException {
+                 return new ServerSocket(port);
+             }
+         });
+
+     int port;
+     if (availableSocket == null) {
+         throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
+     } else {
+         port = availableSocket.getLocalPort();
+         try {
+             availableSocket.close();
+         } catch (IOException ignored) {}
+     }
+
+     try {
+         return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, port, configuration);
+     } catch (Exception e) {
+         // we can continue to try if this contains a netty channel exception
+         Throwable cause = e.getCause();
+         if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                 cause instanceof java.net.BindException)) {
+             throw e;
+         } // else fall through the loop and try the next port
+     }
+ }
- return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+ // if we come here, we have exhausted the port range
+ throw new BindException("Could not start actor system on any port in port range "
- End diff –
-
should not mention actor system but taskmanager instead
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184029857
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- End diff –
-
You don't need to override the default, as `0` is already the default.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184030144
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
I guess you took this code from `BootstrapTools#startActorSystem()`, but I'm wondering why we don't pass the port directly to `createRpcService`.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184031845
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
@zentol the documentation already states that it supports a port range: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184032097
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
and we also have a requirement for this feature
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184032815
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
that's...not my point.
Currently:
1. You take a port from the range,
2. create a socket to check its availability,
2.a if success, goto 3
2.b if failure, goto 1
3. close the socket,
4. pass port to `createRpcService`
4.a if success, exit
4.b if failure, goto 1
So, why do steps 2 and 3 at all?
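For illustration only (this is not code from the PR), the loop without steps 2 and 3 can be sketched as follows. It is a minimal sketch under stated assumptions: `PortRangeBinder`, `ServiceStarter`, and `startOnFirstFreePort` are hypothetical names, and `ServiceStarter` stands in for a call such as `createRpcService`, that is, anything that binds a port and fails with a `BindException` (directly or as the cause) when the port is taken.

```java
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch for illustration only; not Flink code.
public class PortRangeBinder {

    /** Stand-in for a call like createRpcService: starts something on the given port. */
    interface ServiceStarter<T> {
        T start(int port) throws Exception;
    }

    /** Tries each port in turn and returns the first service that starts successfully. */
    static <T> T startOnFirstFreePort(Iterator<Integer> ports, ServiceStarter<T> starter)
            throws Exception {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                // No separate probe socket: the start attempt itself is the availability check.
                return starter.start(port);
            } catch (Exception e) {
                boolean bindFailure = e instanceof BindException
                        || e.getCause() instanceof BindException;
                if (!bindFailure) {
                    throw e; // unrelated failure, do not hide it
                }
                // bind failure: fall through and try the next port
            }
        }
        throw new BindException("Could not start the service on any port in the given range");
    }

    public static void main(String[] args) throws Exception {
        // Occupy one port, then offer it first so the loop has to move on to the next entry.
        try (ServerSocket busy = new ServerSocket(0)) {
            Iterator<Integer> ports = Arrays.asList(busy.getLocalPort(), 0).iterator();
            try (ServerSocket bound = startOnFirstFreePort(ports, p -> new ServerSocket(p))) {
                System.out.println("Bound to port " + bound.getLocalPort());
            }
        }
    }
}
```

Skipping the probe also avoids the window between closing the probe socket and starting the service, during which another process could take the port.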
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184034826
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
It picks an available port and checks that it is not already in use. This logic follows the existing logic. The JM and TM also support port ranges.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184035989
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
I know you're re-using existing code. That doesn't mean the code is perfect and shouldn't be changed.
The code explicitly says that the actor system initialization can also fail with a `BindException`:
```
try {
    return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, port, configuration);
} catch (Exception e) {
    // we can continue to try if this contains a netty channel exception
    Throwable cause = e.getCause();
    if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
            cause instanceof java.net.BindException)) {
        throw e;
    } // else fall through the loop and try the next port
}
```
This means that the check with the socket is redundant.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/5834#discussion_r184037491
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java —
@@ -355,13 +359,53 @@ public static RpcService createRpcService(
taskManagerHostname, taskManagerAddress.getHostAddress());
}
- final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+ final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0");
- checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
- "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
- "use 0 to let the system choose port automatically.",
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+     portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+     throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+     // first, we check if the port is available by opening a socket
+     // if the actor system fails to start on the port, we try further
+     ServerSocket availableSocket = NetUtils.createSocketFromPorts(
- End diff –
-
it seems you are right, I will remove the check code segment.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
cc @zentol refactored based on your suggestion, please review it again
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
@zentol you have also reviewed this PR, and it has been open for a long time. Could you review it again? Thanks~
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
Hi @StephanEwen, why do the committers not review those old PRs? I have several PRs that have been open for a long time.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5834
Committers usually have a lot of different responsibilities (releases, testing, helping users on mailing lists, working on roadmap features, etc.). All that takes a lot of time. Reviewing PRs is one important part, and we try to do this well, but with so many users now, it is not always perfect.
One big problem is that very few committers actually take the time to look at external contributions.
It might help to not always ping the same people (for example @zentol , @tillrohrmann , me, etc.) but some other committers as well. Here is a list of other committers, it is not quite complete, some newer ones are not yet listed: http://flink.apache.org/community.html#people
Hope that helps you understand...
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5834
Generally I would refrain from pinging specific committers. This can be counter-productive as it discourages other committers, or even contributors, from taking a look.
Similarly, pinging a committer in every PR because you saw them reviewing other PRs at the time (as has happened to me last week) isn't that helpful either. It just pushes even more work/pressure on the few committers that actually do reviews.
(Note that frequent pinging inherently puts more work on me as i actually monitor all PR updates!)
At last, please keep in mind that not all PRs have the same priority, especially when working towards the next release.
Documentation changes (#5773) can be merged after a release (since the docs aren't part of the release!),
code-cleanups (#5777, #5799) and minor fixes (#5798) are usually non-critical and always pose the risk of introducing new bugs which is the last thing we want shortly before a release.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/5834
@zentol thanks for your clarification. When I first joined the Flink community, most of my PRs were reviewed by you and Till. Both of you gave me a lot of suggestions and technical opinions. What's more, you are the most active people on the issue mailing list. So I always pinged you and Till; I did not know this put a burden on you. Maybe there is an unfortunate pattern here: the more often someone is seen, the more often they get pinged.
From my point of view (as a contributor like the others), I don't know the committers' review plans. The longer a PR takes, the more it costs (especially a PR like this one, which is already in review), because both contributors and committers have to look back into its context. I sometimes pinged you because I thought you were reviewing other PRs at that moment, so it might not disturb your coding. And sometimes I do not need you to review immediately. You could give an explanation or an estimated time for the review, or ping another committer (who works with you) to review. In general, some effective feedback.
Now I understand your standpoint and your difficulties. Sorry about my behavior. You and the others are good.
cc @StephanEwen
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5834
LGTM 👍 Will rebase and merge once my Travis is green.
GitHub user yanghua opened a pull request:
https://github.com/apache/flink/pull/5834
FLINK-9153 TaskManagerRunner should support rpc port range
This pull request makes `TaskManagerRunner` (FLIP-6) support an rpc port range.
This change is a trivial rework / code cleanup without any test coverage.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yanghua/flink FLINK-9153
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5834.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5834
commit 212884207844d9485eb63e5e1ba32118e9fa1567
Author: yanghua <yanghua1127@...>
Date: 2018-04-10T11:52:16Z
FLINK-9153 TaskManagerRunner should support rpc port range