Flink / FLINK-5090

Expose optionally detailed metrics about network queue lengths

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.3
    • Fix Version/s: 1.3.0
    • Component/s: Metrics, Network
    • Labels:
      None

      Description

      For debugging purposes, it is important to have access to more detailed metrics about the length of network input and output queues.

          Activity

          xiaogang.shi Xiaogang Shi added a comment -

          I suggest making the metrics channel-wise. With these metrics, we could easily find hotspot channels.

          What do you think?

          StephanEwen Stephan Ewen added a comment -

          I have added min/max/avg across the channels for now. Having all channels creates a flood of metrics.
          Do you think min/max/avg is okay, or are all channels needed?

          xiaogang.sxg Xiaogang Shi added a comment -

          In Flink, performance bottlenecks are usually caused by
          1. mismatched parallelism of the producer and consumer operators, or
          2. imbalanced load across the different tasks of the same operator.

          Per-channel metrics help a lot in figuring out both problems,
          but the solution to the second problem usually requires modifying the application logic.

          Gate-wise metrics are sufficient to identify the first problem.
          I think they require little additional overhead (due to two-input operators).
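A rough illustration of how aggregated queue-length statistics could separate the two cases above. This is a hypothetical heuristic, not something proposed in this issue: the class name, the `highAvg` and `skewFactor` thresholds, and the interpretation of each finding are assumptions. It looks at the queue lengths of the channels of one gate: a high average means everything is backed up (which may point to a parallelism mismatch), while a maximum that stands far above the average means only a few channels are long (which may point to hotspot channels caused by imbalanced load). Note that min/max/avg per gate are enough for this check.

```java
/** Hypothetical heuristic over the queue lengths of the channels of one gate. */
final class BacklogDiagnosis {

    enum Finding { OK, UNIFORM_BACKLOG, SKEWED_BACKLOG }

    /**
     * @param queueLengths per-channel queue lengths (assumed non-negative)
     * @param highAvg      queue length considered "backed up" (assumed threshold)
     * @param skewFactor   how far max must exceed avg to count as skew (assumed)
     */
    static Finding diagnose(int[] queueLengths, double highAvg, double skewFactor) {
        if (queueLengths.length == 0) {
            return Finding.OK;
        }
        long sum = 0;
        int max = 0;
        for (int len : queueLengths) {
            sum += len;
            max = Math.max(max, len);
        }
        double avg = (double) sum / queueLengths.length;

        if (avg >= highAvg) {
            // Every channel is backed up: the consumer as a whole cannot keep up,
            // which may hint at mismatched producer/consumer parallelism.
            return Finding.UNIFORM_BACKLOG;
        }
        if (max >= highAvg && max > skewFactor * avg) {
            // A few channels are much longer than the rest: possible hotspot
            // channels caused by imbalanced load.
            return Finding.SKEWED_BACKLOG;
        }
        return Finding.OK;
    }
}
```

Per-channel metrics would additionally tell you *which* channel is hot; the aggregated view only tells you that one exists.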

          greghogan Greg Hogan added a comment -

          Stephan Ewen, is your code ready for a PR?

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3348

          FLINK-5090 [network] Add metrics for details about inbound/outbound network queues

          These metrics are optimised to go through the channels only once in order to
          gather all metrics, i.e. min, max, avg and sum. Whenever any of these is
          requested, all metrics are refreshed and cached. Requests for the other
          metrics are then served from the cache. However, each value is served only
          once from the cache: a second call to retrieve the minimum, for example,
          refreshes the cache for all values.

          This setup may seem a bit strange at first, but it ensures that the statistics
          belong together logically and originate from a common point in time. This is
          not necessarily the point in time at which the metric was requested, though.
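A minimal Java sketch of the caching scheme described above, assuming the per-channel queue lengths can be read through `IntSupplier`s (a hypothetical stand-in for the real input channels or subpartitions; the class and method names are illustrative, not the PR's code). One pass computes min, max, sum and avg together; each statistic is then served once from that snapshot, and asking for the same statistic again triggers a fresh pass.

```java
import java.util.Arrays;
import java.util.function.IntSupplier;

/** Sketch of the "one pass, serve each value once" caching scheme. */
final class QueueLengthStats {

    private static final int MIN = 0, MAX = 1, SUM = 2, AVG = 3;

    private final IntSupplier[] channels;

    // Snapshot from the most recent pass over all channels.
    private int min;
    private int max;
    private long sum;
    private double avg;

    // served[s] is true once statistic s has been handed out from the current
    // snapshot; all start as true so that the first request triggers a pass.
    private final boolean[] served = {true, true, true, true};

    QueueLengthStats(IntSupplier... channels) {
        this.channels = channels;
    }

    /** Walks the channels once and computes all four statistics together. */
    private void refresh() {
        int newMin = channels.length == 0 ? 0 : Integer.MAX_VALUE;
        int newMax = 0; // queue lengths are assumed to be non-negative
        long newSum = 0;
        for (IntSupplier channel : channels) {
            int len = channel.getAsInt();
            newMin = Math.min(newMin, len);
            newMax = Math.max(newMax, len);
            newSum += len;
        }
        min = newMin;
        max = newMax;
        sum = newSum;
        avg = channels.length == 0 ? 0.0 : (double) newSum / channels.length;
        Arrays.fill(served, false);
    }

    /** Refreshes the snapshot only if this statistic was already served from it. */
    private void prepare(int statistic) {
        if (served[statistic]) {
            refresh();
        }
        served[statistic] = true;
    }

    // synchronized: assumes a metrics reporter may call these from another thread.
    synchronized int getMin() { prepare(MIN); return min; }
    synchronized int getMax() { prepare(MAX); return max; }
    synchronized long getSum() { prepare(SUM); return sum; }
    synchronized double getAvg() { prepare(AVG); return avg; }
}
```

A reporter that reads all four values in a row therefore pays for one pass over the channels and gets one consistent snapshot, even if the queues change between the individual calls.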

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-5090

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3348.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3348


          commit 59e64c4187c8533e0d802bf415e289624db99f06
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-17T18:36:56Z

          FLINK-5090 [network] Add metrics for details about inbound/outbound network queues



          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r101993298

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -227,6 +227,12 @@
          public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

          /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
          + */
          + @PublicEvolving
          + public static final String NETWORK_DETAILED_METRICS_KEY = "taskmanager.net.detailed-metrics";
          — End diff –

          Please add this as a `ConfigOption`.
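To illustrate what the reviewer is asking for: a typed option carries its key and its default value together, so call sites such as `tmConfig.getBoolean(ConfigConstants.NETWORK_DETAILED_METRICS_KEY, false)` no longer repeat the default. In flink-core this would presumably be declared via `ConfigOptions.key(...).defaultValue(false)`; the self-contained sketch below uses hypothetical stand-in classes (`BoolOption`, `SimpleConfig`, `NetworkOptionsSketch`) instead of Flink's own, and takes the key from the diff under review.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a typed config option: key and default travel together. */
final class BoolOption {
    final String key;
    final boolean defaultValue;

    BoolOption(String key, boolean defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Stand-in for a string-keyed configuration. */
final class SimpleConfig {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
        values.put(key, value);
    }

    /** Typed lookup: callers no longer pass their own default value. */
    boolean getBoolean(BoolOption option) {
        String raw = values.get(option.key);
        return raw == null ? option.defaultValue : Boolean.parseBoolean(raw.trim());
    }
}

final class NetworkOptionsSketch {
    // Hypothetical constant; the key string is the one from the diff above.
    static final BoolOption NETWORK_DETAILED_METRICS =
            new BoolOption("taskmanager.net.detailed-metrics", false);
}
```

With the default defined once on the option, every reader of the flag agrees on it, which is harder to guarantee when each call site passes `false` by hand.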

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r101991021

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -227,6 +227,12 @@
          public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

          /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
          + */
          + @PublicEvolving
          + public static final String NETWORK_DETAILED_METRICS_KEY = "taskmanager.net.detailed-metrics";
          — End diff –

          Based on the config key just above this one, "taskmanager.network..." would be more appropriate.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r101993111

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,167 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          — End diff –

          It would be cool if we could move these into the `MetricUtils` class to consolidate things a bit, or move them to `o.a.f.runtime.metrics.util`.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r101992784

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -385,6 +388,20 @@ public Task(
          ++counter;
          }

          + // register detailed network metrics, if configured
          + if (tmConfig.getBoolean(ConfigConstants.NETWORK_DETAILED_METRICS_KEY, false)) {
          + // output metrics
          — End diff –

          Regarding the MetricGroup structure/naming, I would suggest the following:
          ```
          MetricGroup networkGroup = metricGroup.addGroup("Network"); // this is for consistency purposes
          MetricGroup inputGroup = networkGroup.addGroup("Input"); // this is optional
          MetricGroup outputGroup = networkGroup.addGroup("Output"); // this is optional
          for (...) {
              X.registerQueueLengthMetrics(<inputGroup/outputGroup>.addGroup(i), <gate/partition>);
          }
          ```
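A self-contained sketch of the metric identifiers the suggested layout would produce. `ScopeGroup` is a hypothetical stand-in for Flink's `MetricGroup` (only `addGroup` and gauge registration are mimicked), and the metric names `totalQueueLen`, `minQueueLen`, `maxQueueLen` and `avgQueueLen` are assumptions. In a real TaskManager these identifiers would additionally be prefixed with the task's own scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

/** Stand-in for a metric group: builds dotted scope names. */
final class ScopeGroup {
    private final String scope;
    private final Map<String, Supplier<?>> registry;

    ScopeGroup(String scope, Map<String, Supplier<?>> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    ScopeGroup addGroup(String name) {
        return new ScopeGroup(scope.isEmpty() ? name : scope + "." + name, registry);
    }

    ScopeGroup addGroup(int index) {
        return addGroup(String.valueOf(index));
    }

    void gauge(String name, Supplier<?> value) {
        registry.put(scope + "." + name, value);
    }
}

final class NetworkMetricsLayout {
    // Hypothetical metric names for the per-gate / per-partition statistics.
    static final String[] STATS = {"totalQueueLen", "minQueueLen", "maxQueueLen", "avgQueueLen"};

    static void registerQueueLengthMetrics(ScopeGroup group) {
        for (String stat : STATS) {
            group.gauge(stat, () -> 0); // placeholder gauge values
        }
    }

    /** Registers the suggested layout and returns the sorted metric identifiers. */
    static List<String> layout(int numInputGates, int numOutputPartitions) {
        Map<String, Supplier<?>> registry = new TreeMap<>();
        ScopeGroup network = new ScopeGroup("", registry).addGroup("Network");
        ScopeGroup input = network.addGroup("Input");
        ScopeGroup output = network.addGroup("Output");
        for (int i = 0; i < numInputGates; i++) {
            registerQueueLengthMetrics(input.addGroup(i));
        }
        for (int i = 0; i < numOutputPartitions; i++) {
            registerQueueLengthMetrics(output.addGroup(i));
        }
        return new ArrayList<>(registry.keySet());
    }
}
```

For two input gates and one output partition, `layout(2, 1)` yields twelve identifiers such as `Network.Input.0.totalQueueLen` and `Network.Output.0.maxQueueLen`.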

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r101993135

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java —
          @@ -0,0 +1,136 @@
          +package org.apache.flink.runtime.io.network.partition;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class ResultPartitionMetrics {
          — End diff –

          It would be cool if we could move these into the `MetricUtils` class to consolidate things a bit, or move them to `o.a.f.runtime.metrics.util`.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102206831

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,167 @@
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          — End diff –

          ...or keep it close to the implementation of the network stack; it kind of belongs to both.
          In comparison, nothing in `MetricUtils` really implements its own metrics collection the way the `InputGateMetrics` class does.
          I'd prefer to leave it where it is, but could alternatively move it to `o.a.f.runtime.metrics.util`.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102207017

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java —
          @@ -0,0 +1,136 @@
          +package org.apache.flink.runtime.io.network.partition;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class ResultPartitionMetrics {
          — End diff –

          same as above...

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102210056

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -227,6 +227,14 @@
          public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

          /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
          + */
          + @PublicEvolving
          + public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS_KEY =
          — End diff –

          I don't think ConfigOptions are supposed to be in this class; it should be moved to either `NetworkOptions` or `MetricOptions`.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102223763

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -227,6 +227,14 @@
          public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

          /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
          + */
          + @PublicEvolving
          + public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS_KEY =
          — End diff –

          Actually, while looking into that, I noticed that some similar options have already gone into `TaskManagerOptions`, and they are prefixed with `taskmanager.net.` as well, so I'll put the new option there and revert to this namespace.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102227821

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -227,6 +227,14 @@
          public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

          /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
          + */
          + @PublicEvolving
          + public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS_KEY =
          — End diff –

          makes sense.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r102427382

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,167 @@
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          — End diff –

          fair enough

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3348

          The metrics documentation must be updated to contain the new metrics.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3348

          Right, that was indeed missing. I also found and fixed some bugs and inconsistencies, and added some useful extensions.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r103936964

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -389,11 +389,20 @@ public Task(
          ++counter;
          }

          + invokableHasBeenCanceled = new AtomicBoolean(false);
          +
          + // finally, create the executing thread, but do not start it
          + executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
          +
          + // add metrics for buffers
          + this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          +
          // register detailed network metrics, if configured
          if (tmConfig.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS_KEY)) {
          - MetricGroup networkGroup = metricGroup.addGroup("Network"); // same as in MetricUtils.instantiateNetworkMetrics()
          - MetricGroup outputGroup = networkGroup.addGroup("Output"); // this is optional
          - MetricGroup inputGroup = networkGroup.addGroup("Input"); // this is optional
          + // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
          + MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
          — End diff –

          this is equivalent to ```metricGroup.addGroup("Network")```. The IOMetricGroup is just a proxy that forwards calls to the parent group, i.e. the task metric group.
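The forwarding described here can be illustrated with a minimal, self-contained Java sketch. `Group`, `SimpleGroup` and `ProxyGroup` are hypothetical stand-ins, not Flink's `MetricGroup` classes, and the sketch assumes that a group returns the existing subgroup when `addGroup` is called twice with the same name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a metric group (not Flink's MetricGroup).
interface Group {
    Group addGroup(String name);

    String scope();
}

// A plain group that caches subgroups by name, so repeated addGroup("X")
// calls return the same instance (an assumption made for this sketch).
class SimpleGroup implements Group {
    private final String scope;
    private final Map<String, Group> children = new HashMap<>();

    SimpleGroup(String scope) {
        this.scope = scope;
    }

    @Override
    public Group addGroup(String name) {
        return children.computeIfAbsent(name, n -> new SimpleGroup(scope + "." + n));
    }

    @Override
    public String scope() {
        return scope;
    }
}

// Proxy that holds no state of its own and forwards every call to its parent,
// as the comment describes for the IOMetricGroup and the task metric group.
class ProxyGroup implements Group {
    private final Group parent;

    ProxyGroup(Group parent) {
        this.parent = parent;
    }

    @Override
    public Group addGroup(String name) {
        return parent.addGroup(name);
    }

    @Override
    public String scope() {
        return parent.scope();
    }
}
```

Because the proxy holds no state, `io.addGroup("Network")` and `task.addGroup("Network")` resolve to the same subgroup in this sketch, which is why both variants in the diff end up registering their metrics in the same place.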

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r103948091

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          +
          + private final SingleInputGate inputGate;
          +
          + private long lastTotal = -1;
          +
          + private int lastMin = -1;
          +
          + private int lastMax = -1;
          +
          + private float lastAvg = -1.0f;
          +
          + // ------------------------------------------------------------------------
          +
          + private InputGateMetrics(SingleInputGate inputGate) {
          + this.inputGate = checkNotNull(inputGate);
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + // these methods are package private to make access from the nested classes faster
          +
          + long refreshAndGetTotal() {
          + long total;
          + if ((total = lastTotal) == -1) {
          + refresh();
          — End diff –

          I'm not sure whether this pattern is really worth it.

          The stated goal is that the values returned by the gauges all originate from the same point in time. However, this only holds if all gauges are accessed in sequence. As soon as a metric is accessed multiple times (e.g. by a reporter that writes to multiple systems and first loops over the metrics and then over the reporters) or in a random order (JMX), this breaks down. In that case the end result isn't just out of sync but potentially outdated.

          Say a user accesses one of the gauges once via JMX and then waits for, say, 10 minutes. If they now access another of these metrics, the result will be the value from 10 minutes ago. If they access that value again, they will see an abnormally large jump in time, which is inconsistent with all other metrics, which always provide the most up-to-date value.
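The staleness problem can be reproduced with a reduced copy of the read-once cache from the diff. This is a sketch, not the PR's code: `ReadOnceStats` keeps only two of the four statistics, and a plain `List<Integer>` of queue sizes stands in for the gate's `RemoteInputChannel`s.

```java
import java.util.List;

// Simplified re-creation of the "read once, then invalidate" cache from the
// InputGateMetrics diff, reduced to two statistics. The List<Integer> of
// queue sizes is a hypothetical stand-in for the gate's input channels.
class ReadOnceStats {
    private final List<Integer> queueSizes;
    private long lastTotal = -1;
    private int lastMax = -1;

    ReadOnceStats(List<Integer> queueSizes) {
        this.queueSizes = queueSizes;
    }

    long refreshAndGetTotal() {
        long total = lastTotal;
        if (total == -1) {
            refresh(); // recomputes BOTH statistics
            total = lastTotal;
        }
        lastTotal = -1; // only this statistic is invalidated
        return total;
    }

    int refreshAndGetMax() {
        int max = lastMax;
        if (max == -1) {
            refresh();
            max = lastMax;
        }
        lastMax = -1;
        return max;
    }

    private void refresh() {
        long total = 0;
        int max = 0;
        for (int size : queueSizes) {
            total += size;
            max = Math.max(max, size);
        }
        lastTotal = total;
        lastMax = max;
    }
}
```

If `refreshAndGetTotal()` is read while the queues hold [1, 2] and the queues then grow to [10, 20], the next `refreshAndGetMax()` still returns 2 from the cached refresh; only the call after that returns 20. With JMX, the gap between those two reads can be arbitrarily long, which is the 10-minute scenario above.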

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r104388593

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          +
          + private final SingleInputGate inputGate;
          +
          + private long lastTotal = -1;
          +
          + private int lastMin = -1;
          +
          + private int lastMax = -1;
          +
          + private float lastAvg = -1.0f;
          +
          + // ------------------------------------------------------------------------
          +
          + private InputGateMetrics(SingleInputGate inputGate) {
          + this.inputGate = checkNotNull(inputGate);
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + // these methods are package private to make access from the nested classes faster
          +
          + long refreshAndGetTotal() {
          + long total;
          + if ((total = lastTotal) == -1) {
          + refresh();
          + total = lastTotal;
          + }
          +
          + lastTotal = -1;
          + return total;
          + }
          +
          + int refreshAndGetMin() {
          + int min;
          + if ((min = lastMin) == -1) {
          + refresh();
          + min = lastMin;
          + }
          +
          + lastMin = -1;
          + return min;
          + }
          +
          + int refreshAndGetMax() {
          + int max;
          + if ((max = lastMax) == -1) {
          + refresh();
          + max = lastMax;
          + }
          +
          + lastMax = -1;
          + return max;
          + }
          +
          + float refreshAndGetAvg() {
          + float avg;
          + if ((avg = lastAvg) < 0.0f) {
          + refresh();
          + avg = lastAvg;
          + }
          +
          + lastAvg = -1.0f;
          + return avg;
          + }
          +
          + private void refresh() {
          + long total = 0;
          + int min = Integer.MAX_VALUE;
          + int max = 0;
          + int count = 0;
          +
          + for (InputChannel channel : inputGate.getInputChannels().values()) {
          + if (channel.getClass() == RemoteInputChannel.class) {
          + RemoteInputChannel rc = (RemoteInputChannel) channel;
          +
          + int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
          + total += size;
          + min = Math.min(min, size);
          + max = Math.max(max, size);
          + count++;
          + }
          + }
          +
          + this.lastTotal = total;
          + this.lastMin = min;
          + this.lastMax = max;
          + this.lastAvg = total / (float) count;
          + }
          +
          + // ------------------------------------------------------------------------
          + // Gauges to access the stats
          + // ------------------------------------------------------------------------
          +
          + private Gauge<Long> getTotalQueueLenGauge() {
          + return new Gauge<Long>() {
          + @Override
          + public Long getValue() {
          + return refreshAndGetTotal();
          + }
          + };
          + }
          +
          + private Gauge<Integer> getMinQueueLenGauge() {
          + return new Gauge<Integer>() {
          + @Override
          + public Integer getValue() {
          + return refreshAndGetMin();
          + }
          + };
          + }
          +
          + private Gauge<Integer> getMaxQueueLenGauge() {
          + return new Gauge<Integer>() {
          + @Override
          + public Integer getValue() {
          + return refreshAndGetMax();
          + }
          + };
          + }
          +
          + private Gauge<Float> getAvgQueueLenGauge() {
          + return new Gauge<Float>() {
          + @Override
          + public Float getValue() {
          + return refreshAndGetAvg();
          + }
          + };
          + }
          +
          + // ------------------------------------------------------------------------
          + // Static access
          + // ------------------------------------------------------------------------
          +
          + public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) {
          + InputGateMetrics metrics = new InputGateMetrics(gate);
          +
          + group.gauge("total-queue-len", metrics.getTotalQueueLenGauge());
          — End diff –

          These metric names aren't consistent with other metrics; they should be named ```totalQueueLen``` or similar.
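As a sketch of the naming convention requested here, the hyphenated names from the diff map mechanically to lowerCamelCase (`total-queue-len` becomes `totalQueueLen`). `MetricNames` and `NameCheckingGroup` are hypothetical helpers for illustration only; they are not Flink API, and the lowerCamelCase check is an assumption about the convention, not a rule Flink enforces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helpers (not Flink API) for the naming convention.
final class MetricNames {
    private MetricNames() {}

    // Accepts names such as "totalQueueLen"; rejects "total-queue-len".
    static boolean isLowerCamelCase(String name) {
        return name.matches("[a-z][a-zA-Z0-9]*");
    }

    // Converts a hyphenated name to lowerCamelCase: "total-queue-len" -> "totalQueueLen".
    static String toLowerCamelCase(String hyphenated) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : hyphenated.toCharArray()) {
            if (c == '-') {
                upperNext = true;
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}

// Hypothetical stand-in for a metric group that records gauges by name and
// refuses names that do not follow the lowerCamelCase convention.
class NameCheckingGroup {
    final Map<String, Supplier<? extends Number>> gauges = new LinkedHashMap<>();

    void gauge(String name, Supplier<? extends Number> gauge) {
        if (!MetricNames.isLowerCamelCase(name)) {
            throw new IllegalArgumentException("metric name is not lowerCamelCase: " + name);
        }
        gauges.put(name, gauge);
    }
}
```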

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105742696

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          +
          + private final SingleInputGate inputGate;
          +
          + private long lastTotal = -1;
          +
          + private int lastMin = -1;
          +
          + private int lastMax = -1;
          +
          + private float lastAvg = -1.0f;
          +
          + // ------------------------------------------------------------------------
          +
          + private InputGateMetrics(SingleInputGate inputGate) {
          + this.inputGate = checkNotNull(inputGate);
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + // these methods are package private to make access from the nested classes faster
          +
          + long refreshAndGetTotal() {
          + long total;
          + if ((total = lastTotal) == -1) {
          + refresh();
          — End diff –

          I guess the main reason for this was performance in case all (up to) 4 metrics are requested. What's the preferred way of exposing this kind of metric? Should I gather all 4 in a custom object and wrap that in a Gauge? How could the metrics then be displayed, e.g. in the web interface?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105753418

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          +
          + private final SingleInputGate inputGate;
          +
          + private long lastTotal = -1;
          +
          + private int lastMin = -1;
          +
          + private int lastMax = -1;
          +
          + private float lastAvg = -1.0f;
          +
          + // ------------------------------------------------------------------------
          +
          + private InputGateMetrics(SingleInputGate inputGate) {
          + this.inputGate = checkNotNull(inputGate);
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + // these methods are package private to make access from the nested classes faster
          +
          + long refreshAndGetTotal() {
          + long total;
          + if ((total = lastTotal) == -1) {
          + refresh();
          — End diff –

          Custom objects can't be displayed properly in the web interface since we call ```toString()``` on them. The same happens in most reporters, so this isn't really an option.
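To make the `toString()` problem concrete, here is a hypothetical sketch of the "custom object in one Gauge" idea from the question above. `Gauge` is a local stand-in with the same `getValue()` shape as Flink's, `QueueLengthSnapshot` is invented for illustration, and `StringifyingReporter` models a display that, as described, just stringifies the gauge value.

```java
// Stand-in with the same shape as Flink's Gauge#getValue(); defined here so
// the sketch compiles on its own.
interface Gauge<T> {
    T getValue();
}

// Hypothetical value object bundling all four statistics into one gauge value.
final class QueueLengthSnapshot {
    final long total;
    final int min;
    final int max;
    final float avg;

    QueueLengthSnapshot(long total, int min, int max, float avg) {
        this.total = total;
        this.min = min;
        this.max = max;
        this.avg = avg;
    }

    @Override
    public String toString() {
        return "QueueLengthSnapshot{total=" + total + ", min=" + min
                + ", max=" + max + ", avg=" + avg + "}";
    }
}

// Models a display or reporter that simply stringifies whatever a gauge
// returns: a numeric gauge yields a plottable number, the composite gauge
// yields an opaque string.
final class StringifyingReporter {
    private StringifyingReporter() {}

    static String render(Gauge<?> gauge) {
        return String.valueOf(gauge.getValue());
    }
}
```

A numeric gauge renders as `20`, which a reporter or chart can treat as a number; the composite gauge renders as `QueueLengthSnapshot{total=30, min=10, max=20, avg=15.0}`, which can only be shown as text.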

          As it stands, we don't have a single metric that is guaranteed to be 100% consistent with other metrics. numRecordsOut and numBytesOut do not describe the same moment in time. Neither is this guaranteed for the checkpoint metrics: while these are updated all at once (from the outside), there is no mechanism that prevents this update from happening in the middle of a report.

          I don't know a lot about the network stack, so I can't say whether it is truly necessary to have all metrics describe one point in time.

          If this is necessary, the only way I can think of right now is abusing the View metric type. Views are meant as an add-on for metrics that want to be updated at regular intervals (5 seconds) regardless of when their value is actually requested. A metric that only implements the View interface is never reported, but still updated, so you could have this view update a shared data structure from which the other gauges simply retrieve the current value.
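A rough sketch of the View-based approach, under stated assumptions: `View` and `Gauge` are local stand-ins shaped like Flink's interfaces (Flink's `View` exposes an `update()` method that the metric system calls periodically), `QueueStatsView` is hypothetical, and how the view gets registered for periodic updates is not shown. All statistics are computed in one pass in `update()`, and the gauges only read the most recent immutable snapshot.

```java
import java.util.List;
import java.util.function.Supplier;

// Stand-ins shaped like Flink's View (update() is called periodically by the
// metric system) and Gauge interfaces; defined here so the sketch compiles alone.
interface View {
    void update();
}

interface Gauge<T> {
    T getValue();
}

// Hypothetical sketch: the view computes all statistics in one pass on each
// update(); the gauges only read the latest shared snapshot.
class QueueStatsView implements View {

    // Immutable snapshot, so a reader never sees a half-written update.
    private static final class Snapshot {
        final long total;
        final int min;
        final int max;

        Snapshot(long total, int min, int max) {
            this.total = total;
            this.min = min;
            this.max = max;
        }
    }

    private final Supplier<List<Integer>> queueSizes;
    // volatile: update() and the gauges may be called from different threads
    private volatile Snapshot latest = new Snapshot(0, 0, 0);

    QueueStatsView(Supplier<List<Integer>> queueSizes) {
        this.queueSizes = queueSizes;
    }

    @Override
    public void update() {
        List<Integer> sizes = queueSizes.get();
        long total = 0;
        int min = sizes.isEmpty() ? 0 : Integer.MAX_VALUE; // assumption: empty gate reports 0
        int max = 0;
        for (int size : sizes) {
            total += size;
            min = Math.min(min, size);
            max = Math.max(max, size);
        }
        latest = new Snapshot(total, min, max);
    }

    Gauge<Long> totalGauge() {
        return () -> latest.total;
    }

    Gauge<Integer> minGauge() {
        return () -> latest.min;
    }

    Gauge<Integer> maxGauge() {
        return () -> latest.max;
    }
}
```

The trade-off is that gauges read between two updates agree with each other but may lag behind the actual queues by up to one update interval.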

          If this is not necessary i would simply separate them and don't worry about the performance overhead of the metrics; as long as this doesn't affect the job via taking locks or similar.
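          A minimal sketch of the View-based workaround described above, assuming the per-channel queue lengths can be read as an `int[]` from some supplier. `Gauge` and `View` are local stand-ins shaped like Flink's metric interfaces, and `QueueLengthSnapshotView` is a hypothetical name; this is not the code from the PR. The view makes one pass over all channels per update and publishes the four aggregates in a single immutable object, so each value a gauge returns comes from one complete pass.

```java
import java.util.function.Supplier;

// Local stand-ins shaped like Flink's Gauge and View interfaces (assumed here
// so that the sketch compiles on its own).
interface Gauge<T> {
    T getValue();
}

interface View {
    // Invoked periodically by the metric system, independent of reporting.
    void update();
}

// Hypothetical view: one pass over all channels per update, published as a
// single immutable snapshot that the gauges below read from.
class QueueLengthSnapshotView implements View {

    static final class Snapshot {
        final long total;
        final int min;
        final int max;
        final float avg;

        Snapshot(long total, int min, int max, float avg) {
            this.total = total;
            this.min = min;
            this.max = max;
            this.avg = avg;
        }
    }

    private final Supplier<int[]> queueLengths; // hypothetical per-channel queue lengths
    private volatile Snapshot current = new Snapshot(0L, 0, 0, 0.0f);

    QueueLengthSnapshotView(Supplier<int[]> queueLengths) {
        this.queueLengths = queueLengths;
    }

    @Override
    public void update() {
        int[] lengths = queueLengths.get();
        if (lengths.length == 0) {
            current = new Snapshot(0L, 0, 0, 0.0f);
            return;
        }
        long total = 0;
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int len : lengths) {
            total += len;
            min = Math.min(min, len);
            max = Math.max(max, len);
        }
        // A single volatile write publishes all four aggregates together.
        current = new Snapshot(total, min, max, (float) total / lengths.length);
    }

    // Each gauge reads the latest published snapshot; none of them walks the channels.
    Gauge<Long> totalGauge() { return () -> current.total; }
    Gauge<Integer> minGauge() { return () -> current.min; }
    Gauge<Integer> maxGauge() { return () -> current.max; }
    Gauge<Float> avgGauge() { return () -> current.avg; }
}
```

          Only the gauges would be exposed to reporters; the periodic `update()` call keeps the snapshot fresh. The trade-off is that reported values can be up to one update interval stale.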

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105891279

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -389,11 +389,20 @@ public Task(
          ++counter;
          }

          + invokableHasBeenCanceled = new AtomicBoolean(false);
          +
          + // finally, create the executing thread, but do not start it
          + executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
          +
          + // add metrics for buffers
          + this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          +
          // register detailed network metrics, if configured
          if (tmConfig.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS_KEY)) {

          - MetricGroup networkGroup = metricGroup.addGroup("Network"); // same as in MetricUtils.instantiateNetworkMetrics()
          - MetricGroup outputGroup = networkGroup.addGroup("Output"); // this is optional
          - MetricGroup inputGroup = networkGroup.addGroup("Input"); // this is optional
          + // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
          + MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
          — End diff –

          This seems a bit strange to me in general, and I see several occurrences of `this.metrics.getIOMetricGroup()` in the code base. Which use is preferred?

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105891947

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java —
          @@ -0,0 +1,168 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.io.network.partition.consumer;
          +
          +import org.apache.flink.metrics.Gauge;
          +import org.apache.flink.metrics.MetricGroup;
          +
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +public class InputGateMetrics {
          +
          + private final SingleInputGate inputGate;
          +
          + private long lastTotal = -1;
          +
          + private int lastMin = -1;
          +
          + private int lastMax = -1;
          +
          + private float lastAvg = -1.0f;
          +
          + // ------------------------------------------------------------------------
          +
          + private InputGateMetrics(SingleInputGate inputGate) {
          + this.inputGate = checkNotNull(inputGate);
          + }
          +
          + // ------------------------------------------------------------------------
          +
          + // these methods are package private to make access from the nested classes faster
          +
          + long refreshAndGetTotal() {
          + long total;
          + if ((total = lastTotal) == -1) {
          + refresh();
          — End diff –

          I doubt it is necessary for the four metrics to represent a single point in time. I think this construct was just set up to go over the channels only once and gather all statistics in one go.
          I'll adapt this PR to update the metrics individually instead.
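          A minimal sketch of the "update individually" approach, assuming the per-channel queue lengths can be obtained as an `int[]` from some supplier. `IndividualQueueGauges` is a hypothetical name and `Gauge` is a local stand-in for Flink's interface. Each gauge makes its own pass over the channels when it is read, so the four values may come from slightly different moments, which is the same consistency level the other metrics already have.

```java
import java.util.function.Supplier;

// Local stand-in shaped like Flink's Gauge interface (assumed here).
interface Gauge<T> {
    T getValue();
}

// Hypothetical factory: every gauge makes its own pass over the channels
// each time it is read, so no cached state is shared between them.
final class IndividualQueueGauges {

    private IndividualQueueGauges() {}

    static Gauge<Long> total(Supplier<int[]> lengths) {
        return () -> {
            long sum = 0;
            for (int len : lengths.get()) {
                sum += len;
            }
            return sum;
        };
    }

    static Gauge<Integer> min(Supplier<int[]> lengths) {
        return () -> {
            int[] values = lengths.get();
            int min = values.length == 0 ? 0 : Integer.MAX_VALUE; // report 0 when there are no channels
            for (int len : values) {
                min = Math.min(min, len);
            }
            return min;
        };
    }

    static Gauge<Integer> max(Supplier<int[]> lengths) {
        return () -> {
            int max = 0; // queue lengths are non-negative, so 0 is a safe start
            for (int len : lengths.get()) {
                max = Math.max(max, len);
            }
            return max;
        };
    }

    static Gauge<Float> avg(Supplier<int[]> lengths) {
        return () -> {
            int[] values = lengths.get();
            if (values.length == 0) {
                return 0.0f;
            }
            long sum = 0;
            for (int len : values) {
                sum += len;
            }
            return (float) sum / values.length;
        };
    }
}
```

          Compared with a cached snapshot, this repeats the channel iteration once per gauge read, but it needs no invalidation logic such as the `lastTotal == -1` check in the original diff.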

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105895733

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -389,11 +389,20 @@ public Task(
          ++counter;
          }

          + invokableHasBeenCanceled = new AtomicBoolean(false);
          +
          + // finally, create the executing thread, but do not start it
          + executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
          +
          + // add metrics for buffers
          + this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          +
          // register detailed network metrics, if configured
          if (tmConfig.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS_KEY)) {

          - MetricGroup networkGroup = metricGroup.addGroup("Network"); // same as in MetricUtils.instantiateNetworkMetrics()
          - MetricGroup outputGroup = networkGroup.addGroup("Output"); // this is optional
          - MetricGroup inputGroup = networkGroup.addGroup("Input"); // this is optional
          + // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
          + MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
          — End diff –

          The point of the IOMetricGroup is to keep a lot of details out of the TaskMetricGroup without affecting the actual MetricGroup structure. IO metrics are handled a bit differently than other metrics in that they are a) also stored in the ExecutionGraph and b) used from different parts of the code (like multiple RecordWriters). We preemptively moved this logic into a separate class so that the TaskMetricGroup doesn't blow up over time.

          There isn't anything wrong with registering metrics or adding groups on it; they aren't lost or anything. I'm only mentioning it since you replaced existing code with something equivalent.

          If we want an actual "IO" group, we only have to modify these two lines:
          ```
          TaskMetricGroup:
          this.ioMetrics = new TaskIOMetricGroup();
          =>
          this.ioMetrics = new TaskIOMetricGroup(addGroup("IO"));
          ```

          ```
          TaskIOMetricGroup:
          public TaskIOMetricGroup(TaskMetricGroup parent) {
          =>
          public TaskIOMetricGroup(MetricGroup parent) {
          ```
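          To illustrate the difference described above, here is a heavily simplified sketch in which a metric group is reduced to its scope path. `SimpleMetricGroup` and `IoMetricsHelper` are hypothetical stand-ins for Flink's `MetricGroup` and `TaskIOMetricGroup`, not their real APIs. Because the helper only registers on the parent it is given, passing `task.addGroup("IO")` instead of the task group itself is enough to introduce an "IO" level in the metric identifiers.

```java
// Hypothetical, heavily simplified metric group that only tracks its scope path.
class SimpleMetricGroup {
    private final String path;

    SimpleMetricGroup(String path) {
        this.path = path;
    }

    // Returns a child group whose path extends this one, e.g. "task" -> "task.IO".
    SimpleMetricGroup addGroup(String name) {
        return new SimpleMetricGroup(path.isEmpty() ? name : path + "." + name);
    }

    String getPath() {
        return path;
    }
}

// Hypothetical stand-in for a TaskIOMetricGroup-like helper: it adds no scope
// level of its own and registers everything on the parent it is given.
class IoMetricsHelper {
    private final SimpleMetricGroup parent;

    IoMetricsHelper(SimpleMetricGroup parent) {
        this.parent = parent;
    }

    // The identifier a metric registered through this helper would get.
    String identifierOf(String metricName) {
        return parent.getPath() + "." + metricName;
    }
}
```

          With `task = new SimpleMetricGroup("task")`, `new IoMetricsHelper(task).identifierOf("numBytesOut")` yields `task.numBytesOut`, while `new IoMetricsHelper(task.addGroup("IO")).identifierOf("numBytesOut")` yields `task.IO.numBytesOut`.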

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r105898097

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -389,11 +389,20 @@ public Task(
          ++counter;
          }

          + invokableHasBeenCanceled = new AtomicBoolean(false);
          +
          + // finally, create the executing thread, but do not start it
          + executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
          +
          + // add metrics for buffers
          + this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          +
          // register detailed network metrics, if configured
          if (tmConfig.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS_KEY)) {

          - MetricGroup networkGroup = metricGroup.addGroup("Network"); // same as in MetricUtils.instantiateNetworkMetrics()
          - MetricGroup outputGroup = networkGroup.addGroup("Output"); // this is optional
          - MetricGroup inputGroup = networkGroup.addGroup("Input"); // this is optional
          + // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
          + MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
          — End diff –

          OK, thanks for the clarification. I'll keep it the way it is now, though, since this "modified" code is actually new in this PR, and this way it is at least somewhat consistent with the code from MetricUtils.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3348

          I'm wondering: is it actually useful to be able to enable/disable detailed metric stats via `taskmanager.net.detailed-metrics`, or can we always enable them, since they do not incur any overhead unless used anyway?
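          A minimal sketch of why on-demand gauges cost nothing until they are read, assuming a hypothetical `LazyGaugeRegistry` in place of Flink's metric registry and an illustrative metric name. Registering a gauge only stores a reference to the computation; the supplier runs when a reporter calls `getValue()`. The boolean parameter stands in for the `taskmanager.net.detailed-metrics` switch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Local stand-in shaped like Flink's Gauge interface (assumed here).
interface Gauge<T> {
    T getValue();
}

// Hypothetical registry: stores gauges by name and never evaluates them itself.
class LazyGaugeRegistry {
    private final Map<String, Gauge<?>> gauges = new HashMap<>();

    // `detailedMetrics` stands in for the taskmanager.net.detailed-metrics switch.
    void registerQueueGauges(boolean detailedMetrics, Supplier<Integer> totalQueueLength) {
        if (!detailedMetrics) {
            return; // nothing registered, so nothing can ever be computed
        }
        // Registration only stores the method reference; the supplier is not called here.
        Gauge<Integer> gauge = totalQueueLength::get;
        gauges.put("totalQueueLen", gauge); // illustrative metric name
    }

    Gauge<?> get(String name) {
        return gauges.get(name);
    }

    int size() {
        return gauges.size();
    }
}
```

          Under this assumption, the flag mainly controls how many metrics are exposed to reporters rather than the cost of collecting them while nobody is polling.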

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3348

          @zentol can you have a look again?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r109387164

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java —
          @@ -391,9 +395,26 @@ public Task(
          // finally, create the executing thread, but do not start it
          executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

          - if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
          - // add metrics for buffers
          - this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          + // add metrics for buffers
          + this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
          +
          + // register detailed network metrics, if configured
          + if (tmConfig.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS_KEY)) {
          — End diff –

          It would be good to pre-emptively move this into the run() method, specifically below `network.registerTask(this);`, as in #3610. Instantiating them in the constructor can lead to NullPointerExceptions.
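          The thread does not spell out where the NullPointerExceptions come from. This sketch assumes the gauges dereference state (here, a buffer pool) that is only set up during network registration in `run()`. `TaskSketch`, `BufferPoolStub`, and `registerWithNetwork()` are hypothetical names, not Flink's `Task` API. Creating the gauge only after registration means a reporter can never poll it while the pool is still `null`.

```java
// Local stand-in shaped like Flink's Gauge interface (assumed here).
interface Gauge<T> {
    T getValue();
}

// Hypothetical stand-in for a buffer pool that only exists after network registration.
class BufferPoolStub {
    int usedBuffers() { return 3; }
    int totalBuffers() { return 10; }
}

// Hypothetical task skeleton: the pool is assigned in run(), so the gauge that
// dereferences it is created only after that point.
class TaskSketch {
    private BufferPoolStub bufferPool; // null until registerWithNetwork() runs
    private Gauge<Float> poolUsage;    // null until run() registers it

    TaskSketch() {
        // Deliberately no gauge here: bufferPool is still null, and a reporter
        // polling a gauge that dereferences it would throw a NullPointerException.
    }

    private void registerWithNetwork() {
        bufferPool = new BufferPoolStub();
    }

    void run() {
        registerWithNetwork();
        // Safe: the pool now exists, so the gauge can dereference it.
        poolUsage = () -> (float) bufferPool.usedBuffers() / bufferPool.totalBuffers();
    }

    Gauge<Float> getPoolUsage() {
        return poolUsage;
    }
}
```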

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r109387705

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java —
          @@ -67,6 +67,14 @@
          key("taskmanager.net.memory.extra-buffers-per-gate")
          .defaultValue(8);

          + /**
          + * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
          + * lengths.
          + */
          + public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS_KEY =
          — End diff –

          For ConfigOptions we typically don't append `_KEY`.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3348#discussion_r109386391

          — Diff: docs/monitoring/metrics.md —
          @@ -657,6 +657,24 @@ Thus, in order to infer the metric identifier:
          <td>outPoolUsage</td>
          <td>An estimate of the output buffers usage.</td>
          </tr>
          + <tr>
          + <td rowspan="4">Network.<Input|Output>.<gate><br />
          + <strong>(only available if <tt>taskmanager.net.detailed-metrics</tt> config option is set)</strong></td>
          + <td>total-queue-len</td>
          + <td>Total number of queued buffers in all input/output channels.</td>
          + </tr>
          + <tr>
          + <td>min-queue-len</td>
          — End diff –

          These are not in sync with the actual metric names.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3348

          OK, sorry, these slipped through...

          Please note, however, that the not-null checks in #3610 become obsolete with this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3348

          Will try merging this now.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3348

          Did @StephanEwen provide the initial implementation? Otherwise, why is the first commit by him?

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3348

          yes, that's exactly why

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3348

          aljoscha Aljoscha Krettek added a comment -

          Implemented in:
          6cba8bb32a822a3f6bc7a798224390d24dd19cfc
          fff04bfbea5eeb3fc0d6a59db961c4b678ceab58


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              7
