FLINK-5464: MetricQueryService throws NullPointerException on JobManager

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Metrics, Webfrontend
    • Labels:
      None

      Description

      I'm running Flink at commit 699f4b0.

      My JobManager log contains many of these log entries:

      2017-01-11 19:42:05,778 WARN  org.apache.flink.runtime.webmonitor.metrics.MetricFetcher     - Fetching metrics failed.
      akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/MetricQueryService#-970662317]] after [10000 ms]
      	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
      	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
      	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
      	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
      	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
      	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
      	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
      	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
      	at java.lang.Thread.run(Thread.java:745)
      2017-01-11 19:42:07,765 WARN  org.apache.flink.runtime.metrics.dump.MetricQueryService      - An exception occurred while processing a message.
      java.lang.NullPointerException
      	at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162)
      	at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47)
      	at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90)
      	at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
      	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

        Issue Links

          Activity

          Zentol Chesnay Schepler added a comment -

          There are two possible cases for this: either the supplied gauge was null, or it was not null but the value it supplies is null. Neither of these cases is checked at the moment.
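          For illustration, a minimal sketch of the two missing checks; the helper below is hypothetical and not the actual MetricDumpSerialization code:

```java
import java.io.DataOutput;
import java.io.IOException;

import org.apache.flink.metrics.Gauge;

// Hypothetical helper illustrating the two unchecked cases; not the actual Flink code.
final class GaugeNullChecks {

    static void writeGaugeValue(DataOutput out, Gauge<?> gauge) throws IOException {
        if (gauge == null) {
            // case 1: the registered gauge itself is null
            throw new IOException("No gauge was supplied.");
        }
        Object value = gauge.getValue();
        if (value == null) {
            // case 2: the gauge exists but currently supplies a null value
            throw new IOException("The gauge supplied a null value.");
        }
        // only now is it safe to call toString() and serialize the result
        out.writeUTF(value.toString());
    }
}
```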

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3103

          FLINK-5464 [metrics] Prevent some NPEs

          This PR prevents some NullPointerExceptions from occurring in the metric system.

          • When a metric that is null is registered, the metric is ignored and a warning is logged (see the sketch below),
          • e.g. ```group.counter("counter", null);```
          • The MetricDumpSerialization completely ignores gauges whose value is null.
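          A minimal sketch of the registration guard described in the first bullet; the class and method names here are made up for illustration and are not the actual MetricGroup internals:

```java
import org.apache.flink.metrics.Metric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative guard: a null metric is dropped with a warning instead of being registered.
final class NullMetricGuard {

    private static final Logger LOG = LoggerFactory.getLogger(NullMetricGuard.class);

    static boolean shouldRegister(Metric metric, String name) {
        if (metric == null) {
            // e.g. group.counter("counter", null) ends up here
            LOG.warn("Ignoring attempted registration of metric '{}' because it is null.", name);
            return false;
        }
        return true;
    }
}
```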

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5464_mqs_npe

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3103.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3103


          commit 0912848b3ce54842fc6810aa0b041db5547ac690
          Author: zentol <chesnay@apache.org>
          Date: 2017-01-12T11:41:56Z

          FLINK-5464 [metrics] Ignore metrics that are null

          commit 941c83a599221fc57c02605e2c3bc348d70aa8b2
          Author: zentol <chesnay@apache.org>
          Date: 2017-01-12T11:42:26Z

          FLINK-5464 [metrics] Prevent Gauge NPE in serialization


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3103

          cc @rmetzger

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3103

          While testing this PR, I found that the jobmanager.log is now full of exceptions like this one:

          ```
          java.io.EOFException
          at java.io.DataInputStream.readInt(DataInputStream.java:392)
          at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeString(MetricDumpSerialization.java:230)
          at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeMetricInfo(MetricDumpSerialization.java:278)
          at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeGauge(MetricDumpSerialization.java:243)
          at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$800(MetricDumpSerialization.java:47)
          at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpDeserializer.deserialize(MetricDumpSerialization.java:214)
          at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:196)
          at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
          at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
          at akka.dispatch.OnSuccess.internal(Future.scala:212)
          at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
          at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
          at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
          at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25)
          at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
          at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
          at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
          at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
          at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
          at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
          at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
          at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
          at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3103

          yeah I'll have to rethink this :/

          I wanted to just ignore gauges returning null (since nothing in the web-interface accounts for that case), but did not adjust the count of metrics that are submitted. urgh.
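          For context, a small JDK-only illustration (not Flink code) of the mismatch being described: the header promises two values, only one is written because a null gauge was skipped, and the reader runs off the end of the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CountMismatchDemo {

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(2);     // header claims two gauge values ...
        dos.writeUTF("42");  // ... but the second gauge returned null and was skipped

        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        int numGauges = dis.readInt();
        for (int i = 0; i < numGauges; i++) {
            // the second iteration throws java.io.EOFException, as seen in the log above
            System.out.println(dis.readUTF());
        }
    }
}
```

          The follow-up PR avoids this by counting a metric only after it has actually been serialized.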

          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/3103

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3128

          FLINK-5464 Improve MetricDumpSerialization error handling

          Rework of #3103.

          The key change introduced in the previous PR remains: if a gauge returns null, it is not serialized.

          However, I've extended the PR to harden the entire serialization process against exceptions. The major gain is that a single failed serialization no longer destroys the entire dump; instead, the failing metric is simply omitted.

          In order to allow that I had to replace the ```OutputStream```s with a ```ByteBuffer```. The streams don't really allow handling failures between serialization steps, since they cannot be reset. The ```ByteBuffer``` is manually resized if a ```BufferOverflowException``` occurs (see the sketch below).
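          A minimal sketch of the resize-on-overflow idea, assuming each metric arrives as a ready-made byte[]; the class and field names are illustrative and not the code from this PR:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Illustrative only: grow the buffer and retry the current metric when it does not fit.
final class GrowableMetricBuffer {

    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    void append(byte[] serializedMetric) {
        int start = buffer.position();  // position before this metric; nothing partial is written on overflow
        try {
            buffer.put(serializedMetric);
        } catch (BufferOverflowException e) {
            ByteBuffer larger = ByteBuffer.allocate(
                Math.max(buffer.capacity() * 2, start + serializedMetric.length));
            buffer.flip();                 // limit = bytes written so far, position = 0
            larger.put(buffer);            // copy everything serialized up to this point
            buffer = larger;
            buffer.put(serializedMetric);  // retry the metric that overflowed
        }
    }
}
```

          As the later review discussion notes, this manual resizing was eventually dropped in favour of Flink's DataOutputSerializer, which is already backed by a growing array.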

          • ```MetricDump(De)Serializer#(de)serialize``` no longer throws exceptions; they are caught and logged instead.
          • An exception during the serialization of a single metric causes that metric to be skipped.
          • Added a test for the handling of a gauge that returns null.
          • Added a test for the manual resizing of the backing array.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5464_mqs_npe

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3128.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3128


          commit 8610a47c407afe2140cd4b5651ebc794ef3feec8
          Author: zentol <chesnay@apache.org>
          Date: 2017-01-12T11:41:56Z

          FLINK-5464 [metrics] Ignore metrics that are null

          commit 442c0a4dee002b73e5b86d6c7bb274484a8900ac
          Author: zentol <chesnay@apache.org>
          Date: 2017-01-16T10:25:58Z

          [hotfix] Remove unused variable in MetricDumpSerializerTest

          commit 0f813ebf53414b1b68c6dfe8e3e1dbc896054c36
          Author: zentol <chesnay@apache.org>
          Date: 2017-01-12T11:42:26Z

          FLINK-5464 [metrics] Improve MetricDumpSerialization exception handling


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3128

          I've had an offline chat with @rmetzger and @uce. We agreed that using a ByteBuffer and resizing it manually was a bit undesirable.

          Instead we opted for the following approach:

          • Use DataOutputSerializer instead of DataOutputStream; it is a bit more efficient for strings, which make up the majority of the serialized data, and is also backed by a resizing array.
          • Restructure the serialize methods to be symmetric with the deserialize methods.
          • Access the metric values before serializing anything and reduce them to primitives or strings (see the sketch below). The assumption is that if this succeeds, the subsequent serialization will also succeed; it can then only fail due to critical errors that prevent serialization completely, or due to programming errors on our part.
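          A rough sketch of the last point, written against plain java.io.DataOutput for brevity; the method shape is illustrative rather than the committed Flink code:

```java
import java.io.DataOutput;
import java.io.IOException;

import org.apache.flink.metrics.Gauge;

final class GaugeSerializationSketch {

    // Reduce the gauge to a String before writing anything, so a failing or null gauge
    // leaves the output untouched and the metric can simply be skipped by the caller.
    static void serializeGauge(DataOutput out, String name, Gauge<?> gauge) throws IOException {
        Object value = gauge.getValue();
        if (value == null) {
            throw new IOException("Gauge '" + name + "' returned null.");
        }
        String stringValue = value.toString();

        // Only primitives and strings are written from here on; these writes are expected
        // to fail only on critical errors or programming mistakes.
        out.writeUTF(name);
        out.writeUTF(stringValue);
    }
}
```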
          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97314773

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -50,12 +51,27 @@
 private MetricDumpSerialization() {
 }

+public static class MetricSerializationResult {
+    public final byte[] data;
```

          — End diff –

          Wondering whether to call this `serializedMetrics`.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97316854

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
  *
  * @param data serialized metrics
  * @return A list containing the deserialized metrics.
- * @throws IOException
  */
-public List<MetricDump> deserialize(byte[] data) throws IOException {
-    ByteArrayInputStream bais = new ByteArrayInputStream(data);
-    DataInputStream dis = new DataInputStream(bais);
-    int numCounters = dis.readInt();
-    int numGauges = dis.readInt();
-    int numHistograms = dis.readInt();
-    int numMeters = dis.readInt();
+public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
```

          — End diff –

          In line 230, an empty line is missing before `class MetricDumpDeserializer`.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97315450

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
  *
  * @param gauges gauges to serialize
  * @param histograms histograms to serialize
  * @return byte array containing the serialized metrics
- * @throws IOException
  */
-public byte[] serialize(
+public MetricSerializationResult serialize(
     Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-    baos.reset();
-    dos.writeInt(counters.size());
-    dos.writeInt(gauges.size());
-    dos.writeInt(histograms.size());
-    dos.writeInt(meters.size());
+    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+    buffer.clear();

+    int numCounters = 0;
     for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeCounter(dos, entry.getKey());
+        try {
+            serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numCounters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize counter.", e);
+        }
     }

+    int numGauges = 0;
     for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeGauge(dos, entry.getKey());
+        try {
+            serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numGauges++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize gauge.", e);
+        }
     }

+    int numHistograms = 0;
     for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeHistogram(dos, entry.getKey());
+        try {
+            serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numHistograms++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize histogram.", e);
+        }
     }

+    int numMeters = 0;
     for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeMeter(dos, entry.getKey());
+        try {
+            serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numMeters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize meter.", e);
+        }
     }

-    return baos.toByteArray();
+    return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
```

          — End diff –

          Empty line before return?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97315164

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -50,12 +51,27 @@
 private MetricDumpSerialization() {
 }

+public static class MetricSerializationResult {
+    public final byte[] data;
+    public final int numCounters;
+    public final int numGauges;
+    public final int numMeters;
+    public final int numHistograms;
+
+    public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
+        this.data = data;
```

          — End diff –

          Let's add simple sanity checks `checkNotNull(data)` and `checkArgument(num* >= 0)` for the other fields.
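          A possible shape of the suggested checks, assuming Flink's Preconditions utility is available; this is a sketch of the review suggestion, not the merged constructor:

```java
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

// Constructor fragment of MetricSerializationResult with the proposed sanity checks.
public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
    checkNotNull(data, "Serialized metric data must not be null");
    checkArgument(numCounters >= 0, "numCounters must not be negative");
    checkArgument(numGauges >= 0, "numGauges must not be negative");
    checkArgument(numMeters >= 0, "numMeters must not be negative");
    checkArgument(numHistograms >= 0, "numHistograms must not be negative");

    this.data = data;
    this.numCounters = numCounters;
    this.numGauges = numGauges;
    this.numMeters = numMeters;
    this.numHistograms = numHistograms;
}
```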

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97317434

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
  *
  * @param data serialized metrics
  * @return A list containing the deserialized metrics.
- * @throws IOException
  */
-public List<MetricDump> deserialize(byte[] data) throws IOException {
-    ByteArrayInputStream bais = new ByteArrayInputStream(data);
-    DataInputStream dis = new DataInputStream(bais);
-    int numCounters = dis.readInt();
-    int numGauges = dis.readInt();
-    int numHistograms = dis.readInt();
-    int numMeters = dis.readInt();
+public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+    DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);

-    List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+    List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);

-    for (int x = 0; x < numCounters; x++) {
-        metrics.add(deserializeCounter(dis));
+    for (int x = 0; x < data.numCounters; x++) {
+        try {
+            metrics.add(deserializeCounter(in));
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize counter.", e);
+        }
     }

-    for (int x = 0; x < numGauges; x++) {
-        metrics.add(deserializeGauge(dis));
+    for (int x = 0; x < data.numGauges; x++) {
+        try {
+            metrics.add(deserializeGauge(in));
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize counter.", e);
+        }
     }

-    for (int x = 0; x < numHistograms; x++) {
-        metrics.add(deserializeHistogram(dis));
+    for (int x = 0; x < data.numHistograms; x++) {
+        try {
+            metrics.add(deserializeHistogram(in));
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize counter.", e);
+        }
     }

-    for (int x = 0; x < numMeters; x++) {
-        metrics.add(deserializeMeter(dis));
+    for (int x = 0; x < data.numMeters; x++) {
+        try {
+            metrics.add(deserializeMeter(in));
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize counter.", e);
+        }
     }
-
-    return metrics;
```

          — End diff –

          Code does not compile because of the missing return.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97316605

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
  *
  * @param gauges gauges to serialize
  * @param histograms histograms to serialize
  * @return byte array containing the serialized metrics
- * @throws IOException
  */
-public byte[] serialize(
+public MetricSerializationResult serialize(
     Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-    baos.reset();
-    dos.writeInt(counters.size());
-    dos.writeInt(gauges.size());
-    dos.writeInt(histograms.size());
-    dos.writeInt(meters.size());
+    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+    buffer.clear();

+    int numCounters = 0;
     for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeCounter(dos, entry.getKey());
+        try {
+            serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numCounters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize counter.", e);
+        }
     }

+    int numGauges = 0;
     for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeGauge(dos, entry.getKey());
+        try {
+            serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numGauges++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize gauge.", e);
+        }
     }

+    int numHistograms = 0;
     for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeHistogram(dos, entry.getKey());
+        try {
+            serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numHistograms++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize histogram.", e);
+        }
     }

+    int numMeters = 0;
     for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeMeter(dos, entry.getKey());
+        try {
+            serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numMeters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize meter.", e);
```

          — End diff –

          Should we decrease the log level to `debug` (here and on the other lines)? The user won't be able to act on the warning, in most cases this will just work, and we wouldn't risk printing many log messages (as has happened a couple of times before).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97315979

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —

```
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
  *
  * @param gauges gauges to serialize
  * @param histograms histograms to serialize
  * @return byte array containing the serialized metrics
- * @throws IOException
  */
-public byte[] serialize(
+public MetricSerializationResult serialize(
     Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
     Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
     Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-    baos.reset();
-    dos.writeInt(counters.size());
-    dos.writeInt(gauges.size());
-    dos.writeInt(histograms.size());
-    dos.writeInt(meters.size());
+    Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+    buffer.clear();

+    int numCounters = 0;
     for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeCounter(dos, entry.getKey());
+        try {
+            serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numCounters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize counter.", e);
+        }
     }

+    int numGauges = 0;
     for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeGauge(dos, entry.getKey());
+        try {
+            serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numGauges++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize gauge.", e);
+        }
     }

+    int numHistograms = 0;
     for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeHistogram(dos, entry.getKey());
+        try {
+            serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numHistograms++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize histogram.", e);
+        }
     }

+    int numMeters = 0;
     for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-        serializeMetricInfo(dos, entry.getValue().f0);
-        serializeString(dos, entry.getValue().f1);
-        serializeMeter(dos, entry.getKey());
+        try {
+            serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+            numMeters++;
+        } catch (Exception e) {
+            LOG.warn("Failed to serialize meter.", e);
+        }
     }

-    return baos.toByteArray();
+    return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
 }

 public void close() {
-    try {
-        dos.close();
-    } catch (Exception e) {
-        LOG.debug("Failed to close OutputStream.", e);
-    }
-    try {
-        baos.close();
-    } catch (Exception e) {
-        LOG.debug("Failed to close OutputStream.", e);
-    }
+    buffer = null;
 }
 }

-private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException {
-    serializeString(dos, info.scope);
-    dos.writeByte(info.getCategory());
+private static void serializeMetricInfo(DataOutput out, QueryScopeInfo info) throws IOException {
+    out.writeUTF(info.scope);
+    out.writeByte(info.getCategory());
     switch (info.getCategory()) {
         case INFO_CATEGORY_JM:
             break;
         case INFO_CATEGORY_TM:
             String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-            serializeString(dos, tmID);
+            out.writeUTF(tmID);
             break;
         case INFO_CATEGORY_JOB:
             QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-            serializeString(dos, jobInfo.jobID);
+            out.writeUTF(jobInfo.jobID);
             break;
         case INFO_CATEGORY_TASK:
             QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-            serializeString(dos, taskInfo.jobID);
-            serializeString(dos, taskInfo.vertexID);
-            dos.writeInt(taskInfo.subtaskIndex);
+            out.writeUTF(taskInfo.jobID);
+            out.writeUTF(taskInfo.vertexID);
+            out.writeInt(taskInfo.subtaskIndex);
             break;
         case INFO_CATEGORY_OPERATOR:
             QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
-            serializeString(dos, operatorInfo.jobID);
-            serializeString(dos, operatorInfo.vertexID);
-            dos.writeInt(operatorInfo.subtaskIndex);
-            serializeString(dos, operatorInfo.operatorName);
+            out.writeUTF(operatorInfo.jobID);
+            out.writeUTF(operatorInfo.vertexID);
+            out.writeInt(operatorInfo.subtaskIndex);
+            out.writeUTF(operatorInfo.operatorName);
             break;
+        default:
+            throw new IOException("Unknown scope category: " + info.getCategory());
     }
 }

-private static void serializeString(DataOutputStream dos, String string) throws IOException {
-    byte[] bytes = string.getBytes();
-    dos.writeInt(bytes.length);
-    dos.write(bytes);
+private static void serializeCounter(DataOutput out, QueryScopeInfo info, String name, Counter counter) throws IOException {
+    long count = counter.getCount();
+    serializeMetricInfo(out, info);
+    out.writeUTF(name);
+    out.writeLong(count);
 }

-private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException {
-    dos.writeLong(counter.getCount());
-}
-
-private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException {
-    serializeString(dos, gauge.getValue().toString());
+private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?> gauge) throws IOException {
+    Object value = gauge.getValue();
+    if (value == null) {
+        throw new NullPointerException("Value returned by gauge " + name + " was null.");
+    }
+    String stringValue = gauge.getValue().toString();
+    if (stringValue == null) {
+        throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
+    }
+    serializeMetricInfo(out, info);
+    out.writeUTF(name);
+    out.writeUTF(stringValue);
 }

-private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException {
+private static void serializeHistogram(DataOutput out, QueryScopeInfo info, String name, Histogram histogram) throws IOException {
     HistogramStatistics stat = histogram.getStatistics();
-
-    dos.writeLong(stat.getMin());
-    dos.writeLong(stat.getMax());
-    dos.writeDouble(stat.getMean());
-    dos.writeDouble(stat.getQuantile(0.5));
-    dos.writeDouble(stat.getStdDev());
-    dos.writeDouble(stat.getQuantile(0.75));
-    dos.writeDouble(stat.getQuantile(0.90));
-    dos.writeDouble(stat.getQuantile(0.95));
-    dos.writeDouble(stat.getQuantile(0.98));
-    dos.writeDouble(stat.getQuantile(0.99));
-    dos.writeDouble(stat.getQuantile(0.999));
+    long min = stat.getMin();
+    long max = stat.getMax();
+    double mean = stat.getMean();
+    double mediam = stat.getQuantile(0.5);
```

          — End diff –

          Typo `mediam` -> `median`

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97314569

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -17,19 +17,20 @@
          */
          package org.apache.flink.runtime.metrics.dump;

          -import org.apache.commons.io.output.ByteArrayOutputStream;
          import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.core.memory.DataInputView;
          import org.apache.flink.metrics.Counter;
          import org.apache.flink.metrics.Gauge;
          import org.apache.flink.metrics.Histogram;
          import org.apache.flink.metrics.HistogramStatistics;
          import org.apache.flink.metrics.Meter;
          +import org.apache.flink.runtime.util.DataInputDeserializer;
          +import org.apache.flink.runtime.util.DataOutputSerializer;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          -import java.io.ByteArrayInputStream;
          -import java.io.DataInputStream;
          -import java.io.DataOutputStream;
          +import java.io.DataInput;
          +import java.io.DataOutput;
          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;
          — End diff –

In line 49 (above the `LOG` field), an empty line is missing.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97316951

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
          *

          • @param data serialized metrics
          • @return A list containing the deserialized metrics.
          • * @throws IOException
            */
          • public List<MetricDump> deserialize(byte[] data) throws IOException {
          • ByteArrayInputStream bais = new ByteArrayInputStream(data);
          • DataInputStream dis = new DataInputStream(bais);
          • int numCounters = dis.readInt();
          • int numGauges = dis.readInt();
          • int numHistograms = dis.readInt();
          • int numMeters = dis.readInt();
            + public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
              • End diff –

          Furthermore, after that line. And there is an empty line after the JavaDoc of `deserialize`.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97317481

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
          *
          * @param data serialized metrics
          * @return A list containing the deserialized metrics.
          - * @throws IOException
          */
          -public List<MetricDump> deserialize(byte[] data) throws IOException {
          -	ByteArrayInputStream bais = new ByteArrayInputStream(data);
          -	DataInputStream dis = new DataInputStream(bais);
          -	int numCounters = dis.readInt();
          -	int numGauges = dis.readInt();
          -	int numHistograms = dis.readInt();
          -	int numMeters = dis.readInt();
          +public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
          +	DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);

          -	List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
          +	List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);

          -	for (int x = 0; x < numCounters; x++) {
          -		metrics.add(deserializeCounter(dis));
          +	for (int x = 0; x < data.numCounters; x++) {
          +		try {
          +			metrics.add(deserializeCounter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numGauges; x++) {
          -		metrics.add(deserializeGauge(dis));
          +	for (int x = 0; x < data.numGauges; x++) {
          +		try {
          +			metrics.add(deserializeGauge(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numHistograms; x++) {
          -		metrics.add(deserializeHistogram(dis));
          +	for (int x = 0; x < data.numHistograms; x++) {
          +		try {
          +			metrics.add(deserializeHistogram(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numMeters; x++) {
          -		metrics.add(deserializeMeter(dis));
          +	for (int x = 0; x < data.numMeters; x++) {
          +		try {
          +			metrics.add(deserializeMeter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}
          -
          	return metrics;
          }

          -private static String deserializeString(DataInputStream dis) throws IOException {
          -	int stringLength = dis.readInt();
          -	byte[] bytes = new byte[stringLength];
          -	dis.readFully(bytes);
          -	return new String(bytes);
          -}
          -
          -private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
          +private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
          	QueryScopeInfo scope = deserializeMetricInfo(dis);
          -	String name = deserializeString(dis);
          -	return new MetricDump.CounterDump(scope, name, dis.readLong());
          +	String name = dis.readUTF();
          +	long count = dis.readLong();
          +	return new MetricDump.CounterDump(scope, name, count);
          — End diff –

          Should we add an empty line before the `return`s?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97315205

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -50,12 +51,27 @@
          private MetricDumpSerialization() {
          }

          +public static class MetricSerializationResult {
          +	public final byte[] data;
          +	public final int numCounters;
          +	public final int numGauges;
          +	public final int numMeters;
          +	public final int numHistograms;
          +
          +	public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
          +		this.data = data;
          +		this.numCounters = numCounters;
          +		this.numGauges = numGauges;
          +		this.numMeters = numMeters;
          +		this.numHistograms = numHistograms;
          +	}
          +}
          +
          //-------------------------------------------------------------------------
          // Serialization
          //-------------------------------------------------------------------------
          public static class MetricDumpSerializer {

          -	private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
          -	private DataOutputStream dos = new DataOutputStream(baos);
          +	private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
          — End diff –

          Empty line missing before this line.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97319035

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -50,12 +51,27 @@
          private MetricDumpSerialization() {
          }

          + public static class MetricSerializationResult {
          — End diff –

          This is not `Serializable` and would fail when sent with Akka. The following test fails:
          ```java
          @Test
          public void testJavaSerialization() throws IOException {
          	MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();

          	final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
          	final ObjectOutputStream oos = new ObjectOutputStream(bos);

          	oos.writeObject(serializer.serialize(
          		new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
          		new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
          		new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
          		new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
          }
          ```
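
          One straightforward way to address this would be to have the result class implement `java.io.Serializable`, so it can travel through Akka's default Java serialization. A minimal sketch under that assumption (not necessarily the change that was merged):
          ```java
          public static class MetricSerializationResult implements java.io.Serializable {

          	private static final long serialVersionUID = 1L;

          	// All fields are primitives or a byte array, so default Java serialization suffices.
          	public final byte[] data;
          	public final int numCounters;
          	public final int numGauges;
          	public final int numMeters;
          	public final int numHistograms;

          	public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
          		this.data = data;
          		this.numCounters = numCounters;
          		this.numGauges = numGauges;
          		this.numMeters = numMeters;
          		this.numHistograms = numHistograms;
          	}
          }
          ```
          With that change the `writeObject` call in the test above would no longer fail with a `NotSerializableException`.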

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97314949

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -50,12 +51,27 @@
          private MetricDumpSerialization() {
          }

          + public static class MetricSerializationResult {
          — End diff –

          Can you add a short class-level comment about why we added this? The problem with determining the number of metrics beforehand, etc.
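
          For illustration, such a class-level comment could explain that the per-type counts exist because individual metrics may be skipped when their serialization fails, so the number of metrics in the blob is not known up front. A hypothetical wording (not the text that was eventually committed):
          ```java
          /**
           * Container for the serialized metric dump and the number of serialized metrics per type.
           *
           * <p>The counts are stored explicitly because metrics whose serialization fails (for
           * example a gauge returning null) are skipped, so the number of metrics contained in
           * {@code data} cannot be determined beforehand by the consumer.
           */
          public static class MetricSerializationResult {
          	// fields and constructor as shown in the diff above
          }
          ```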

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97316717

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -50,12 +51,27 @@
          private MetricDumpSerialization() {
          }

          + public static class MetricSerializationResult {
          + public final byte[] data;
          + public final int numCounters;
          + public final int numGauges;
          + public final int numMeters;
          + public final int numHistograms;
          +
          + public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
          — End diff –

          Let's decrease the visibility of the constructor as much as possible.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97327512

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
          *
          * @param data serialized metrics
          * @return A list containing the deserialized metrics.
          - * @throws IOException
          */
          -public List<MetricDump> deserialize(byte[] data) throws IOException {
          -	ByteArrayInputStream bais = new ByteArrayInputStream(data);
          -	DataInputStream dis = new DataInputStream(bais);
          -	int numCounters = dis.readInt();
          -	int numGauges = dis.readInt();
          -	int numHistograms = dis.readInt();
          -	int numMeters = dis.readInt();
          +public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
          +	DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);

          -	List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
          +	List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);

          -	for (int x = 0; x < numCounters; x++) {
          -		metrics.add(deserializeCounter(dis));
          +	for (int x = 0; x < data.numCounters; x++) {
          +		try {
          +			metrics.add(deserializeCounter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numGauges; x++) {
          -		metrics.add(deserializeGauge(dis));
          +	for (int x = 0; x < data.numGauges; x++) {
          +		try {
          +			metrics.add(deserializeGauge(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numHistograms; x++) {
          -		metrics.add(deserializeHistogram(dis));
          +	for (int x = 0; x < data.numHistograms; x++) {
          +		try {
          +			metrics.add(deserializeHistogram(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numMeters; x++) {
          -		metrics.add(deserializeMeter(dis));
          +	for (int x = 0; x < data.numMeters; x++) {
          +		try {
          +			metrics.add(deserializeMeter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}
          -
          	return metrics;
          — End diff –

          It appears I forgot to push some of the last-minute fixes :/

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97327675

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
          *
          * @param data serialized metrics
          * @return A list containing the deserialized metrics.
          - * @throws IOException
          */
          -public List<MetricDump> deserialize(byte[] data) throws IOException {
          -	ByteArrayInputStream bais = new ByteArrayInputStream(data);
          -	DataInputStream dis = new DataInputStream(bais);
          -	int numCounters = dis.readInt();
          -	int numGauges = dis.readInt();
          -	int numHistograms = dis.readInt();
          -	int numMeters = dis.readInt();
          +public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
          +	DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);

          -	List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
          +	List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);

          -	for (int x = 0; x < numCounters; x++) {
          -		metrics.add(deserializeCounter(dis));
          +	for (int x = 0; x < data.numCounters; x++) {
          +		try {
          +			metrics.add(deserializeCounter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numGauges; x++) {
          -		metrics.add(deserializeGauge(dis));
          +	for (int x = 0; x < data.numGauges; x++) {
          +		try {
          +			metrics.add(deserializeGauge(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numHistograms; x++) {
          -		metrics.add(deserializeHistogram(dis));
          +	for (int x = 0; x < data.numHistograms; x++) {
          +		try {
          +			metrics.add(deserializeHistogram(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}

          -	for (int x = 0; x < numMeters; x++) {
          -		metrics.add(deserializeMeter(dis));
          +	for (int x = 0; x < data.numMeters; x++) {
          +		try {
          +			metrics.add(deserializeMeter(in));
          +		} catch (Exception e) {
          +			LOG.warn("Failed to deserialize counter.", e);
          +		}
          	}
          -
          	return metrics;
          }

          -private static String deserializeString(DataInputStream dis) throws IOException {
          -	int stringLength = dis.readInt();
          -	byte[] bytes = new byte[stringLength];
          -	dis.readFully(bytes);
          -	return new String(bytes);
          -}
          -
          -private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
          +private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
          	QueryScopeInfo scope = deserializeMetricInfo(dis);
          -	String name = deserializeString(dis);
          -	return new MetricDump.CounterDump(scope, name, dis.readLong());
          +	String name = dis.readUTF();
          +	long count = dis.readLong();
          +	return new MetricDump.CounterDump(scope, name, count);
          — End diff –

          Personally, for short methods I think it's overkill. I would do it for methods like `deserializeHistogram` though.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3128

          @uce I've addressed your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3128#discussion_r97343787

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java —
          @@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in
          //-------------------------------------------------------------------------
          // Serialization
          //-------------------------------------------------------------------------
          +
          public static class MetricDumpSerializer {
          +
          private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);

          /**
          * Serializes the given metrics and returns the resulting byte array.
          + *
          + * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
          — End diff –

          Very nice

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3128

          Merging.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/3128

          Zentol Chesnay Schepler added a comment -

          master: 77047244a1ca2456c2b9fdf7083dd3a7c3aba029 & a8e85a2d5abcaf2defab27be0027190ac3ecb5d5
          1.2: 30419ff4a173965a144bf6e44810cf0d17f3962d & b323f66a6b3861b42fb0ae0c5b7cb4163529525a


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              rmetzger Robert Metzger
            • Votes:
              0
              Watchers:
              2
