FLINK-5645

IOMetrics transfer through ExecGraph does not work for failed jobs

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Metrics, Webfrontend
    • Labels:
      None

      Description

      While a job is running, the web interface polls IO metrics using the MetricFetcher/MetricQueryService. For finished jobs these metrics are stored in the ExecutionGraph: each vertex has an IOMetrics object, which is essentially a snapshot taken when the task finished.

      For completed jobs this mechanism works. For failed jobs it does not; in that case the last fetched metrics are displayed instead of a final snapshot.

      This isn't intended behavior and should be fixed.
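The intended behavior can be illustrated with a minimal sketch. The names below (`VertexMetricsSketch`, `IoSnapshot`, `onTerminalState`) are hypothetical stand-ins and not Flink's actual classes; the only point is that the final snapshot should be stored for every terminal state, not just for finished tasks.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
class VertexMetricsSketch {
    enum State { RUNNING, FINISHED, CANCELED, FAILED }

    // Immutable snapshot of a task's I/O counters.
    static final class IoSnapshot {
        final long numRecordsIn;
        final long numRecordsOut;

        IoSnapshot(long numRecordsIn, long numRecordsOut) {
            this.numRecordsIn = numRecordsIn;
            this.numRecordsOut = numRecordsOut;
        }
    }

    private State state = State.RUNNING;
    private IoSnapshot finalSnapshot; // stays null until a terminal state reports metrics

    // Called when the task reports a terminal state together with its final metrics.
    void onTerminalState(State newState, IoSnapshot reported) {
        if (newState == State.RUNNING) {
            throw new IllegalArgumentException("RUNNING is not a terminal state");
        }
        state = newState;
        // Store the snapshot for FINISHED, CANCELED and FAILED alike.
        if (reported != null) {
            finalSnapshot = reported;
        }
    }

    Optional<IoSnapshot> finalSnapshot() {
        return Optional.ofNullable(finalSnapshot);
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        VertexMetricsSketch vertex = new VertexMetricsSketch();
        vertex.onTerminalState(State.FAILED, new IoSnapshot(120, 80));
        System.out.println(vertex.finalSnapshot().get().numRecordsIn);
    }
}
```

With this shape, a display layer could prefer the stored snapshot whenever one is present and fall back to polled values only while the task is still running.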

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3377

          Zentol Chesnay Schepler added a comment -

          Fixed in 821da81fe6bee4c5a33be19c08064491fd6280de.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3377

          Thanks for addressing Till's comments. I think this is good to merge now. Could you go ahead and do it?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3377#discussion_r104394092

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -1311,6 +1311,19 @@ public boolean updateState(TaskExecutionState state) {
          }
          }

          + private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
          — End diff –

          I've checked that returning `null` is the same behaviour as in `accumulators.deserializeUserAccumulators(userClassLoader)`. In general it would be good to add annotations for `@Nullable` return types or simply return empty collections, but that's an orthogonal change.
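The "return empty collections" alternative mentioned above could look roughly like the following sketch. It is not what the pull request does (the PR keeps returning `null`); `EmptyInsteadOfNull` and `deserializeOrEmpty` are hypothetical names, and a plain `Callable` stands in for the real deserialization call.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical sketch; the names are illustrative and not part of Flink.
class EmptyInsteadOfNull {

    // Runs the given deserialization step and falls back to an empty map
    // when the input is missing or deserialization fails.
    static Map<String, Long> deserializeOrEmpty(Callable<Map<String, Long>> deserializer) {
        if (deserializer == null) {
            return Collections.<String, Long>emptyMap();
        }
        try {
            Map<String, Long> result = deserializer.call();
            return result != null ? result : Collections.<String, Long>emptyMap();
        } catch (Exception e) {
            System.err.println("Failed to deserialize final accumulator results: " + e);
            return Collections.<String, Long>emptyMap();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> accumulators = deserializeOrEmpty(null);
        // Callers can iterate without a null check.
        System.out.println(accumulators.size());
    }
}
```

The trade-off is that callers can no longer distinguish "no accumulators were reported" from "deserialization failed", which may be one reason to treat it as a separate change.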

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3377

          @tillrohrmann I've addressed your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3377#discussion_r102471279

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java —
          @@ -53,6 +53,22 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by
          this.numBytesOutPerSecond = bytesOut.getRate();
          }

          + public IOMetrics(
          + int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
          + double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
          + double numRecordsInPerSecond, double numRecordsOutPerSecond) {
          — End diff –

          Well, there is a pattern: line 1 holds the counts, line 2 the byte rates, and line 3 the record rates.

          Can change it though.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3377#discussion_r102467283

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java —
          @@ -855,14 +877,23 @@ private boolean processFail(Throwable t, boolean isCallback) {
          }

          if (current == CANCELING) {
          -     cancelingComplete();
          +     cancelingComplete(userAccumulators, metrics);
                return false;
          }

          if (transitionState(current, FAILED, t)) {
                // success (in a manner of speaking)
                this.failureCause = t;

          +     if (userAccumulators != null) {
          +         synchronized (accumulatorLock) {
          +             this.userAccumulators = userAccumulators;
          +         }
          +     }
          +     if (metrics != null) {
          +         this.ioMetrics = metrics;
          +     }

          — End diff –

          Can this also be a separate method?
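One way to read this suggestion is sketched below. `ExecutionSketch` is a simplified, hypothetical stand-in for the `Execution` class, the helper name `storeFinalState` is an assumption, and a `String` is used as a placeholder for the `IOMetrics` object; the merged code may differ.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the Execution class discussed in the review.
class ExecutionSketch {
    private final Object accumulatorLock = new Object();
    private Map<String, Long> userAccumulators; // guarded by accumulatorLock
    private volatile String ioMetrics;          // placeholder for an IOMetrics object

    // Shared helper: both terminal paths call this instead of repeating
    // the null checks and the synchronized block inline.
    private void storeFinalState(Map<String, Long> userAccumulators, String metrics) {
        if (userAccumulators != null) {
            synchronized (accumulatorLock) {
                this.userAccumulators = userAccumulators;
            }
        }
        if (metrics != null) {
            this.ioMetrics = metrics;
        }
    }

    void markFailed(Map<String, Long> userAccumulators, String metrics) {
        storeFinalState(userAccumulators, metrics);
    }

    void cancelingComplete(Map<String, Long> userAccumulators, String metrics) {
        storeFinalState(userAccumulators, metrics);
    }

    Map<String, Long> getUserAccumulators() {
        synchronized (accumulatorLock) {
            return userAccumulators;
        }
    }

    String getIoMetrics() {
        return ioMetrics;
    }
}
```

Passing `null` for either argument leaves the previously stored value untouched, which mirrors the null checks in the diff above.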

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3377#discussion_r102465162

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java —
          @@ -53,6 +53,22 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by
          this.numBytesOutPerSecond = bytesOut.getRate();
          }

          + public IOMetrics(
          + int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
          + double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
          + double numRecordsInPerSecond, double numRecordsOutPerSecond) {
          — End diff –

          I'm not sure whether this line breaking pattern is consistent.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3377#discussion_r102464984

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java —
          @@ -1292,10 +1292,28 @@ public boolean updateState(TaskExecutionState state) {
          }
          return true;
          case CANCELED:
          - attempt.cancelingComplete();
          + AccumulatorSnapshot acc1 = state.getAccumulators();
          + Map<String, Accumulator<?, ?>> userAcc1 = null;
          + if (acc1 != null) {
          +     try {
          +         userAcc1 = acc1.deserializeUserAccumulators(userClassLoader);
          +     } catch (Exception e) {
          +         LOG.error("Failed to deserialize final accumulator results.", e);
          +     }
          + }
          + attempt.cancelingComplete(userAcc1, state.getIOMetrics());
          return true;
          case FAILED:
          - attempt.markFailed(state.getError(userClassLoader));
          + AccumulatorSnapshot acc2 = state.getAccumulators();
          + Map<String, Accumulator<?, ?>> userAcc2 = null;
          + if (acc2 != null) {
          +     try {
          +         userAcc2 = acc2.deserializeUserAccumulators(userClassLoader);
          +     } catch (Exception e) {
          +         LOG.error("Failed to deserialize final accumulator results.", e);
          +     }
          + }
              • End diff –

          Can this deserialization logic be factored out into a method?
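A minimal sketch of such a helper follows. It uses a hypothetical `SnapshotSketch` interface in place of `AccumulatorSnapshot` and `Map<String, Object>` in place of the accumulator map, so that it compiles without Flink on the classpath; the signature is an assumption for illustration, not the merged one. Like the duplicated blocks in the diff, it returns `null` when there is no snapshot or when deserialization fails.

```java
import java.util.Map;

// Hypothetical sketch; the types below stand in for Flink's classes.
class DeserializeHelperSketch {

    // Stand-in for AccumulatorSnapshot, reduced to the one call used here.
    interface SnapshotSketch {
        Map<String, Object> deserializeUserAccumulators(ClassLoader loader) throws Exception;
    }

    // Single place for the null check, the try/catch and the error logging
    // that were previously repeated in the CANCELED and FAILED branches.
    static Map<String, Object> deserializeAccumulators(SnapshotSketch snapshot, ClassLoader userClassLoader) {
        if (snapshot == null) {
            return null;
        }
        try {
            return snapshot.deserializeUserAccumulators(userClassLoader);
        } catch (Exception e) {
            System.err.println("Failed to deserialize final accumulator results. " + e);
            return null;
        }
    }
}
```

Each branch of the switch then shrinks to one helper call followed by the state transition, which keeps the CANCELED and FAILED handling symmetric.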

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3377

          FLINK-5645 Store accumulators/metrics for canceled/failed tasks

          This PR modifies the Execution/ExecutionGraph to store the transmitted io-metrics/accumulators for canceled/failed tasks. Previously these were stored only for finished tasks; for tasks in any other terminal state they were simply discarded.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5645_metrics_failed

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3377.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3377


          commit 1a68ee62a86de283a1a0210dc637ac65d3860bef
          Author: zentol <chesnay@apache.org>
          Date: 2017-02-21T11:36:17Z

          FLINK-5645 Store accumulators/metrics for canceled/failed tasks



            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              Zentol Chesnay Schepler