
Move JSON generation code into static methods

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Webfrontend
    • Labels:
      None

      Description

      In order to implement the HistoryServer, we need a way to generate the JSON responses independently of the REST API. As such, I suggest moving the main parts of the generation code for job-specific handlers into static methods.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3365

          FLINK-5852 Move handler JSON generation code into static methods

          This PR is part of the History Server implementation. It is opened separately to make the review easier.

          The primary change is that the JSON generation of job-specific REST responses was moved from various ```handleRequest``` methods into static methods. This will allow easier re-use. In addition several refactorings have been made and tests were added.

          Other changes include:

          • added a utility method ```JsonUtils#addIOMetrics``` to aggregate ```IOMetrics```
          • added a utility method ```JsonUtils#writeIOMetrics``` to write ```IOMetrics```
          • added a utility method ```JsonUtils#writeMinMaxAvg``` to write ```MinMaxAvgStats``` (checkpointing related)
          • replaced *job-related* hard-coded JSON keys with static constants, defined in ```JsonUtils#Keys```
          • added an additional constructor to each ```Archived*``` class for easier generation in tests
          • added ```BuilderUtils``` class for easier generation of ```Archived*``` classes in tests
          • modified ```IOMetrics``` to allow sub-classing without requiring usage of Meters
          • added a test for every introduced static method
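
The primary change described in this comment (an instance `handleRequest` that delegates to a static generator) can be sketched roughly as follows. This is an illustration only, not the code from the pull request: `GraphView` and `JobExceptionsSketch` are hypothetical stand-ins, and the JSON is assembled with a `StringBuilder` instead of the Jackson `JsonGenerator` that the real handlers use, so that the sketch needs nothing beyond the JDK.

```java
import java.util.Map;

// Hypothetical stand-in for AccessExecutionGraph; only the one accessor this sketch needs.
interface GraphView {
    String getFailureCauseAsString();
}

class JobExceptionsSketch {

    // Instance entry point used by the REST handler: it only delegates.
    String handleRequest(GraphView graph, Map<String, String> params) {
        return createJobExceptionsJson(graph);
    }

    // Static generator: it needs no handler instance, so code outside the
    // REST API can call it directly.
    static String createJobExceptionsJson(GraphView graph) {
        StringBuilder json = new StringBuilder("{");
        String rootException = graph.getFailureCauseAsString();
        if (rootException != null) {
            json.append("\"root-exception\":\"").append(escape(rootException)).append('"');
        }
        return json.append('}').toString();
    }

    // Simplified escaping (backslash and double quote only); a real JSON
    // writer also has to handle control characters.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Because the generator is static and takes the graph as its only input, a test or another component can call it without constructing a handler or its dependencies.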

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5852_static_json

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3365.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3365


          commit 5369cf678c252aa5988ca33c68ab79560ff6cd41
          Author: zentol <chesnay@apache.org>
          Date: 2017-02-13T14:41:29Z

          FLINK-5852 Move handler JSON generation code into static methods


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103429470

          --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---
          @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {

          @Override
          public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
          + return createJobExceptionsJson(graph);
          + }
          +
          + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
          StringWriter writer = new StringWriter();
          JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

          gen.writeStartObject();

          // most important is the root failure cause
          String rootException = graph.getFailureCauseAsString();
          - if (rootException != null) {
          - gen.writeStringField("root-exception", rootException);
          + if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
          --- End diff --

          Should we have both checks?
          ```java
          if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
          ```
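
A minimal sketch of why the combined condition is the safer form, assuming the root exception string may still be `null` for some graphs. `RootExceptionCheck` and `hasRootException` are hypothetical names, and the placeholder value `"(null)"` is an assumption standing in for `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION`.

```java
class RootExceptionCheck {

    // Assumed placeholder value; the real constant is
    // ExceptionUtils.STRINGIFIED_NULL_EXCEPTION.
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    static boolean hasRootException(String rootException) {
        // && short-circuits: equals() is only reached when rootException is
        // non-null, so a null input cannot raise a NullPointerException.
        return rootException != null
                && !rootException.equals(STRINGIFIED_NULL_EXCEPTION);
    }
}
```

With only the `equals` check, a `null` root exception would throw; with only the `null` check, the stringified placeholder would be written out as if it were a real failure cause.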

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103431060

          --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          --- End diff --

          I would move this back to the respective handler package and make it package-private or public there.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103430940

          --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          --- End diff --

          If we want to make this generic, I think we have to go another way. This is still very manual. I'm happy to discuss how else we could do it.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103431005

          --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          — End diff –

          Unused

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103425168

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          Class is missing `serialVersionUID`
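
A minimal sketch of what this comment asks for, assuming the class implements `java.io.Serializable` (the comment implies this, but the diff excerpt does not show it). `ArchivedConfigSketch` and its fields are hypothetical stand-ins, not Flink's actual `ArchivedExecutionConfig`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable archive class; not Flink's actual code.
class ArchivedConfigSketch implements Serializable {

    // Explicit ID: without it the JVM derives one from the class structure,
    // so unrelated edits to the class can break deserialization of old data.
    private static final long serialVersionUID = 1L;

    private final String executionMode;
    private final int parallelism;

    ArchivedConfigSketch(String executionMode, int parallelism) {
        this.executionMode = executionMode;
        this.parallelism = parallelism;
    }

    String getExecutionMode() {
        return executionMode;
    }

    int getParallelism() {
        return parallelism;
    }

    // Serializes and deserializes the object in memory (illustration only).
    static ArchivedConfigSketch roundTrip(ArchivedConfigSketch in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ArchivedConfigSketch) oin.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The value `1L` is a common starting choice; any fixed value works as long as it only changes when the serialized form becomes incompatible.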

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103425747

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          I think this constructor should be the only one. The other `ArchivedExecutionConfig(ExecutionConfig)` constructor should become a static helper method, e.g. `static ArchivedExecutionConfig createFromExecutionConfig(ExecutionConfig)`, that calls the constructor.
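
The suggestion above could look roughly like the following sketch. `ConfigSketch` and `ArchivedSketch` are hypothetical, simplified stand-ins for `ExecutionConfig` and `ArchivedExecutionConfig`, and the three fields are assumptions chosen for illustration; only the method name `createFromExecutionConfig` comes from the comment.

```java
// Hypothetical, simplified stand-in for ExecutionConfig.
class ConfigSketch {
    private final String executionMode;
    private final int parallelism;
    private final boolean objectReuse;

    ConfigSketch(String executionMode, int parallelism, boolean objectReuse) {
        this.executionMode = executionMode;
        this.parallelism = parallelism;
        this.objectReuse = objectReuse;
    }

    String getExecutionMode() { return executionMode; }
    int getParallelism() { return parallelism; }
    boolean isObjectReuseEnabled() { return objectReuse; }
}

// Hypothetical stand-in for ArchivedExecutionConfig.
final class ArchivedSketch {
    private final String executionMode;
    private final int parallelism;
    private final boolean objectReuse;

    // The single constructor takes plain values, so tests can build
    // instances without creating a full configuration object first.
    ArchivedSketch(String executionMode, int parallelism, boolean objectReuse) {
        this.executionMode = executionMode;
        this.parallelism = parallelism;
        this.objectReuse = objectReuse;
    }

    // Static helper replacing the former ArchivedSketch(ConfigSketch) constructor:
    // it extracts the values and delegates to the one real constructor.
    static ArchivedSketch createFromExecutionConfig(ConfigSketch config) {
        return new ArchivedSketch(
                config.getExecutionMode(),
                config.getParallelism(),
                config.isObjectReuseEnabled());
    }

    String getExecutionMode() { return executionMode; }
    int getParallelism() { return parallelism; }
    boolean isObjectReuseEnabled() { return objectReuse; }
}
```

With a single constructor, all field assignment happens in one place, and the conversion logic has a descriptive name at the call site.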

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103431658

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
          + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
          + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          + if (state.isTerminal()) {
          + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
          + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
          + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
          + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
          + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
          + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
          + }
          + } else { // execAttempt is still running, use MetricQueryService instead
          + if (fetcher != null) {
          + fetcher.update();
          + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
          + if (metrics != null) {
          + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
          + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
          + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
          + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
          + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
          + }
          + }
          + }
          + }
          +
          + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
          — End diff –

          Move this to `MutableIOMetrics`
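
One way to read this comment is that the accumulator should own its JSON output instead of relying on a `JsonUtils` helper. The sketch below illustrates that shape under several assumptions: `MutableIOMetricsSketch` and `writeTo` are hypothetical names, the `ObjLongConsumer<String>` sink stands in for Jackson's `JsonGenerator#writeNumberField(String, long)` so the example needs no external library, and reporting "read-bytes" as the sum of local and remote bytes is an assumption, since the body of `writeIOMetrics` is not part of this diff excerpt. The key strings are taken from `JsonUtils.Keys` in the diff.

```java
import java.util.function.ObjLongConsumer;

// Hypothetical sketch; the real MutableIOMetrics is a Flink class that
// would write through a Jackson JsonGenerator instead of a generic sink.
class MutableIOMetricsSketch {
    private long numBytesInLocal;
    private long numBytesInRemote;
    private long numBytesOut;
    private long numRecordsIn;
    private long numRecordsOut;

    // Accumulator methods named after the calls visible in addIOMetrics.
    void addNumBytesInLocal(long n) { numBytesInLocal += n; }
    void addNumBytesInRemote(long n) { numBytesInRemote += n; }
    void addNumBytesOut(long n) { numBytesOut += n; }
    void addNumRecordsIn(long n) { numRecordsIn += n; }
    void addNumRecordsOut(long n) { numRecordsOut += n; }

    // Writing lives next to the counters it reads, so callers no longer
    // need a separate static utility that knows the field layout.
    void writeTo(ObjLongConsumer<String> sink) {
        // Assumption: total bytes read = local + remote.
        sink.accept("read-bytes", numBytesInLocal + numBytesInRemote);
        sink.accept("write-bytes", numBytesOut);
        sink.accept("read-records", numRecordsIn);
        sink.accept("write-records", numRecordsOut);
    }
}
```

A caller could collect the fields with something like `metrics.writeTo((key, value) -> map.put(key, value))` for a `Map<String, Long>`, or forward each pair to a JSON generator.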

String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String NUM_SUBTASKS = "num_subtasks"; + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + public static final String INDEX = "index"; + public static final String INTERVAL = "interval"; + public static final String ENABLED = "enabled"; + public static final String TIMEOUT = "timeout"; + public static final String MIN_PAUSE = "min_pause"; + public static final String MAX_CONCURRENT = "max_concurrent"; + + private Keys() { + } + } + + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException { + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis()); + } + + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum()); + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum()); + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage()); + } + + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) { + if (state.isTerminal()) { + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal()); + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote()); + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut()); + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn()); + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut()); + } + } else { // execAttempt is still running, use MetricQueryService instead + if (fetcher != null) { + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex); + if (metrics 
!= null) { + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"))); + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"))); + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"))); + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"))); + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"))); + } + } + } + } + + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException { — End diff – Move this to `MutableIOMetrics`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103430761

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          — End diff –

          I'm skeptical about these, because they blow up this class again. I like that we "guard" the names here by marking them as `@Public`, but I think I would revert this again. Some keys are very common and I don't see the benefit in accessing them here. Others are very rare and only used in one handler anyway.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103433744

          — Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/BuilderUtils.java —
          @@ -0,0 +1,436 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import org.apache.flink.api.common.ArchivedExecutionConfig;
          +import org.apache.flink.api.common.ExecutionMode;
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.metrics.Counter;
          +import org.apache.flink.metrics.MeterView;
          +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
          +import org.apache.flink.runtime.clusterframework.types.ResourceID;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.jobgraph.JobStatus;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
          +import org.apache.flink.runtime.util.EvictingBoundedList;
          +import org.apache.flink.util.Preconditions;
          +import org.apache.flink.util.SerializedValue;
          +
          +import java.net.InetAddress;
          +import java.net.UnknownHostException;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Random;
          +
          +public class BuilderUtils {
          — End diff –

          Could you please move the builders out to top level classes and remove the utils class?
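
          A minimal sketch of what a top-level test builder could look like, assuming a simplified stand-in type. `JobStub`, `JobStubBuilder` and `BuilderSketchDemo` are hypothetical names used only for illustration; they are not classes from this PR or from Flink, and the real builders would target the `Archived*` classes instead.

```java
import java.util.Objects;

// Hypothetical demo entry point showing the builder in use.
class BuilderSketchDemo {
    public static void main(String[] args) {
        JobStub job = new JobStubBuilder()
            .setName("test-job")
            .setParallelism(4)
            .build();
        System.out.println(job.getName() + " / " + job.getParallelism());
    }
}

// Hypothetical stand-in for an Archived* class; NOT a Flink class.
final class JobStub {
    private final String name;
    private final int parallelism;

    JobStub(String name, int parallelism) {
        this.name = Objects.requireNonNull(name, "name");
        this.parallelism = parallelism;
    }

    String getName() {
        return name;
    }

    int getParallelism() {
        return parallelism;
    }
}

// Top-level builder (it would live in its own file in a real code base)
// instead of being nested inside a catch-all utils class.
final class JobStubBuilder {
    // Defaults let a test set only the fields it cares about.
    private String name = "default-job";
    private int parallelism = 1;

    JobStubBuilder setName(String name) {
        this.name = Objects.requireNonNull(name, "name");
        return this;
    }

    JobStubBuilder setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
        return this;
    }

    JobStub build() {
        return new JobStub(name, parallelism);
    }
}
```

          With one builder per file, a test imports only the builder it needs, and the utils class has nothing left to hold.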

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103429562

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java —
          @@ -36,20 +38,23 @@ public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {

          @Override
          public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
          - StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
          -
          + return createVertexAccumulatorsJson(jobVertex);
          + }
          + public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
          — End diff –

          Empty line missing before method

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103434413

          — Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/JsonTestUtils.java —
          @@ -0,0 +1,158 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonFactory;
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import com.fasterxml.jackson.databind.JsonNode;
          +import com.fasterxml.jackson.databind.ObjectMapper;
          +import com.fasterxml.jackson.databind.node.ArrayNode;
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
          +import org.apache.flink.runtime.clusterframework.types.ResourceID;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecution;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.jobgraph.JobStatus;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
          +
          +import java.net.InetAddress;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +public class JsonTestUtils {
          + public static final ObjectMapper mapper = new ObjectMapper();
          + public static final JsonFactory jacksonFactory = new JsonFactory()
          + .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
          + .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
          +
          + private static ArchivedExecutionGraph originalJob;
          + private static ArchivedExecutionJobVertex originalTask;
          + private static ArchivedExecutionVertex originalSubtask;
          + private static ArchivedExecution originalAttempt;
          +
          + private static final Object lock = new Object();
          +
          + private JsonTestUtils() {
          + }
          +
          + public static AccessExecutionGraph getTestJob() throws Exception {
          + synchronized (lock) {
          + if (originalJob == null) {
          + generateObjects();
          + }
          + }
          + return originalJob;
          + }
          +
          + public static AccessExecutionJobVertex getTestTask() throws Exception {
          + synchronized (lock) {
          + if (originalJob == null) {
          + generateObjects();
          + }
          + }
          + return originalTask;
          + }
          +
          + public static AccessExecutionVertex getTestSubtask() throws Exception {
          + synchronized (lock) {
          + if (originalJob == null) {
          + generateObjects();
          + }
          + }
          + return originalSubtask;
          + }
          +
          + public static AccessExecution getTestAttempt() throws Exception {
          + synchronized (lock) {
          + if (originalJob == null) {
          + generateObjects();
          + }
          + }
          + return originalAttempt;
          + }
          +
          + private static void generateObjects() throws Exception {
          — End diff –

          Very generic name that should have a comment.
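
          One way to address this, sketched under simplifying assumptions: give the initializer a name that says what it creates and document its contract. `SharedFixture`, `createSharedTestFixtures` and `FixtureSketchDemo` are hypothetical names, and a plain `String` stands in for the archived Flink objects that the real test utility builds.

```java
// Hypothetical demo entry point; calls the accessor twice to show reuse.
class FixtureSketchDemo {
    public static void main(String[] args) {
        String first = SharedFixture.getTestJobName();
        String second = SharedFixture.getTestJobName();
        System.out.println(first.equals(second) + ", initialized "
            + SharedFixture.getInitCount() + " time(s)");
    }
}

// Hypothetical stand-in for a test utility holding lazily created fixtures.
final class SharedFixture {
    private static final Object LOCK = new Object();

    // Stand-in for the archived job/task/subtask/attempt objects.
    private static String testJobName;
    private static int initCount;

    private SharedFixture() {
    }

    static String getTestJobName() {
        synchronized (LOCK) {
            if (testJobName == null) {
                createSharedTestFixtures();
            }
            return testJobName;
        }
    }

    static int getInitCount() {
        synchronized (LOCK) {
            return initCount;
        }
    }

    /**
     * Creates the fixtures shared by all tests using this class.
     *
     * <p>Must be called while holding {@code LOCK}. It runs at most once,
     * because callers check for {@code null} first; all fixtures are
     * created together so that they stay consistent with each other.
     */
    private static void createSharedTestFixtures() {
        initCount++;
        testJobName = "job-" + initCount;
    }
}
```

          The Javadoc records the two things a reader cannot infer from the name alone: the locking precondition and the run-once behaviour.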

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103431622

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
          + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
          + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          — End diff –

          Move these to the `MutableIOMetrics` class.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103430333

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          — End diff –

          Why is this not simply kept in the respective handler where it is relevant?
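
          A minimal sketch of what the question implies, assuming the limit is only needed where exceptions are written: the constant becomes a private member of the handler instead of living in the shared utility class. `ExceptionsHandlerSketch`, `exceptionsToReport` and `isTruncated` are hypothetical names used for illustration and are not taken from the PR.

          ```java
          import java.util.ArrayList;
          import java.util.List;

          // Hypothetical stand-in for the handler that reports job exceptions.
          class ExceptionsHandlerSketch {
              // Kept private to the handler, the only place assumed to use it.
              private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;

              // Returns at most MAX_NUMBER_EXCEPTION_TO_REPORT entries, in their original order.
              static List<String> exceptionsToReport(List<String> all) {
                  int limit = Math.min(all.size(), MAX_NUMBER_EXCEPTION_TO_REPORT);
                  return new ArrayList<>(all.subList(0, limit));
              }

              // True if some exceptions were dropped; this would back a "truncated" flag in the response.
              static boolean isTruncated(List<String> all) {
                  return all.size() > MAX_NUMBER_EXCEPTION_TO_REPORT;
              }
          }
          ```

          The trade-off is that any other code needing the same limit could no longer reference it through `JsonUtils`.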

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103431495

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
          + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
          + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          + if (state.isTerminal()) {
          + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
          + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
          + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
          + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
          + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
          + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
          + }
          + } else { // execAttempt is still running, use MetricQueryService instead
          + if (fetcher != null) {
          + fetcher.update();
          + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
          + if (metrics != null) {
          + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
          + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
          + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
          + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
          + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
          + }
          + }
          + }
          + }
          +
          + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
          + gen.writeObjectFieldStart(Keys.METRICS);
          + gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
          + gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
          + gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
          + gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
          + gen.writeEndObject();
          + }
          +
          + public static class MutableIOMetrics extends IOMetrics {
          — End diff –

          Missing `serialVersionUID`. I would move all the related metrics utilities to this class. Please also add comments to the class. I think it's OK to make this class a top-level class in the `org.apache.flink.runtime.webmonitor.utils` package.
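
          A minimal sketch of what this suggestion could look like, not the code that was eventually merged. `IOMetricsStandIn` is a hypothetical placeholder for Flink's `IOMetrics` so that the snippet compiles without Flink on the classpath, and the classes are left package-private to keep everything in one file. The `add*` and `get*` method names follow the diff above.

          ```java
          import java.io.Serializable;

          /** Hypothetical stand-in for Flink's IOMetrics, so the sketch compiles on its own. */
          class IOMetricsStandIn implements Serializable {
              private static final long serialVersionUID = 1L;

              protected long numBytesInLocal;
              protected long numBytesInRemote;
              protected long numBytesOut;
              protected long numRecordsIn;
              protected long numRecordsOut;

              public long getNumBytesInLocal() { return numBytesInLocal; }
              public long getNumBytesInRemote() { return numBytesInRemote; }
              public long getNumBytesOut() { return numBytesOut; }
              public long getNumRecordsIn() { return numRecordsIn; }
              public long getNumRecordsOut() { return numRecordsOut; }
          }

          /**
           * Mutable accumulator for summing I/O metrics across subtasks in the web frontend.
           * Declared as its own class instead of being nested in JsonUtils.
           */
          class MutableIOMetrics extends IOMetricsStandIn {
              // Declared explicitly so the serialized form does not depend on a compiler-generated value.
              private static final long serialVersionUID = 1L;

              public void addNumBytesInLocal(long toAdd) { numBytesInLocal += toAdd; }
              public void addNumBytesInRemote(long toAdd) { numBytesInRemote += toAdd; }
              public void addNumBytesOut(long toAdd) { numBytesOut += toAdd; }
              public void addNumRecordsIn(long toAdd) { numRecordsIn += toAdd; }
              public void addNumRecordsOut(long toAdd) { numRecordsOut += toAdd; }
          }
          ```

          The metrics helpers from the diff (`addIOMetrics`, `writeIOMetrics`) could then move into this class as well, which is what the comment proposes.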

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103425958

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          + String executionMode,
          + String restartStrategyDescription,
          + int parallelism,
          + boolean objectReuseEnabled,
          + Map<String, String> globalJobParameters) {
          + this.executionMode = executionMode;
          — End diff –

          Let's add `checkNotNull` checks where applicable and `checkArgument(parallelism >= 1)`.
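
          As a rough illustration of the requested checks, here is a self-contained sketch. `ArchivedExecutionConfigSketch` is a hypothetical, simplified stand-in for the real class, and it uses `java.util.Objects.requireNonNull` plus a small local `checkArgument` helper so that it runs without Flink. In the actual code base the equivalent calls would presumably come from Flink's `Preconditions` utility. Which arguments may legitimately be null is an assumption here.

          ```java
          import java.util.Map;
          import java.util.Objects;

          /** Hypothetical, simplified stand-in for ArchivedExecutionConfig. */
          class ArchivedExecutionConfigSketch {
              private final String executionMode;
              private final String restartStrategyDescription;
              private final int parallelism;
              private final boolean objectReuseEnabled;
              private final Map<String, String> globalJobParameters;

              ArchivedExecutionConfigSketch(
                      String executionMode,
                      String restartStrategyDescription,
                      int parallelism,
                      boolean objectReuseEnabled,
                      Map<String, String> globalJobParameters) {
                  // Fail fast on null references instead of surfacing an NPE later during JSON generation.
                  this.executionMode = Objects.requireNonNull(executionMode, "executionMode");
                  this.restartStrategyDescription =
                          Objects.requireNonNull(restartStrategyDescription, "restartStrategyDescription");
                  // The bound of 1 follows the review comment.
                  checkArgument(parallelism >= 1, "parallelism must be at least 1");
                  this.parallelism = parallelism;
                  this.objectReuseEnabled = objectReuseEnabled;
                  this.globalJobParameters = Objects.requireNonNull(globalJobParameters, "globalJobParameters");
              }

              /** Local helper mirroring the usual checkArgument contract. */
              private static void checkArgument(boolean condition, String message) {
                  if (!condition) {
                      throw new IllegalArgumentException(message);
                  }
              }

              public String getExecutionMode() { return executionMode; }
              public String getRestartStrategyDescription() { return restartStrategyDescription; }
              public int getParallelism() { return parallelism; }
              public boolean getObjectReuseEnabled() { return objectReuseEnabled; }
              public Map<String, String> getGlobalJobParameters() { return globalJobParameters; }
          }
          ```

          With this shape, a test that passes `0` as the parallelism or `null` as the execution mode fails at construction time rather than somewhere inside a handler.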

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103434202

          — Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/JsonTestUtils.java —
          @@ -0,0 +1,158 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonFactory;
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import com.fasterxml.jackson.databind.JsonNode;
          +import com.fasterxml.jackson.databind.ObjectMapper;
          +import com.fasterxml.jackson.databind.node.ArrayNode;
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
          +import org.apache.flink.runtime.clusterframework.types.ResourceID;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecution;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.jobgraph.JobStatus;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
          +
          +import java.net.InetAddress;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +public class JsonTestUtils {
          — End diff –

          Could you please add comments? This looks like a very specific utility for job-specific tests, and that should be easier to see from the class itself than by checking where it is used.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103433357

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
          + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
          + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          + if (state.isTerminal()) {
          + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
          + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
          + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
          + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
          + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
          + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
          + }
          + } else { // execAttempt is still running, use MetricQueryService instead
          + if (fetcher != null) {
          + fetcher.update();
          + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
          + if (metrics != null) {
          + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
          + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
          + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
          + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
          + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
          + }
          + }
          + }
          + }
          +
          + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
          + gen.writeObjectFieldStart(Keys.METRICS);
          + gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
          + gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
          + gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
          + gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
          + gen.writeEndObject();
          + }
          +
          + public static class MutableIOMetrics extends IOMetrics {
          — End diff –

          Let's probably also rename this to make clear that we use it only for the web frontend? Having the other methods in the class doesn't fit well with the name any more :smile:

String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String NUM_SUBTASKS = "num_subtasks"; + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + public static final String INDEX = "index"; + public static final String INTERVAL = "interval"; + public static final String ENABLED = "enabled"; + public static final String TIMEOUT = "timeout"; + public static final String MIN_PAUSE = "min_pause"; + public static final String MAX_CONCURRENT = "max_concurrent"; + + private Keys() { + } + } + + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException { + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis()); + } + + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum()); + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum()); + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage()); + } + + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) { + if (state.isTerminal()) { + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal()); + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote()); + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut()); + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn()); + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut()); + } + } else { // execAttempt is still running, use MetricQueryService instead + if (fetcher != null) { + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex); + if (metrics 
!= null) { + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"))); + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"))); + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"))); + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"))); + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"))); + } + } + } + } + + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException { + gen.writeObjectFieldStart(Keys.METRICS); + gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote()); + gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut()); + gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn()); + gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut()); + gen.writeEndObject(); + } + + public static class MutableIOMetrics extends IOMetrics { — End diff – Let's probably also rename this to make clear that we use it only for the web frontend? Having the other methods in the class doesn't fit well with the name any more :smile:
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103433823

          — Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/BuilderUtils.java —
          @@ -0,0 +1,436 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import org.apache.flink.api.common.ArchivedExecutionConfig;
          +import org.apache.flink.api.common.ExecutionMode;
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.metrics.Counter;
          +import org.apache.flink.metrics.MeterView;
          +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
          +import org.apache.flink.runtime.clusterframework.types.ResourceID;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
          +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
          +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.jobgraph.JobStatus;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
          +import org.apache.flink.runtime.util.EvictingBoundedList;
          +import org.apache.flink.util.Preconditions;
          +import org.apache.flink.util.SerializedValue;
          +
          +import java.net.InetAddress;
          +import java.net.UnknownHostException;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Random;
          +
          +public class BuilderUtils {
          + private static final Random RANDOM = new Random();
          +
          + private BuilderUtils() {
          + }
          +
          + public static ArchivedExecutionConfigBuilder createArchivedExecutionConfig() {
          + return new ArchivedExecutionConfigBuilder();
          + }
          +
          + public static class ArchivedExecutionConfigBuilder {
          + private String executionMode;
          + private String restartStrategyDescription;
          + private int parallelism;
          + private boolean objectReuseEnabled;
          + private Map<String, String> globalJobParameters;
          +
          + private ArchivedExecutionConfigBuilder() {
          + }
          +
          + public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
          + this.executionMode = executionMode;
          + return this;
          + }
          +
          + public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
          + this.restartStrategyDescription = restartStrategyDescription;
          + return this;
          + }
          +
          + public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
          + this.parallelism = parallelism;
          + return this;
          + }
          +
          + public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
          + this.objectReuseEnabled = objectReuseEnabled;
          + return this;
          + }
          +
          + public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
          + this.globalJobParameters = globalJobParameters;
          + return this;
          + }
          +
          + public ArchivedExecutionConfig finish() {
          — End diff –

          I think the canonical way to create the instance with a builder is to call this `build()` (applies to the other builders as well).
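A minimal sketch of the naming convention suggested here, assuming a simplified stand-in class: `ConfigSketch` and its fields are hypothetical and are not the Flink `ArchivedExecutionConfig` API. The fluent setters each return the builder, and the terminal method that produces the instance is named `build()`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an archived config; not the actual Flink class. */
final class ConfigSketch {
    final String executionMode;
    final int parallelism;
    final Map<String, String> globalJobParameters;

    private ConfigSketch(Builder b) {
        this.executionMode = b.executionMode;
        this.parallelism = b.parallelism;
        // Defensive copy so later changes to the builder's map do not leak in.
        this.globalJobParameters =
            Collections.unmodifiableMap(new HashMap<>(b.globalJobParameters));
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Defaults chosen for illustration only.
        private String executionMode = "PIPELINED";
        private int parallelism = 1;
        private Map<String, String> globalJobParameters = new HashMap<>();

        private Builder() {
        }

        Builder setExecutionMode(String executionMode) {
            this.executionMode = executionMode;
            return this;
        }

        Builder setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        Builder setGlobalJobParameters(Map<String, String> globalJobParameters) {
            this.globalJobParameters = globalJobParameters;
            return this;
        }

        /** Terminal method, named build() by convention rather than finish(). */
        ConfigSketch build() {
            return new ConfigSketch(this);
        }
    }
}
```

A test would then read `ConfigSketch.builder().setParallelism(4).build()`, which matches how most Java builders are consumed.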

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103453996

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java —
          @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {

          @Override
          public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
          + return createJobExceptionsJson(graph);
          + }
          +
          + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
          StringWriter writer = new StringWriter();
          JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

          gen.writeStartObject();

          // most important is the root failure cause
          String rootException = graph.getFailureCauseAsString();

          - if (rootException != null) {
          - gen.writeStringField("root-exception", rootException);
          + if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
          — End diff –

          we could have both, but ExecutionGraph#archive() guarantees a non-null value.
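For illustration, "having both" checks could look like the sketch below. `RootExceptionSketch`, `hasRootException`, and the literal value of `STRINGIFIED_NULL` are assumptions made for this example; the diff itself compares against `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION`, whose actual value is not shown in this thread.

```java
/** Hypothetical helper; not part of the pull request. */
final class RootExceptionSketch {
    // Assumed placeholder for the sentinel that marks "no failure cause".
    static final String STRINGIFIED_NULL = "(null)";

    /** True only if a real failure cause is present: neither null nor the sentinel. */
    static boolean hasRootException(String rootException) {
        return rootException != null && !rootException.equals(STRINGIFIED_NULL);
    }
}
```

If the archived graph always returns a non-null string, as stated above, the null check is redundant but harmless; it only matters for callers that pass a graph which has not been archived.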

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103454740

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          — End diff –

          remnant of the time when all JSON was generated here.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103456991

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          + public static final String TASKMANAGERS = "taskmanagers";
          + public static final String JOB_ID = "jid";
          + public static final String ID = "id";
          + public static final String NAME = "name";
          + public static final String STATE = "state";
          + public static final String IS_STOPPABLE = "isStoppable";
          + public static final String PARALLELISM = "parallelism";
          + public static final String PLAN = "plan";
          +
          + public static final String START_TIME = "start-time";
          + public static final String END_TIME = "end-time";
          + public static final String DURATION = "duration";
          + public static final String NOW = "now";
          + public static final String LAST_MODIFICATION = "last-modification";
          +
          + public static final String TIMESTAMP = "timestamp";
          + public static final String TIMESTAMPS = "timestamps";
          + public static final String STATUS_COUNTS = "status-counts";
          +
          + public static final String REFRESH_INTERVAL = "refresh-interval";
          + public static final String TIMEZONE_OFFSET = "timezone-offset";
          + public static final String TIMEZONE_NAME = "timezone-name";
          + public static final String FLINK_VERSION = "flink-version";
          + public static final String FLINK_REVISION = "flink-revision";
          +
          + public static final String EXECUTION_CONFIG = "execution-config";
          + public static final String MODE = "mode";
          + public static final String EXECUTION_MODE = "execution-mode";
          + public static final String RESTART_STRATEGY = "restart-strategy";
          + public static final String JOB_PARALLELISM = "job-parallelism";
          + public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
          + public static final String USER_CONFIG = "user-config";
          +
          + public static final String ROOT_EXCEPTION = "root-exception";
          + public static final String ALL_EXCEPTIONS = "all-exceptions";
          + public static final String EXCEPTION = "exception";
          + public static final String TRUNCATED = "truncated";
          +
          + public static final String HOST = "host";
          + public static final String LOCATION = "location";
          +
          + public static final String VERTICES = "vertices";
          + public static final String TASKS = "tasks";
          + public static final String TASK = "task";
          + public static final String SUBTASKS = "subtasks";
          + public static final String SUBTASK = "subtask";
          + public static final String ATTEMPT = "attempt";
          +
          + public static final String STATUS = "status";
          + public static final String TOTAL = "total";
          + public static final String PENDING = "pending";
          + public static final String RUNNING = "running";
          + public static final String FINISHED = "finished";
          + public static final String CANCELING = "canceling";
          + public static final String CANCELED = "canceled";
          + public static final String FAILED = "failed";
          + public static final String RESTORED = "restored";
          + public static final String PENDING_OR_FAILED = "pending_or_failed";
          + public static final String DISCARDED = "discarded";
          + public static final String IN_PROGRESS = "in_progress";
          + public static final String COMPLETED = "completed";
          +
          + public static final String METRICS = "metrics";
          + public static final String WRITE_BYTES = "write-bytes";
          + public static final String READ_BYTES = "read-bytes";
          + public static final String WRITE_RECORDS = "write-records";
          + public static final String READ_RECORDS = "read-records";
          + public static final String TYPE = "type";
          + public static final String VALUE = "value";
          +
          + public static final String MIN = "min";
          + public static final String MAX = "max";
          + public static final String AVG = "avg";
          +
          + public static final String JOB_ACCUMULATORS = "job-accumulators";
          + public static final String USER_ACCUMULATORS = "user-accumulators";
          + public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
          +
          + public static final String COUNTS = "counts";
          + public static final String EXTERNALIZATION = "externalization";
          + public static final String EXTERNAL_PATH = "external-path";
          + public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
          + public static final String HISTORY = "history";
          +
          + public static final String SUMMARY = "summary";
          + public static final String STATE_SIZE = "state_size";
          + public static final String ETE_DURATION = "end_to_end_duration";
          + public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
          + public static final String SAVEPOINT = "savepoint";
          + public static final String IS_SAVEPOINT = "is_savepoint";
          + public static final String CHECKPOINT = "checkpoint";
          + public static final String CHECKPOINT_DURATION = "checkpoint_duration";
          + public static final String SYNC = "sync";
          + public static final String ASYNC = "async";
          + public static final String ALIGNMENT = "alignment";
          + public static final String BUFFERED = "buffered";
          +
          + public static final String LATEST = "latest";
          +
          + public static final String FAILURE_TIMESTAMP = "failure_timestamp";
          + public static final String FAILURE_MESSAGE = "failure_message";
          + public static final String RESTORE_TIMESTAMP = "restore_timestamp";
          +
          + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
          + public static final String ACK_TIMESTAMP = "ack_timestamp";
          + public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
          +
          + public static final String NUM_SUBTASKS = "num_subtasks";
          + public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
          + public static final String INDEX = "index";
          + public static final String INTERVAL = "interval";
          + public static final String ENABLED = "enabled";
          + public static final String TIMEOUT = "timeout";
          + public static final String MIN_PAUSE = "min_pause";
          + public static final String MAX_CONCURRENT = "max_concurrent";
          +
          + private Keys() {
          + }
          + }
          +
          + public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
          + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
          + }
          +
          + public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
          + gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
          + gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
          + gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          + if (state.isTerminal()) {
          + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
          + summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
          + summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
          + summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
          + summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
          + summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
          + }
          + } else { // execAttempt is still running, use MetricQueryService instead
          + if (fetcher != null) {
          + fetcher.update();
          + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
          + if (metrics != null) {
          + summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
          + summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
          + summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
          + summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
          + summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
          + }
          + }
          + }
          + }
          +
          + public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
          + gen.writeObjectFieldStart(Keys.METRICS);
          + gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
          + gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
          + gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
          + gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
          + gen.writeEndObject();
          + }
          +
          + public static class MutableIOMetrics extends IOMetrics {
          — End diff –

          I think the name is fine; having it in runtime-web already implies that it is only used by the web frontend.
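
For readers without the full diff at hand, a mutable metrics subclass of the kind under discussion might look roughly like the sketch below. It is an illustration only, not the PR's code: `IoMetricsSketch` and `MutableIoMetricsSketch` are hypothetical stand-ins for `IOMetrics` and `MutableIOMetrics`, only the byte counters are modeled, and the protected-field layout is an assumption.

```java
// Hypothetical stand-in for IOMetrics: a read-only view of byte counters.
class IoMetricsSketch {
    // Assumption: protected fields so that a subclass can accumulate values.
    protected long numBytesInLocal;
    protected long numBytesInRemote;
    protected long numBytesOut;

    IoMetricsSketch(long inLocal, long inRemote, long out) {
        this.numBytesInLocal = inLocal;
        this.numBytesInRemote = inRemote;
        this.numBytesOut = out;
    }

    long getNumBytesInLocal() { return numBytesInLocal; }
    long getNumBytesInRemote() { return numBytesInRemote; }
    long getNumBytesOut() { return numBytesOut; }
}

// Hypothetical stand-in for MutableIOMetrics: starts at zero and sums subtask metrics.
final class MutableIoMetricsSketch extends IoMetricsSketch {
    MutableIoMetricsSketch() {
        super(0, 0, 0);
    }

    // Adds the final metrics of one finished subtask; null means nothing was recorded.
    void add(IoMetricsSketch finished) {
        if (finished != null) {
            numBytesInLocal += finished.getNumBytesInLocal();
            numBytesInRemote += finished.getNumBytesInRemote();
            numBytesOut += finished.getNumBytesOut();
        }
    }

    // Mirrors the "read-bytes" value in the diff: local plus remote input bytes.
    long readBytes() {
        return numBytesInLocal + numBytesInRemote;
    }

    public static void main(String[] args) {
        MutableIoMetricsSketch sum = new MutableIoMetricsSketch();
        sum.add(new IoMetricsSketch(10, 5, 7));
        sum.add(null); // a subtask without recorded metrics is skipped
        sum.add(new IoMetricsSketch(1, 2, 3));
        System.out.println(sum.readBytes() + " bytes read, " + sum.getNumBytesOut() + " bytes written");
    }
}
```

The null check corresponds to the `@Nullable IOMetrics` parameter of `addIOMetrics` in the diff; the live-metrics branch that queries a `MetricFetcher` is deliberately left out of this sketch.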

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103461677

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          To change this I would have to touch the runtime Execution[[Job]Vertex] classes, which I would rather not do in this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103463182

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          + String executionMode,
          + String restartStrategyDescription,
          + int parallelism,
          + boolean objectReuseEnabled,
          + Map<String, String> globalJobParameters) {
          + this.executionMode = executionMode;
          — End diff –

          That would lead to maintainability issues. The ArchivedExecutionConfig's very purpose is to truthfully represent some state X. In order to do so it must only impose restrictions that the original object, i.e. the ExecutionConfig, imposed; otherwise we risk that an ExecutionConfig can't be archived, which doesn't make sense. This in turn would require that any change to the conditions that the EC imposes be propagated to the ArchivedEC, which simply won't happen.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103464724

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          — End diff –

          Then let's discuss it now before I go through every handler and test again.

          The REST API is loaded with inconsistencies in how things are structured and named. I believe this is in part due to not having a central place where keywords can be stored.

          If every handler rolls its own and lives in a vacuum then an inconsistent API is inevitable. However, with this class, a handler that doesn't use it sticks out compared to other handlers. My hope is that this is easier to notice during reviews, leading the contributor to the JsonUtils class and possibly to reusing keywords instead of adding another set.

          I do agree that readability suffers a bit when comparing ```JsonUtils.Keys.STATUS``` to ```"status"```, but frankly I think we can refactor the JsonUtils/usage to allow ```STATUS```. And with this I don't see an issue.
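
The argument for a central key class can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: `Keys` stands in for `JsonUtils.Keys`, the two "handlers" are plain methods that build field maps, and no Jackson or Flink types are used so that the snippet compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for JsonUtils.Keys: one shared home for response keys.
final class Keys {
    static final String ID = "id";
    static final String NAME = "name";
    static final String STATUS = "status";

    private Keys() {
    }
}

// Two hypothetical handlers. Neither contains a key literal of its own,
// so they cannot drift apart on spelling ("status" vs. "state", "id" vs. "jid").
final class KeysDemo {
    static Map<String, String> subtaskFields(String id, String status) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put(Keys.ID, id);
        fields.put(Keys.STATUS, status);
        return fields;
    }

    static Map<String, String> vertexFields(String id, String name, String status) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put(Keys.ID, id);
        fields.put(Keys.NAME, name);
        fields.put(Keys.STATUS, status);
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(subtaskFields("0", "RUNNING"));
        System.out.println(vertexFields("v1", "map", "FINISHED"));
    }
}
```

In a packaged code base, a static import of the individual constants would let call sites write the bare `STATUS`, which is the refactoring alluded to above; that form is not shown here because static imports need a named package.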

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3365

          @uce Thank you for the review. I've addressed all issues that you raised, either with further comments or changes.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103473295

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          + String executionMode,
          + String restartStrategyDescription,
          + int parallelism,
          + boolean objectReuseEnabled,
          + Map<String, String> globalJobParameters) {
          + this.executionMode = executionMode;
          — End diff –

          But isn't the other constructor already exactly trying to ensure what you say "simply won't happen"?

          ```java
          if (ec.getGlobalJobParameters() != null
                  && ec.getGlobalJobParameters().toMap() != null) {
              globalJobParameters = ec.getGlobalJobParameters().toMap();
          } else {
              globalJobParameters = Collections.emptyMap();
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103473529

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          What do you mean? If IntelliJ is to be trusted, the other constructor is only used once, in `ExecutionConfig.archive`. But I'm fine with leaving it as is.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103473952

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java —
          @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {

          @Override
          public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
          + return createJobExceptionsJson(graph);
          + }
          +
          + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
          StringWriter writer = new StringWriter();
          JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

          gen.writeStartObject();

          // most important is the root failure cause
          String rootException = graph.getFailureCauseAsString();

          - if (rootException != null) {
          - gen.writeStringField("root-exception", rootException);
          + if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
          — End diff –

          Then maybe add a nonNull check to guard against unintended future changes of behaviour in ExecutionGraph?
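
The suggested guard could take roughly the following shape. This is a minimal sketch under stated assumptions: `STRINGIFIED_NULL_EXCEPTION` is redefined locally as a stand-in for `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION`, its literal value is assumed, and `hasRootException` is a hypothetical helper that does not appear in the PR.

```java
final class RootExceptionGuardSketch {
    // Stand-in for ExceptionUtils.STRINGIFIED_NULL_EXCEPTION; the literal value is an assumption.
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    // Hypothetical helper: true only if there is a real failure cause to report.
    // Calling equals on the constant also keeps a null argument from throwing.
    static boolean hasRootException(String rootException) {
        return rootException != null && !STRINGIFIED_NULL_EXCEPTION.equals(rootException);
    }

    public static void main(String[] args) {
        System.out.println(hasRootException(null));
        System.out.println(hasRootException(STRINGIFIED_NULL_EXCEPTION));
        System.out.println(hasRootException("java.lang.RuntimeException: boom"));
    }
}
```

With such a check, the handler keeps working whether the graph reports a missing failure cause as the sentinel string or, after some future change, as `null`.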

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103475734

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java —
          @@ -0,0 +1,228 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.annotation.Public;
          +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
          +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class JsonUtils {
          + public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
          +
          + @Public
          + public static final class Keys {
          — End diff –

          I think the way to do this is to get rid of the manual JSON creation completely, but I don't see us doing that any time soon. A majority of the keys are either used in only a single handler (+ test) or simply identical to the String they refer to. I understand that it sucks to go through every handler and test again, but this is an unrelated change to begin with. I agree that there can be a problem with inconsistencies etc., but I don't see how this solves it (except, as you said, by hopefully making them easier to catch).

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103477647

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          Sorry, I was mixing things up. I thought I would have to change the constructors of other Archived* classes as well to be consistent, but they already had a constructor like this.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103477833

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          + String executionMode,
          + String restartStrategyDescription,
          + int parallelism,
          + boolean objectReuseEnabled,
          + Map<String, String> globalJobParameters) {
          + this.executionMode = executionMode;
          — End diff –

          Yes, but they don't throw exceptions, do they?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103477920

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java —
          @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {

          @Override
          public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
          + return createJobExceptionsJson(graph);
          + }
          +
          + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
          StringWriter writer = new StringWriter();
          JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);

          gen.writeStartObject();

          // most important is the root failure cause
          String rootException = graph.getFailureCauseAsString();
          - if (rootException != null) {
          - gen.writeStringField("root-exception", rootException);
          + if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
          — End diff –

          that's reasonable.
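
          For readers following the thread, the shape under discussion can be sketched roughly as below: the handler's instance method only delegates to a static generator, and the static method guards against a null failure cause before comparing it with the "stringified null" marker. This is a simplified, standard-library-only sketch and not the PR's code: `JobExceptionsSketch`, the hand-rolled `escape` helper and the value of the stand-in constant are assumptions, and the real code writes JSON through a Jackson `JsonGenerator`.

```java
import java.util.Objects;

final class JobExceptionsSketch {

    // Stand-in for ExceptionUtils.STRINGIFIED_NULL_EXCEPTION; the value is assumed.
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    // Instance entry point, as a REST handler would expose it; it only delegates.
    String handleRequest(String failureCause) {
        return createJobExceptionsJson(failureCause);
    }

    // Static generator: usable without a handler instance, e.g. when archiving a job.
    static String createJobExceptionsJson(String failureCause) {
        // Guard against an unintended future change that lets null through.
        Objects.requireNonNull(failureCause, "failure cause must not be null");

        StringBuilder json = new StringBuilder("{");
        if (!failureCause.equals(STRINGIFIED_NULL_EXCEPTION)) {
            json.append("\"root-exception\":\"").append(escape(failureCause)).append('"');
        }
        return json.append('}').toString();
    }

    // Minimal escaping for the sketch only; a JSON library handles all cases.
    static String escape(String s) {
        return s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
    }
}
```

          With the guard in place, a null failure cause fails fast with a descriptive message instead of surfacing as a bare NullPointerException from `equals`.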

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103479406

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          + String executionMode,
          + String restartStrategyDescription,
          + int parallelism,
          + boolean objectReuseEnabled,
          + Map<String, String> globalJobParameters) {
          + this.executionMode = executionMode;
          — End diff –

          The checks will only throw exceptions if the current behaviour changes, most probably because of an unintended change. Anyways, feel free to leave it as is.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103502713

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java —
          @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
          }
          }

          + public ArchivedExecutionConfig(
          — End diff –

          I wasn't mixing things up. For example, in ```Execution#archive()``` we create an ```ArchivedExecution``` through the ```ArchivedExecution(Execution)``` constructor. This pattern is identical for all Archived* classes except the EG (because we need some internal components and don't want to overload the class even more).

          Changing the constructor implies changing the ExecutionConfig, which imo is out of scope for this PR. Also, to be consistent, we would have to touch the other ```Archivable``` classes as well.
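
          The pattern described here can be illustrated with a minimal sketch: the live object's `archive()` calls the archived class's copy-style constructor, while a second, field-wise constructor lets tests build an archived instance directly. The class names `ExecutionSketch` and `ArchivedExecutionSketch` and their two fields are hypothetical and chosen only for illustration; they are not Flink's actual classes.

```java
// Hypothetical live object that can be archived.
final class ExecutionSketch {
    private final String state;
    private final int attemptNumber;

    ExecutionSketch(String state, int attemptNumber) {
        this.state = state;
        this.attemptNumber = attemptNumber;
    }

    String getState() { return state; }
    int getAttemptNumber() { return attemptNumber; }

    // Mirrors the described pattern: archive() hands `this` to the archived constructor.
    ArchivedExecutionSketch archive() {
        return new ArchivedExecutionSketch(this);
    }
}

// Hypothetical immutable snapshot of the live object.
final class ArchivedExecutionSketch {
    private final String state;
    private final int attemptNumber;

    // Constructor used by archive(): copies the relevant fields from the live object.
    ArchivedExecutionSketch(ExecutionSketch execution) {
        this(execution.getState(), execution.getAttemptNumber());
    }

    // Field-wise constructor: lets tests create an instance without a live object.
    ArchivedExecutionSketch(String state, int attemptNumber) {
        this.state = state;
        this.attemptNumber = attemptNumber;
    }

    String getState() { return state; }
    int getAttemptNumber() { return attemptNumber; }
}
```

          Keeping both constructors means production code continues to go through `archive()`, while tests can skip setting up the live object entirely.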

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3365

          @uce I've removed the ```JsonUtils``` class.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103710173

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java —
          @@ -0,0 +1,90 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class MutableIOMetrics extends IOMetrics {
          +
          + private static final long serialVersionUID = -5460777634971381737L;
          +
          + public MutableIOMetrics() {
          + super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
          + }
          +
          + public void addNumBytesInLocal(long toAdd) {
          + this.numBytesInLocal += toAdd;
          + }
          +
          + public void addNumBytesInRemote(long toAdd) {
          + this.numBytesInRemote += toAdd;
          + }
          +
          + public void addNumBytesOut(long toAdd) {
          + this.numBytesOut += toAdd;
          + }
          +
          + public void addNumRecordsIn(long toAdd) {
          + this.numRecordsIn += toAdd;
          + }
          +
          + public void addNumRecordsOut(long toAdd) {
          + this.numRecordsOut += toAdd;
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          — End diff –

          Quick question: could we make this an instance method instead of static?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103710293

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java —
          @@ -0,0 +1,90 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class MutableIOMetrics extends IOMetrics {
          — End diff –

          Could you write a high level comment why we have this as a utility for the web frontend?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3365

          Thanks for addressing the comments. 👍

          I had two last trivial inline questions. Could you go ahead and merge this and decide whether it makes sense to address the questions?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103712282

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java —
          @@ -0,0 +1,90 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class MutableIOMetrics extends IOMetrics {
          +
          + private static final long serialVersionUID = -5460777634971381737L;
          +
          + public MutableIOMetrics() {
          + super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
          + }
          +
          + public void addNumBytesInLocal(long toAdd) {
          + this.numBytesInLocal += toAdd;
          + }
          +
          + public void addNumBytesInRemote(long toAdd) {
          + this.numBytesInRemote += toAdd;
          + }
          +
          + public void addNumBytesOut(long toAdd) {
          + this.numBytesOut += toAdd;
          + }
          +
          + public void addNumRecordsIn(long toAdd) {
          + this.numRecordsIn += toAdd;
          + }
          +
          + public void addNumRecordsOut(long toAdd) {
          + this.numRecordsOut += toAdd;
          + }
          +
          + public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
          — End diff –

          yup.
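
          As a rough illustration of the instance-method variant agreed on here, the accumulator can be `this` instead of a `summedMetrics` parameter. The sketch below is heavily simplified and rests on assumptions: `IOMetricsSketch` and `MutableIOMetricsSketch` are hypothetical stand-ins with only three counters, and the `ExecutionState`, `MetricFetcher`, job ID, task ID and subtask index parameters from the diff are left out, so a null argument is simply skipped rather than triggering any other lookup.

```java
// Hypothetical base type; the fields are protected and non-final so a subclass can accumulate.
class IOMetricsSketch {
    protected long numBytesOut;
    protected long numRecordsIn;
    protected long numRecordsOut;

    IOMetricsSketch(long numBytesOut, long numRecordsIn, long numRecordsOut) {
        this.numBytesOut = numBytesOut;
        this.numRecordsIn = numRecordsIn;
        this.numRecordsOut = numRecordsOut;
    }

    long getNumBytesOut() { return numBytesOut; }
    long getNumRecordsIn() { return numRecordsIn; }
    long getNumRecordsOut() { return numRecordsOut; }
}

// Hypothetical mutable accumulator, starting from all-zero counters.
class MutableIOMetricsSketch extends IOMetricsSketch {

    MutableIOMetricsSketch() {
        super(0, 0, 0);
    }

    // Instance variant: adds the other object's counters onto this accumulator.
    void addIOMetrics(IOMetricsSketch other) {
        if (other == null) {
            return; // simplification: nothing to add when no metrics are available
        }
        this.numBytesOut += other.getNumBytesOut();
        this.numRecordsIn += other.getNumRecordsIn();
        this.numRecordsOut += other.getNumRecordsOut();
    }
}
```

          A caller would then create one `MutableIOMetricsSketch` per aggregation and call `addIOMetrics` once per subtask, which reads slightly more naturally than passing the accumulator as the first argument of a static method.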

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3365#discussion_r103712305

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java —
          @@ -0,0 +1,90 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.utils;
          +
          +import com.fasterxml.jackson.core.JsonGenerator;
          +import org.apache.flink.runtime.execution.ExecutionState;
          +import org.apache.flink.runtime.executiongraph.IOMetrics;
          +import org.apache.flink.runtime.metrics.MetricNames;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
          +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
          +
          +import javax.annotation.Nullable;
          +import java.io.IOException;
          +
          +public class MutableIOMetrics extends IOMetrics {
          — End diff –

          ok.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3365

          @uce I will address your final comments while merging.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3365

          Zentol Chesnay Schepler added a comment -

          Implemented in a552d6746636e26c634c86d6a11732ea9d2f239e.


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              Zentol Chesnay Schepler
            • Votes:
              0
              Watchers:
              1
