FLINK-5118: Inconsistent records sent/received metrics

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Metrics, Webfrontend
    • Labels:
      None

      Description

      In 1.2-SNAPSHOT, when running a large-scale job, the counts for sent and received records are inconsistent. For example, in a simple word count job we see more received records/bytes than sent ones. This is a regression from 1.1, where everything works as expected.

          Activity

          Zentol Chesnay Schepler added a comment - PR at https://github.com/apache/flink/pull/3106
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3106#discussion_r95812448

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---
          @@ -99,7 +99,7 @@ public SerializationResult addRecord(T record) throws IOException {
            this.lengthBuffer.putInt(0, len);

            if (numBytesOut != null) {
          -     numBytesOut.inc(len);
          +     numBytesOut.inc(len + 4);
          --- End diff --

          I think this warrants both a comment and a test.
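
The `+ 4` in the diff above matches the size of the `int` written via `lengthBuffer.putInt(0, len)`. As a minimal sketch of why the header matters for byte counts, the example below frames a payload as a 4-byte length followed by the payload bytes. `LengthPrefixDemo` and `frame` are hypothetical names for illustration only, not Flink's serializer code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration; this is not Flink's SpanningRecordSerializer.
final class LengthPrefixDemo {

    /** Size of the int length header written before each record. */
    static final int LENGTH_HEADER_BYTES = Integer.BYTES; // 4

    /** Frames a payload as [4-byte length][payload bytes]. */
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_HEADER_BYTES + payload.length);
        buf.putInt(payload.length); // the length header
        buf.put(payload);           // the serialized record
        buf.flip();                 // prepare the buffer for reading
        return buf;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer framed = frame(payload);
        // A counter incremented by payload.length alone misses the header bytes.
        System.out.println("payload bytes: " + payload.length);
        System.out.println("framed bytes:  " + framed.remaining());
    }
}
```

A counter incremented only by the payload length would undercount by `LENGTH_HEADER_BYTES` per record compared to what is actually written.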

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3106

          Good catch. I was wondering whether it is better to count in a consistent way, too, e.g. count the size of each buffer or each record on both A and B (now we have per record on the output side and per buffer on the input side).
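
To illustrate the point about counting units, here is a rough sketch that compares a per-record count on the output side with a per-buffer count on the input side. It assumes each record is framed with a 4-byte header and that the framed stream is cut into fixed-size buffers; the class and method names are hypothetical, and the real network stack is more involved than this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation for illustration; not Flink code.
final class CountingUnitsDemo {

    static final int HEADER_BYTES = 4;

    /** Output-side total when counting per record, with or without the header. */
    static long countPerRecord(int[] payloadSizes, boolean includeHeader) {
        long total = 0;
        for (int len : payloadSizes) {
            total += includeHeader ? len + HEADER_BYTES : len;
        }
        return total;
    }

    /** Cuts the framed stream into buffers of at most bufferSize bytes (bufferSize > 0). */
    static List<Integer> toBufferSizes(int[] payloadSizes, int bufferSize) {
        long remaining = countPerRecord(payloadSizes, true);
        List<Integer> sizes = new ArrayList<>();
        while (remaining > 0) {
            int size = (int) Math.min(bufferSize, remaining);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    /** Input-side total when counting the size of each received buffer. */
    static long countPerBuffer(List<Integer> bufferSizes) {
        long total = 0;
        for (int size : bufferSizes) {
            total += size;
        }
        return total;
    }
}
```

In this simplified model the per-buffer total only agrees with the per-record total when the per-record count includes the header, which is consistent with the reported symptom of more bytes received than sent.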

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3106

          @uce Could you take another look? I moved the bytesOut counter into the RecordWriter.

          githubbot ASF GitHub Bot added a comment -

          Github user xhumanoid commented on the issue:

          https://github.com/apache/flink/pull/3106

          @zentol
          What do you think about removing
          if (numBytesOut != null) {

          and replacing
          numBytesOut = metrics.getNumBytesOutCounter();

          with

          + if (metrics.getNumBytesOutCounter() != null) {
          +     numBytesOut = metrics.getNumBytesOutCounter();
          + } else {
          +     numBytesOut = new NullCounter();
          + }

          where NullCounter has an empty implementation for every method?

          pros:
          we do the null check in one place, because otherwise we may sometimes forget it

          cons:
          sometimes we break devirtualization and inlining for the counter.inc(..) method
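
The suggestion above is the null-object pattern. A minimal self-contained sketch follows, assuming a simplified `DemoCounter` interface with only two methods. `DemoCounter`, `LongCounter`, and `orNoOp` are hypothetical names, and this `NullCounter` is written here for illustration rather than taken from Flink.

```java
// Hypothetical, simplified counter interface; Flink's own Counter has more methods.
interface DemoCounter {
    void inc(long n);
    long getCount();
}

/** A plain counting implementation. */
final class LongCounter implements DemoCounter {
    private long count;

    @Override
    public void inc(long n) {
        count += n;
    }

    @Override
    public long getCount() {
        return count;
    }
}

/** Null object: every method is a no-op, so callers never need a null check. */
final class NullCounter implements DemoCounter {
    @Override
    public void inc(long n) {
        // intentionally empty
    }

    @Override
    public long getCount() {
        return 0;
    }
}

final class NullObjectDemo {
    /** The single place where null is checked. */
    static DemoCounter orNoOp(DemoCounter maybeNull) {
        return maybeNull != null ? maybeNull : new NullCounter();
    }
}
```

With two implementations reachable from the same call site, the JIT may no longer be able to treat `inc` as monomorphic, which is the devirtualization concern listed under cons.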

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3106

          @xhumanoid The metrics returned by the TaskIOMetricGroup can't actually be null, so I wouldn't put too much thought into dealing with that case.

          githubbot ASF GitHub Bot added a comment -

          Github user xhumanoid commented on the issue:

          https://github.com/apache/flink/pull/3106

          @zentol I asked because you always check for null before writing to the Counter.
          Or is that meant to guard against an uninitialized state?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3106

          The code checks for null since there is no technical contract that the returned value is non-null. The checks aren't strictly necessary and are only meant to guard against programming errors in the metrics system.

          Using a ```NullCounter``` would indeed do the same, but would introduce effectively dead code.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3106

          +1 to this approach.

          Some suggestions for small performance improvements, not specific to this case, but also applicable to other cases:

          • It may make sense to type the counters at that level to "SimpleCounter", if we provide them anyway. The TaskIOMetricGroup could type its counters to "SimpleCounter" as well. Since that class is not final, future mocks/checks can be implemented on that class as well.
          • We can make sure that the field `counter` is always initialized by initially assigning a standalone SimpleCounter. That way we could drop the null checks in the code.
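
The second suggestion can be sketched as follows, assuming a minimal `StandaloneCounter` class as a stand-in for a concrete counter type such as SimpleCounter. `WriterSketch`, `setMetricCounter`, and `emit` are hypothetical names and do not reflect the actual RecordWriter API.

```java
import java.util.Objects;

// Hypothetical stand-in for a concrete, non-interface counter type.
final class StandaloneCounter {
    private long count;

    void inc(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}

// Hypothetical writer for illustration; not Flink's RecordWriter.
final class WriterSketch {

    // Never null: starts as a standalone counter that is not registered anywhere.
    private StandaloneCounter numBytesOut = new StandaloneCounter();

    /** Swaps in the counter owned by the metrics system. */
    void setMetricCounter(StandaloneCounter registered) {
        this.numBytesOut = Objects.requireNonNull(registered);
    }

    /** Counts written bytes without a null check on the hot path. */
    void emit(int serializedLength) {
        numBytesOut.inc(serializedLength);
    }

    long bytesOut() {
        return numBytesOut.getCount();
    }
}
```

Because the field has a single concrete type and is always assigned, the write path needs neither a null check nor an interface call.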
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3106

          @StephanEwen I'll add the default initialization of ```counter``` while merging.

          Typing the counters to a ```SimpleCounter``` would affect a lot of other classes though (especially since we can extend this to the ```OperatorIOMetricGroup```), so I would like to do that as part of another issue.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3106

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.0 via 940b4900d1cfc19b32af2b3e8753d3b6e01622ee
          • 1.3.0 via 3a258f7eabe0940abaca63f900a080204b32d2fc

            People

            • Assignee: Zentol Chesnay Schepler
            • Reporter: uce Ufuk Celebi
            • Votes: 0
            • Watchers: 4
