FLINK-9398

Flink CLI list running jobs returns all jobs except those in CREATED state

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0, 1.5.0, 1.6.0
    • Fix Version/s: 1.4.3, 1.5.1, 1.6.0
    • Component/s: Command Line Client
    • Labels: None

    Description

      See: https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445

      It seems that the CLI command flink list -r returns all jobs except those in the CREATED state, which conflicts with the CLI description: Running/Restarting Jobs.
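To make the reported behaviour concrete, here is a minimal sketch of a two-way split in which only CREATED jobs go to the scheduled list and every other job lands in the running list. The `PreFixSplit` class and its nested `Status` enum are hypothetical stand-ins for illustration, not Flink classes, and only a few states are modelled.

```java
import java.util.ArrayList;
import java.util.List;

class PreFixSplit {
    // Hypothetical stand-in for Flink's JobStatus; only a few states are modelled.
    enum Status { CREATED, RUNNING, RESTARTING, FINISHED, CANCELED, FAILED }

    // Two-way split as reported: everything that is not CREATED counts as "running".
    static List<Status> runningBucket(List<Status> jobs) {
        List<Status> running = new ArrayList<>();
        for (Status s : jobs) {
            if (s != Status.CREATED) {
                running.add(s);
            }
        }
        return running;
    }
}
```

With such a split, a FINISHED or FAILED job would be printed under the running section, which matches the mismatch described above.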

          Activity

            rongr Rong Rong added a comment - - edited

            Hi yanghua, have you started working on this task?

            I was hoping to get this in and work together with FLINK-8985. If you haven't started working on it, can I assign it to myself?

            Thanks,
            Rong

            yanghua vinoyang added a comment -

            Hi walterddr, I have not started on this issue and have released it, so you can take this task.

            githubbot ASF GitHub Bot added a comment -

            GitHub user walterddr opened a pull request:

            https://github.com/apache/flink/pull/6049

            FLINK-9398 [Client] Fix CLI list running job returns all except scheduled jobs

            What is the purpose of the change

            This PR fixes the CLI command `bin/flink list -r` returning all jobs except scheduled ones.

            Brief change log

            Change the behavior of `bin/flink list` as follows:

            • Add an `-a` option to list all jobs, including `CREATED`, `RUNNING` & `RESTARTING`.
            • Fix the `-r` option to list only `RUNNING` and `RESTARTING` jobs.
            • Internally map `-a` to the `other` job type so that it does not affect the behavior of the explicit listing options when other options are added to the `list` command.

            Verifying this change

            This change added tests and can be verified as follows:

            • Added a unit test to verify the `-a` option.
            • Added a `ListOptions` unit-test suite, which did not exist before.

            Does this pull request potentially affect one of the following parts:

            • Dependencies (does it add or upgrade a dependency): no
            • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
            • The serializers: no
            • The runtime per-record code paths (performance sensitive): no
            • Anything that affects deployment or recovery: no
            • The S3 file system connector: no

            Documentation

            • Does this pull request introduce a new feature? yes
            • If yes, how is the feature documented? documented in `cli.md`

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/walterddr/flink FLINK-9398

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/6049.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #6049


            commit d742f3ec5ffd4b411162db234e108a496a4f5aaa
            Author: Rong Rong <walter_ddr@...>
            Date: 2018-05-20T14:29:56Z

            fix CLI list options showing all jobs instead of just running / restarting jobs

            commit e6d827700427a3f31a699ff02cb5b9906715c629
            Author: Rong Rong <walter_ddr@...>
            Date: 2018-05-20T23:45:18Z

            adding in documentation for list all jobs option


            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r189803747

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -394,29 +394,33 @@ protected void list(String[] args) throws Exception {

                     final boolean running;
                     final boolean scheduled;
            +        final boolean other;

                     // print running and scheduled jobs if not option supplied
            -        if (!listOptions.getRunning() && !listOptions.getScheduled()) {
            +        if (!listOptions.getRunning() && !listOptions.getScheduled() && !listOptions.getOther()) {
                         running = true;
                         scheduled = true;
            +            other = false;
                     } else {
                         running = listOptions.getRunning();
                         scheduled = listOptions.getScheduled();
            +            other = listOptions.getOther();
                     }

                     final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);

                     runClusterAction(
                         activeCommandLine,
                         commandLine,
            -            clusterClient -> listJobs(clusterClient, running, scheduled));
            +            clusterClient -> listJobs(clusterClient, running, scheduled, other));

                 }

                 private <T> void listJobs(
                         ClusterClient<T> clusterClient,
                         boolean running,
            -            boolean scheduled) throws FlinkException {
            +            boolean scheduled,
            +            boolean other) throws FlinkException {
            --- End diff --

            rename to `showRemaining`
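The defaulting rule in the diff above (no flag supplied means running plus scheduled, but not the remaining jobs) can be sketched in isolation. This is only an illustration under stated assumptions: `ListFlagDefaults` and `resolve` are hypothetical names, and the three booleans stand in for the values read from `ListOptions`.

```java
class ListFlagDefaults {
    // Returns {running, scheduled, other} after applying the default rule.
    static boolean[] resolve(boolean running, boolean scheduled, boolean other) {
        // No option supplied: fall back to running + scheduled only.
        if (!running && !scheduled && !other) {
            return new boolean[] {true, true, false};
        }
        // At least one option supplied: honour exactly what was requested.
        return new boolean[] {running, scheduled, other};
    }
}
```

Including the third flag in the "nothing supplied" check is what keeps the default from being applied when only the new option is passed.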

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r189804643

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java ---
            @@ -30,11 +31,13 @@

                 private final boolean running;
                 private final boolean scheduled;
            +    private final boolean other;

                 public ListOptions(CommandLine line) {
                     super(line);

            -        this.running = line.hasOption(RUNNING_OPTION.getOpt());
            -        this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
            +        this.other = line.hasOption(ALL_OPTION.getOpt());
            --- End diff --

            rename to `showAll`

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r189803645

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -436,11 +440,15 @@ protected void list(String[] args) throws Exception {

                 final List<JobStatusMessage> runningJobs = new ArrayList<>();
                 final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
            +    final List<JobStatusMessage> otherJobs = new ArrayList<>();
            --- End diff --

            rename to `remainingJobs`

            githubbot ASF GitHub Bot added a comment -

            Github user walterddr commented on the issue:

            https://github.com/apache/flink/pull/6049

            @zentol Yes, I agree on the naming issue and have just updated the PR with clearer `ListOptions` definitions. Please take another look.

            Thanks

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192696972

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -436,15 +440,19 @@ protected void list(String[] args) throws Exception {

                 final List<JobStatusMessage> runningJobs = new ArrayList<>();
                 final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
            +    final List<JobStatusMessage> remainingJobs = new ArrayList<>();
                 jobDetails.forEach(details -> {
                     if (details.getJobState() == JobStatus.CREATED) {
                         scheduledJobs.add(details);
            -        } else {
            +        } else if (details.getJobState() == JobStatus.RUNNING
            +                || details.getJobState() == JobStatus.RESTARTING) {
                         runningJobs.add(details);
            +        } else {
            +            remainingJobs.add(details);
                     }
                 });

            -    if (running) {
            +    if (showRunning) {
            --- End diff --

            after the change in `ListOptions`: `showRunning || showAll`

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192696977

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -459,7 +467,7 @@ protected void list(String[] args) throws Exception {
                         System.out.println("--------------------------------------------------------------");
                     }
                 }
            -    if (scheduled) {
            +    if (showScheduled) {
            --- End diff --

            after the change in `ListOptions`: `showScheduled || showAll`

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192696608

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -436,15 +440,19 @@ protected void list(String[] args) throws Exception {

                 final List<JobStatusMessage> runningJobs = new ArrayList<>();
                 final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
            +    final List<JobStatusMessage> remainingJobs = new ArrayList<>();
                 jobDetails.forEach(details -> {
                     if (details.getJobState() == JobStatus.CREATED) {
                         scheduledJobs.add(details);
            -        } else {
            +        } else if (details.getJobState() == JobStatus.RUNNING
            --- End diff --

            use `JobStatus#isGloballyTerminalState` instead. This covers all cases where a job can still be considered running.
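A minimal sketch of how the suggestion above could look: anything that is neither `CREATED` nor globally terminal is treated as still running. The nested `Status` enum is a simplified, hypothetical stand-in for Flink's `JobStatus` (the set of states and their terminal flags here are assumptions for illustration), and `TerminalStateSplit`, `Bucket` and `classify` are invented names.

```java
class TerminalStateSplit {
    // Simplified stand-in for Flink's JobStatus; flags are assumed for illustration.
    enum Status {
        CREATED(false), RUNNING(false), RESTARTING(false), FAILING(false), CANCELLING(false),
        FINISHED(true), CANCELED(true), FAILED(true);

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    enum Bucket { SCHEDULED, RUNNING, TERMINATED }

    // CREATED is scheduled, globally terminal states are terminated, the rest still run.
    static Bucket classify(Status status) {
        if (status == Status.CREATED) {
            return Bucket.SCHEDULED;
        }
        return status.isGloballyTerminalState() ? Bucket.TERMINATED : Bucket.RUNNING;
    }
}
```

Compared with checking `RUNNING` and `RESTARTING` explicitly, this keeps transitional states such as `FAILING` or `CANCELLING` in the running section without listing each one.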

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192697261

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java ---
            @@ -28,20 +29,26 @@
              */
             public class ListOptions extends CommandLineOptions {

            -    private final boolean running;
            -    private final boolean scheduled;
            +    private final boolean showRunning;
            +    private final boolean showScheduled;
            +    private final boolean showAll;

                 public ListOptions(CommandLine line) {
                     super(line);

            -        this.running = line.hasOption(RUNNING_OPTION.getOpt());
            -        this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
            +        this.showAll = line.hasOption(ALL_OPTION.getOpt());
            +        this.showRunning = line.hasOption(RUNNING_OPTION.getOpt()) || this.showAll;
            +        this.showScheduled = line.hasOption(SCHEDULED_OPTION.getOpt()) || this.showAll;
            --- End diff --

            I would prefer this class being a dumb getter and having the OR logic in CLIFrontend.
            This class is just a representation of what was configured, and I would leave the interpretation up to the using classes.
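The separation described above can be sketched roughly as follows: the options class only reports which flags were set, and the caller decides what they mean. `ListOptionsSketch` and `ListCallerSketch` are hypothetical stand-ins, not the actual `ListOptions` or `CliFrontend` code, and the constructor takes plain booleans instead of a parsed command line to stay self-contained.

```java
// Hypothetical stand-in for ListOptions: it only reports which flags were set.
class ListOptionsSketch {
    private final boolean running;
    private final boolean scheduled;
    private final boolean all;

    ListOptionsSketch(boolean running, boolean scheduled, boolean all) {
        this.running = running;
        this.scheduled = scheduled;
        this.all = all;
    }

    boolean showRunning() { return running; }
    boolean showScheduled() { return scheduled; }
    boolean showAll() { return all; }
}

// Hypothetical caller: the interpretation (the OR logic) lives here.
class ListCallerSketch {
    static boolean printRunning(ListOptionsSketch options) {
        return options.showRunning() || options.showAll();
    }

    static boolean printScheduled(ListOptionsSketch options) {
        return options.showScheduled() || options.showAll();
    }
}
```

With this split, `showRunning()` stays false when only the all-jobs flag was passed, so the getters still reflect exactly what was configured.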

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192699721

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception {
                         System.out.println("--------------------------------------------------------------");
                     }
                 }
            +    if (showRemaining) {
            +        if (remainingJobs.size() != 0) {
            +            remainingJobs.sort(startTimeComparator);
            --- End diff --

            given the existing pattern of separating by `JobStatus` we may want to do the same for the remaining jobs.

            ```
            runningJobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState))
            .entrySet().stream().sorted(<sort by JobStatus>)
            .map(Map.Entry::getValue).flatMap(List::stream).sorted(<sort by timestamp>).forEachOrdered(<print>);
            ```
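One possible runnable reading of the idea in the snippet above, offered as a sketch rather than as the reviewer's intended code: a single comparator that orders by status first and by start time second keeps jobs with the same status adjacent. `GroupedListing`, its `Status` enum and the `Job` class are hypothetical stand-ins (the real code works on `JobStatusMessage`), and ordering statuses by enum declaration order is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class GroupedListing {
    // Hypothetical subset of terminal states; declaration order defines group order.
    enum Status { FINISHED, CANCELED, FAILED }

    // Hypothetical stand-in for JobStatusMessage with only the fields used here.
    static final class Job {
        final String name;
        final Status status;
        final long startTime;

        Job(String name, Status status, long startTime) {
            this.name = name;
            this.status = status;
            this.startTime = startTime;
        }
    }

    // Returns a new list: grouped by status, oldest start time first within a group.
    static List<Job> order(List<Job> jobs) {
        List<Job> sorted = new ArrayList<>(jobs);
        sorted.sort(Comparator.comparing((Job j) -> j.status)
                .thenComparingLong(j -> j.startTime));
        return sorted;
    }
}
```

Printing the returned list in order would then emit all jobs of one status before moving on to the next.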

            githubbot ASF GitHub Bot added a comment -

            Github user walterddr commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192778560

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception {
                         System.out.println("--------------------------------------------------------------");
                     }
                 }
            +    if (showRemaining) {
            +        if (remainingJobs.size() != 0) {
            +            remainingJobs.sort(startTimeComparator);
            --- End diff --

            Thanks for the reply, @zentol. I'm not sure I fully understand this comment. Are you suggesting that we should always group jobs with the same status together when printing, for all 3 cases?

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6049#discussion_r192837393

            --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
            @@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception {
                         System.out.println("--------------------------------------------------------------");
                     }
                 }
            +    if (showRemaining) {
            +        if (remainingJobs.size() != 0) {
            +            remainingJobs.sort(startTimeComparator);
            --- End diff --

            only for the last case, specifically this would separate `FAILED`, `COMPLETED` and `CANCELED` jobs.

            That said the other sections aren't that fine-grained either. That reminds me, we could name the last section `terminated` instead of `remaining`.

            githubbot ASF GitHub Bot added a comment -

            Github user walterddr commented on the issue:

            https://github.com/apache/flink/pull/6049

            Thanks for the comment, @zentol. I thought about it and think we should group listings of jobs with the same status together, since there are some other non-terminating states as well. Regarding the user-facing choice between `-t` (terminated) and `-a` (all): I lean towards `-a`, while internally naming them terminated. Later on we can easily add support for options like `finished`, `cancelled`, etc. if necessary. What do you think?


            chesnay Chesnay Schepler added a comment -

            1.4: 021266719f2a7699e355d16462c255de54c1546a

            chesnay Chesnay Schepler added a comment -

            master: a2a85bcb3cdb5202055984604f45809260787fd5
            1.5: 470691145bf67b60e07e5ed5ec302c8525689fee

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/flink/pull/6049


            People

              Assignee: Rong Rong
              Reporter: Rong Rong
              Votes: 1
              Watchers: 4