Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Affects Version/s: 1.4.0, 1.5.0, 1.6.0
None
Description
The CLI command `flink list -r` appears to return all jobs except those in the `CREATED` state. This conflicts with the CLI description of the option: "Running/Restarting Jobs".
Issue Links
- is caused by: FLINK-7791 Integrate LIST command into REST client (Closed)
- is duplicated by: FLINK-9711 Flink CLI --running option does not show RUNNING only jobs (Closed)
Activity
Hi walterddr, I have not started on this issue. I have unassigned myself, so you can take this task.
GitHub user walterddr opened a pull request:
https://github.com/apache/flink/pull/6049
FLINK-9398[Client] Fix CLI list running job returns all except scheduled jobs

What is the purpose of the change
This PR fixes the CLI command `bin/flink list -r`, which currently returns all jobs except scheduled ones.

Brief change log
Changes the behavior of `bin/flink list`:
- Adds an `-a` option that lists all jobs, including `CREATED`, `RUNNING` and `RESTARTING` jobs.
- Fixes the `-r` option so that it lists only `RUNNING` and `RESTARTING` jobs.
- Internally maps `-a` to an `other` job type, so that it does not affect the behavior of the explicit listing options when more options are added to the `list` command.
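A minimal sketch of the partitioning described in the change log, assuming simplified stand-ins for Flink's `JobStatus` and `JobStatusMessage` (the `ListBuckets`, `State`, `Job` and `Bucket` names are hypothetical, not Flink classes): `CREATED` jobs go to the scheduled list, `RUNNING`/`RESTARTING` jobs to the running list, and everything else to the remaining list that `-a` adds.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class ListBuckets {

    // Hypothetical stand-in for a subset of Flink's JobStatus enum.
    enum State { CREATED, RUNNING, RESTARTING, FAILED, CANCELED, FINISHED }

    // Hypothetical stand-in for JobStatusMessage: just an id and a state.
    record Job(String id, State state) {}

    // The three sections `list` can print.
    enum Bucket { SCHEDULED, RUNNING, REMAINING }

    // CREATED -> scheduled, RUNNING/RESTARTING -> running, everything else -> remaining.
    static Map<Bucket, List<Job>> partition(List<Job> jobs) {
        Map<Bucket, List<Job>> buckets = new EnumMap<>(Bucket.class);
        for (Bucket b : Bucket.values()) {
            buckets.put(b, new ArrayList<>());
        }
        for (Job job : jobs) {
            Bucket target;
            if (job.state() == State.CREATED) {
                target = Bucket.SCHEDULED;
            } else if (job.state() == State.RUNNING || job.state() == State.RESTARTING) {
                target = Bucket.RUNNING;
            } else {
                target = Bucket.REMAINING;
            }
            buckets.get(target).add(job);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(
            new Job("a", State.CREATED),
            new Job("b", State.RUNNING),
            new Job("c", State.RESTARTING),
            new Job("d", State.FINISHED));
        // With the fix, `list -r` would print only this bucket: jobs b and c.
        System.out.println(partition(jobs).get(Bucket.RUNNING));
    }
}
```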

Verifying this change
This change added tests and can be verified as follows:
- Added a unit test to verify the `-a` option.
- Added a `ListOptions` unit test suite, which did not exist before.

Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? Documented in `cli.md`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/walterddr/flink FLINK-9398
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6049.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6049
commit d742f3ec5ffd4b411162db234e108a496a4f5aaa
Author: Rong Rong <walter_ddr@...>
Date: 2018-05-20T14:29:56Z
fix CLI list options showing all jobs instead of just running / restarting jobs
commit e6d827700427a3f31a699ff02cb5b9906715c629
Author: Rong Rong <walter_ddr@...>
Date: 2018-05-20T23:45:18Z
adding in documentation for list all jobs option
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r189803747
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -394,29 +394,33 @@ protected void list(String[] args) throws Exception {
final boolean running;
final boolean scheduled;
+ final boolean other;
// print running and scheduled jobs if not option supplied
- if (!listOptions.getRunning() && !listOptions.getScheduled()) {
+ if (!listOptions.getRunning() && !listOptions.getScheduled() && !listOptions.getOther()) {
running = true;
scheduled = true;
+ other = false;
} else {
running = listOptions.getRunning();
scheduled = listOptions.getScheduled();
+ other = listOptions.getOther();
}
final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient -> listJobs(clusterClient, running, scheduled));
+ clusterClient -> listJobs(clusterClient, running, scheduled, other));
}
private <T> void listJobs(
ClusterClient<T> clusterClient,
boolean running,
- boolean scheduled) throws FlinkException {
+ boolean scheduled,
+ boolean other) throws FlinkException {
- End diff –
rename to `showRemaining`
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r189804643
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java —
@@ -30,11 +31,13 @@
private final boolean running;
private final boolean scheduled;
+ private final boolean other;
public ListOptions(CommandLine line) {
super(line);
- this.running = line.hasOption(RUNNING_OPTION.getOpt());
- this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+ this.other = line.hasOption(ALL_OPTION.getOpt());
- End diff –
rename to `showAll`
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r189803645
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -436,11 +440,15 @@ protected void list(String[] args) throws Exception {
final List<JobStatusMessage> runningJobs = new ArrayList<>();
final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+ final List<JobStatusMessage> otherJobs = new ArrayList<>();
— End diff –
rename to `remainingJobs`
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/6049
@zentol yeah, I totally agree with the naming issue and have just pushed clearer `ListOptions` definitions. Please take another look.
Thanks
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192696972
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -436,15 +440,19 @@ protected void list(String[] args) throws Exception {
final List<JobStatusMessage> runningJobs = new ArrayList<>();
final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+ final List<JobStatusMessage> remainingJobs = new ArrayList<>();
jobDetails.forEach(details -> {
if (details.getJobState() == JobStatus.CREATED) {
scheduledJobs.add(details);
- } else {
+ } else if (details.getJobState() == JobStatus.RUNNING
+ || details.getJobState() == JobStatus.RESTARTING) {
runningJobs.add(details);
+ } else {
+ remainingJobs.add(details);
}
});
- if (running) {
+ if (showRunning) {
- End diff –
after the change in `ListOptions`: `showRunning || showAll`
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192696977
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -459,7 +467,7 @@ protected void list(String[] args) throws Exception
}
- if (scheduled) {
+ if (showScheduled) {
- End diff –
after the change in `ListOptions`: `showScheduled || showAll`
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192696608
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -436,15 +440,19 @@ protected void list(String[] args) throws Exception {
final List<JobStatusMessage> runningJobs = new ArrayList<>();
final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+ final List<JobStatusMessage> remainingJobs = new ArrayList<>();
jobDetails.forEach(details -> {
if (details.getJobState() == JobStatus.CREATED) {
scheduledJobs.add(details);
- } else {
+ } else if (details.getJobState() == JobStatus.RUNNING
— End diff –
Use `JobStatus#isGloballyTerminalState` instead: any job whose state is not globally terminal can still be considered running, so this check covers all such cases.
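To illustrate what the suggestion changes, here is a sketch with a hypothetical mirror enum that models only a subset of Flink's job states plus an `isGloballyTerminalState()` flag (which states are flagged terminal here is an assumption modeled on Flink's `JobStatus`; `RunningCheck` and its methods are illustrative names). It lists the states that the explicit `RUNNING || RESTARTING` check would push into the remaining section, although a non-terminal check would still count them as running.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

public class RunningCheck {

    // Hypothetical mirror of a subset of Flink's JobStatus. The terminal flags are
    // an assumption: FAILED, CANCELED and FINISHED are treated as globally terminal.
    enum State {
        CREATED(false), RUNNING(false), FAILING(false), FAILED(true),
        CANCELLING(false), CANCELED(true), FINISHED(true), RESTARTING(false);

        private final boolean globallyTerminal;

        State(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    // Check used in the diff under review: only RUNNING and RESTARTING count.
    static boolean runningByEnumeration(State s) {
        return s == State.RUNNING || s == State.RESTARTING;
    }

    // Suggested check: CREATED is handled first by the scheduled branch, so any
    // other state that is not globally terminal counts as running.
    static boolean runningByTerminalCheck(State s) {
        return s != State.CREATED && !s.isGloballyTerminalState();
    }

    // States the explicit enumeration would miss but the terminal check keeps.
    static Set<State> missedByEnumeration() {
        return EnumSet.allOf(State.class).stream()
            .filter(s -> runningByTerminalCheck(s) && !runningByEnumeration(s))
            .collect(Collectors.toCollection(() -> EnumSet.noneOf(State.class)));
    }

    public static void main(String[] args) {
        System.out.println(missedByEnumeration());
    }
}
```

Under these assumptions the program prints `[FAILING, CANCELLING]`: jobs that are failing or being cancelled are still active, yet the explicit enumeration would list them with the finished ones.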
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192697261
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java —
@@ -28,20 +29,26 @@
*/
public class ListOptions extends CommandLineOptions {
- private final boolean running;
- private final boolean scheduled;
+ private final boolean showRunning;
+ private final boolean showScheduled;
+ private final boolean showAll;
public ListOptions(CommandLine line) {
super(line);
- this.running = line.hasOption(RUNNING_OPTION.getOpt());
- this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+ this.showAll = line.hasOption(ALL_OPTION.getOpt());
+ this.showRunning = line.hasOption(RUNNING_OPTION.getOpt()) || this.showAll;
+ this.showScheduled = line.hasOption(SCHEDULED_OPTION.getOpt()) || this.showAll;
- End diff –
I would prefer this class to be a dumb getter, with the OR logic in `CliFrontend`.
This class is just a representation of what was configured; I would leave the interpretation to the classes that use it.
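A sketch of the split suggested above, assuming the options class only records which flags were passed and the front end does all interpretation. `Options`, `Sections` and `resolve` are hypothetical names, not Flink's API; the default follows the existing comment in `CliFrontend` (no option supplied means running and scheduled jobs are printed).

```java
public class ListFlags {

    // Hypothetical "dumb" options holder: records only which flags were given.
    record Options(boolean running, boolean scheduled, boolean all) {}

    // Which sections the front end decides to print.
    record Sections(boolean showRunning, boolean showScheduled, boolean showTerminated) {}

    // All interpretation lives here: no flag means running + scheduled,
    // and -a is ORed into every section.
    static Sections resolve(Options o) {
        boolean noneGiven = !o.running() && !o.scheduled() && !o.all();
        if (noneGiven) {
            return new Sections(true, true, false);
        }
        return new Sections(
            o.running() || o.all(),
            o.scheduled() || o.all(),
            o.all());
    }

    public static void main(String[] args) {
        System.out.println(resolve(new Options(false, false, false))); // no flags
        System.out.println(resolve(new Options(true, false, false)));  // -r
        System.out.println(resolve(new Options(false, false, true)));  // -a
    }
}
```

Keeping `Options` free of logic means a unit test of the options class only has to check flag parsing, while the display rules are tested in one place.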
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192699721
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception
}
+ if (showRemaining) {
+ if (remainingJobs.size() != 0) {
+ remainingJobs.sort(startTimeComparator);
— End diff –
Given the existing pattern of separating by `JobStatus`, we may want to do the same for the remaining jobs.
```
remainingJobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState))
.entrySet().stream().sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue).flatMap(jobs -> jobs.stream().sorted(startTimeComparator)).forEachOrdered(<print>);
```
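A runnable sketch of the grouping idea, using hypothetical `State`/`Job` stand-ins instead of Flink's `JobStatus`/`JobStatusMessage` (the `GroupedListing` class and its `order` method are illustrative). The sort by start time has to happen inside `flatMap`; sorting the flattened stream by start time again would interleave the groups.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupedListing {

    // Hypothetical subset of terminal job states; declaration order decides section order.
    enum State { FAILED, CANCELED, FINISHED }

    // Hypothetical stand-in for JobStatusMessage.
    record Job(String id, State state, long startTime) {}

    // Group by state, order the groups by the enum's declaration order,
    // then sort each group by start time while keeping the groups contiguous.
    static List<Job> order(List<Job> jobs) {
        return jobs.stream()
            .collect(Collectors.groupingBy(Job::state))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .flatMap(e -> e.getValue().stream()
                .sorted(Comparator.comparingLong(Job::startTime)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(
            new Job("x", State.FINISHED, 30L),
            new Job("y", State.FAILED, 20L),
            new Job("z", State.FINISHED, 10L),
            new Job("w", State.FAILED, 40L));
        order(jobs).forEach(j -> System.out.println(j.state() + " " + j.id()));
    }
}
```

For the four sample jobs this prints the two `FAILED` jobs (`y`, then `w`) followed by the two `FINISHED` jobs (`z`, then `x`).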
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192778560
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception
}
+ if (showRemaining) {
+ if (remainingJobs.size() != 0) {
+ remainingJobs.sort(startTimeComparator);
— End diff –
Thanks for the reply @zentol, I'm not sure I fully understand this comment. Are you suggesting that we should always group jobs with the same status together when printing, in all 3 cases?
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6049#discussion_r192837393
— Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java —
@@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception
}
+ if (showRemaining) {
+ if (remainingJobs.size() != 0) {
+ remainingJobs.sort(startTimeComparator);
— End diff –
Only for the last case; specifically, this would separate `FAILED`, `FINISHED` and `CANCELED` jobs.
That said, the other sections aren't that fine-grained either. That reminds me, we could name the last section `terminated` instead of `remaining`.
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/6049
Thanks for the comment @zentol. I thought about it and I think we should group listings with the same job status together, since there are also some other non-terminal states. Regarding the user-facing option, whether to use `-t` (terminated) or `-a` (all): I lean towards `-a`, but internally we would name them terminated. Later on we can easily add support for options like `finished`, `cancelled`, etc. if necessary. What do you think?
master: a2a85bcb3cdb5202055984604f45809260787fd5
1.5: 470691145bf67b60e07e5ed5ec302c8525689fee
Hi yanghua, have you started working on this task?
I was hoping to get this in and work on it together with FLINK-8985. If you haven't started working on it, can I assign it to myself?
Thanks,
Rong