Flink / FLINK-3427

Add watermark monitoring to JobManager web frontend

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API, Webfrontend
    • Labels:
      None

      Description

      Currently, it's quite hard to figure out issues with the watermarks.

      I think we can improve the situation by reporting the following information through the metrics system:

      • Report the current low watermark for each operator (this way, you can see if one operator is preventing the watermarks from rising)
      • Report the number of events that arrived after the low watermark (users can see how accurate the watermarks are)
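A minimal sketch of how these two pieces of information could be read, written in plain JavaScript rather than the dashboard's CoffeeScript. The function names and the input shapes are hypothetical and not part of Flink's metrics API; the sketch also assumes that an event counts as late when its timestamp is not greater than the watermark seen before it.

```javascript
// Hypothetical helpers for illustration only; not Flink APIs.

// Given a map of operator name -> current low watermark, return the name of
// the operator with the smallest watermark (the one holding the others back).
function findLaggingOperator(lowWatermarks) {
  let lagging = null;
  for (const [name, watermark] of Object.entries(lowWatermarks)) {
    if (lagging === null || watermark < lowWatermarks[lagging]) {
      lagging = name;
    }
  }
  return lagging;
}

// Walk a stream of records in arrival order and count the events whose
// timestamp is at or below the highest watermark seen so far.
// Assumed record shape: { type: 'event' | 'watermark', timestamp: number }.
function countLateEvents(records) {
  let currentWatermark = -Infinity;
  let late = 0;
  for (const record of records) {
    if (record.type === 'watermark') {
      currentWatermark = Math.max(currentWatermark, record.timestamp);
    } else if (record.timestamp <= currentWatermark) {
      late += 1;
    }
  }
  return late;
}
```

A high late-event count relative to the total would suggest that the watermarks advance too aggressively for the data.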

          Activity

          kkl0u Kostas Kloudas added a comment -

          I will not work on this one for now.
          Anybody who wants can take it.

          ivan.mushketyk Ivan Mushketyk added a comment -

          I'll work on this if no one minds.

          rmetzger Robert Metzger added a comment -

          Hey Ivan Mushketyk, how do you plan to visualize the watermarks in the front end?

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hey Robert Metzger, my idea was to record a timestamp of the latest watermark on every vertex and display timestamps in the "Subtask" panel with other metrics (here is a panel I am talking about: https://cloud.githubusercontent.com/assets/592286/18147690/c287947e-6fcd-11e6-8968-ff47283635b7.png).

          On the other hand, we could enhance the graph representation and display watermarks and number of events for every vertex there. Maybe something similar to Google Dataflow UI: https://youtu.be/TnLiEWglqHk?t=22m4s

          ivan.mushketyk Ivan Mushketyk added a comment -

          Robert Metzger, if this helps the discussion: I was planning to add two new metric gauges, one for the last watermark and one for the number of events.

          rmetzger Robert Metzger added a comment -

          Afaik there is already a gauge for getting the low watermark.

          rmetzger Robert Metzger added a comment -

          Ivan Mushketyk did you already start working on this issue? I'm asking because I have somebody who wanted to implement this soon, and we have some concrete plans how to do it.
          If you're already half-way through with the implementation, you can get the task and finish it. However, if you did not start yet, I'm wondering whether it would be okay for you if I can take the task?

          I'm sorry that I didn't see your comment from three days ago earlier.

          ivan.mushketyk Ivan Mushketyk added a comment -

          Robert Metzger, I'm not half-way through the task yet, so it's okay if you take the task. I'll work on something different.
          Just out of curiosity, how do you plan to implement it?

          rmetzger Robert Metzger added a comment -

          Hi Ivan,
          Sadly, the person who wanted to work on this decided not to do it. So if you are still interested, feel free to assign it to yourself again.

          To answer your question:
          I was thinking of adding a new tab for “Event time”.
          It will show the same plan as the “Plan” view, but with the low watermark of each operator (we'll compute the low watermark for the task in the web interface, based on the watermarks reported by the subtasks).
          There should also be a view showing the subtasks' individual watermarks and the max watermark across the subtasks.
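The per-task aggregation described above could look roughly like the following sketch in plain JavaScript. The function name and the input shape (a map from task id to a list of `{ id, value }` entries, one per subtask) are assumptions made for illustration.

```javascript
// Hypothetical aggregation; the input maps a task id to its subtasks'
// watermarks, e.g. { taskA: [{ id: '0', value: 1200 }, { id: '1', value: 1500 }] }.
function summarizeWatermarks(watermarksByTask) {
  const summary = {};
  for (const [taskId, subtasks] of Object.entries(watermarksByTask)) {
    const values = subtasks.map((subtask) => subtask.value);
    summary[taskId] = values.length === 0
      // No subtask reported a watermark: leave both fields empty.
      ? { low: null, max: null }
      // The task's low watermark is the minimum over its subtasks.
      : { low: Math.min(...values), max: Math.max(...values) };
  }
  return summary;
}
```

A large gap between `low` and `max` for one task would point at a single slow subtask rather than at the task as a whole.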

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102174360

          — Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee —
          @@ -87,3 +87,27 @@ angular.module('flinkApp')

          .filter "percentage", ->
          (number) -> (number * 100).toFixed(0) + '%'
          +
          +.filter "parseWatermark", ->
          + (value) ->
          + if value <= -9223372036854776000
          + return 'No Watermark'
          + else
          + return value
          +
          +.filter "lowWatermark", ->
          + (watermarks, nodeid) ->
          + lowWatermark = "None"
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + values = (watermark.value for watermark in watermarks[nodeid])
          + lowWatermark = Math.min.apply(null, values)
          + if lowWatermark <= -9223372036854776000
          + lowWatermark = "No Watermark"
          + return lowWatermark
          +
          +.filter "watermarksByNode", ->
          + (watermarks, nodeid) ->
          + arr = []
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + arr = watermarks[nodeid]
          — End diff –

          Can we directly return `watermarks[nodeid]` here?
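One way to read this suggestion, sketched in plain JavaScript instead of the CoffeeScript filter: return the node's entry directly and fall back to an empty list only when it is missing, so that a `.length` check in the template keeps working. This is an illustrative standalone function, not the code that was merged.

```javascript
// Illustrative equivalent of the "watermarksByNode" filter body.
function watermarksByNode(watermarks, nodeid) {
  const entries = watermarks ? watermarks[nodeid] : undefined;
  // Return the stored array itself; use [] when nothing is known for the node.
  return Array.isArray(entries) ? entries : [];
}
```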

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102175486

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          — End diff –

          Do we call this function multiple times? If not, can we keep this as before?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102173431

          — Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade —
          @@ -0,0 +1,27 @@
          +//
          + Licensed to the Apache Software Foundation (ASF) under one
          + or more contributor license agreements. See the NOTICE file
          + distributed with this work for additional information
          + regarding copyright ownership. The ASF licenses this file
          + to you under the Apache License, Version 2.0 (the
          + "License"); you may not use this file except in compliance
          + with the License. You may obtain a copy of the License at
          +
          + http://www.apache.org/licenses/LICENSE-2.0
          +
          + Unless required by applicable law or agreed to in writing, software
          + distributed under the License is distributed on an "AS IS" BASIS,
          + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + See the License for the specific language governing permissions and
          + limitations under the License.
          +
          +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length")
          + thead
          + tr
          + th id
          — End diff –

          Should be upper case. Or let's call it "Subtask".

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102174155

          — Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade —
          @@ -0,0 +1,27 @@
          +//
          + Licensed to the Apache Software Foundation (ASF) under one
          + or more contributor license agreements. See the NOTICE file
          + distributed with this work for additional information
          + regarding copyright ownership. The ASF licenses this file
          + to you under the Apache License, Version 2.0 (the
          + "License"); you may not use this file except in compliance
          + with the License. You may obtain a copy of the License at
          +
          + http://www.apache.org/licenses/LICENSE-2.0
          +
          + Unless required by applicable law or agreed to in writing, software
          + distributed under the License is distributed on an "AS IS" BASIS,
          + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + See the License for the specific language governing permissions and
          + limitations under the License.
          +
          +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length")
          — End diff –

          If I click on a node that has `None` as the low watermark (I think this happens when no metric is exposed for the node), the subtask list still expands and is clickable. You get a grey background, but don't see anything:

          ![screen shot 2017-02-21 at 11 12 56](https://cloud.githubusercontent.com/assets/1756620/23160749/eadeaa4c-f827-11e6-8b8c-b7f07372c546.png)

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102206279

          — Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee —
          @@ -87,3 +87,27 @@ angular.module('flinkApp')

          .filter "percentage", ->
          (number) -> (number * 100).toFixed(0) + '%'
          +
          +.filter "parseWatermark", ->
          + (value) ->
          + if value <= -9223372036854776000
          + return 'No Watermark'
          + else
          + return value
          +
          +.filter "lowWatermark", ->
          + (watermarks, nodeid) ->
          + lowWatermark = "None"
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + values = (watermark.value for watermark in watermarks[nodeid])
          + lowWatermark = Math.min.apply(null, values)
          + if lowWatermark <= -9223372036854776000
          + lowWatermark = "No Watermark"
          — End diff –

          I think this can be confusing to users, because with the logic below it says either `No Watermark` or `None`, depending on whether the metric was available. I think we should stick to one of the two. The comment about the constant applies here as well.
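A sketch of the single-label variant in plain JavaScript, assuming the same `{ id, value }` entries as in the diff. The constant names are hypothetical, and `'No Watermark'` is picked arbitrarily as the one label to keep.

```javascript
// Hypothetical names; one label for "metric missing" and "sentinel value".
const NO_WATERMARK_LABEL = 'No Watermark';
// Java's Long.MIN_VALUE as a JavaScript number (exactly -2^63).
const NO_WATERMARK_SENTINEL = -(2 ** 63);

function lowWatermarkLabel(watermarks, nodeid) {
  const entries = (watermarks && watermarks[nodeid]) || [];
  if (entries.length === 0) {
    return NO_WATERMARK_LABEL; // metric not available for this node
  }
  const low = Math.min(...entries.map((w) => Number(w.value)));
  // The sentinel means that no watermark has been emitted yet.
  return low <= NO_WATERMARK_SENTINEL ? NO_WATERMARK_LABEL : low;
}
```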

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102205223

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          + deferred.resolve(watermarks)
          + deferred.promise
          +
          + getLowWatermarks = (watermarks)->
          — End diff –

          Can we add a high level comment about what happens here?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102208620

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          — End diff –

          I'm wondering whether the idiomatic way to use promises here would be to combine them via `$q.all(promises)` and then fill `watermarks` when all are finished?
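The index check in the diff resolves as soon as the request for the last node returns, which may happen before earlier requests have completed. A sketch of the combined-promise approach follows, using the standard `Promise.all` as a stand-in for Angular's `$q.all`; the `getMetrics` parameter is a hypothetical replacement for `MetricsService.getMetrics` so that the example runs outside Angular.

```javascript
// Sketch only: Promise.all stands in for $q.all, and getMetrics is a
// hypothetical injected function returning a promise of { values: {...} }.
function getWatermarks(jid, nodes, getMetrics) {
  const requests = nodes.map((node) => {
    // One metric id per subtask, e.g. "0.currentLowWatermark".
    const metricIds = [];
    for (let i = 0; i < node.parallelism; i++) {
      metricIds.push(i + '.currentLowWatermark');
    }
    return getMetrics(jid, node.id, metricIds).then((data) => ({
      nodeId: node.id,
      values: Object.keys(data.values).map((key) => ({
        id: key.replace('.currentLowWatermark', ''),
        value: data.values[key],
      })),
    }));
  });

  // Resolve only after every request has finished, in any order.
  return Promise.all(requests).then((results) => {
    const watermarks = {};
    for (const result of results) {
      watermarks[result.nodeId] = result.values;
    }
    return watermarks;
  });
}
```

With this shape, a single failed request rejects the combined promise, so the caller can handle the error in one place.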

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102174918

          — Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee —
          @@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])

          .value 'flinkConfig', {
          jobServer: ''

          -# jobServer: 'http://localhost:8081/'
          + # jobServer: 'http://localhost:8081/'
          — End diff –

          Can we keep the indentation as before? I think Angular is a little picky with the config here and if we uncomment this line with the new indentation it will not parse the jobServer value. I'm not sure about this. If you think it's not a problem, it's OK to leave it as is.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102205464

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          + deferred.resolve(watermarks)
          + deferred.promise
          +
          + getLowWatermarks = (watermarks)->
          + lowWatermarks = []
          + for k,v of watermarks
          + minValue = Math.min.apply(null,(watermark.value for watermark in v))
          + lowWatermarks[k] = if minValue <= -9223372036854776000 || v.length == 0 then 'None' else minValue
          — End diff –

          The constant `-9223372036854776000` differs from the Java `Long.MIN_VALUE` that is returned, because Java and JavaScript represent numbers differently internally. I think it would be good to put this constant in a separate variable with a name and a comment that describes what it is (maybe `NO_WATERMARK` or so?).
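The difference can be reproduced in a few lines of plain JavaScript. Java's `Long.MIN_VALUE` is -9223372036854775808 (-2^63); as a JavaScript double that value is still exact, but it is printed with the shortest digits that round-trip, which is where `-9223372036854776000` comes from. The sketch below uses the `NO_WATERMARK` name suggested above; `isNoWatermark` is a hypothetical helper.

```javascript
// Java's Long.MIN_VALUE as it would arrive in a JSON metric value.
const JAVA_LONG_MIN_TEXT = '-9223372036854775808';

// Parsed as a JavaScript number (IEEE 754 double) it equals -2^63 exactly,
// but its decimal rendering is shortened.
const parsed = Number(JAVA_LONG_MIN_TEXT);
const rendered = String(parsed); // "-9223372036854776000"

// The literal used in the diff denotes the same double as -2^63.
const sameDouble = -9223372036854776000 === -(2 ** 63); // true

// Named sentinel: a watermark at or below this value means
// "no watermark emitted yet".
const NO_WATERMARK = -(2 ** 63);

// Hypothetical helper for illustration.
function isNoWatermark(value) {
  return Number(value) <= NO_WATERMARK;
}

// BigInt keeps the exact decimal digits if they are ever needed for display.
const exactText = BigInt(JAVA_LONG_MIN_TEXT).toString();
```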

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102205187

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          — End diff –

          Can we add a high level comment about what happens here?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102175654

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -318,3 +360,7 @@ angular.module('flinkApp')
          loadMetrics() if $scope.nodeid

          # --------------------------------------
          +
          +.controller 'JobPlanWatermarksController', ($scope) ->
          — End diff –

          Why do we need this controller?

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102725620

          — Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade —
          @@ -0,0 +1,27 @@
          +//
          + Licensed to the Apache Software Foundation (ASF) under one
          + or more contributor license agreements. See the NOTICE file
          + distributed with this work for additional information
          + regarding copyright ownership. The ASF licenses this file
          + to you under the Apache License, Version 2.0 (the
          + "License"); you may not use this file except in compliance
          + with the License. You may obtain a copy of the License at
          +
          + http://www.apache.org/licenses/LICENSE-2.0
          +
          + Unless required by applicable law or agreed to in writing, software
          + distributed under the License is distributed on an "AS IS" BASIS,
          + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + See the License for the specific language governing permissions and
          + limitations under the License.
          +
          +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length")
          — End diff –

          Fixed with the latest push

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102725706

          — Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade —
          @@ -0,0 +1,27 @@
          +//
          + Licensed to the Apache Software Foundation (ASF) under one
          + or more contributor license agreements. See the NOTICE file
          + distributed with this work for additional information
          + regarding copyright ownership. The ASF licenses this file
          + to you under the Apache License, Version 2.0 (the
          + "License"); you may not use this file except in compliance
          + with the License. You may obtain a copy of the License at
          +
          + http://www.apache.org/licenses/LICENSE-2.0
          +
          + Unless required by applicable law or agreed to in writing, software
          + distributed under the License is distributed on an "AS IS" BASIS,
          + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + See the License for the specific language governing permissions and
          + limitations under the License.
          +
          +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length")
          + thead
          + tr
          + th id
          — End diff –

          I'm not sure what we're referring to here. Can you clarify?

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102725785

          — Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee —
          @@ -87,3 +87,27 @@ angular.module('flinkApp')

          .filter "percentage", ->
          (number) -> (number * 100).toFixed(0) + '%'
          +
          +.filter "parseWatermark", ->
          + (value) ->
          + if value <= -9223372036854776000
          + return 'No Watermark'
          + else
          + return value
          +
          +.filter "lowWatermark", ->
          + (watermarks, nodeid) ->
          + lowWatermark = "None"
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + values = (watermark.value for watermark in watermarks[nodeid])
          + lowWatermark = Math.min.apply(null, values)
          + if lowWatermark <= -9223372036854776000
          + lowWatermark = "No Watermark"
          — End diff –

          Fixed with latest push

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726198

          — Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee —
          @@ -87,3 +87,27 @@ angular.module('flinkApp')

          .filter "percentage", ->
          (number) -> (number * 100).toFixed(0) + '%'
          +
          +.filter "parseWatermark", ->
          + (value) ->
          + if value <= -9223372036854776000
          + return 'No Watermark'
          + else
          + return value
          +
          +.filter "lowWatermark", ->
          + (watermarks, nodeid) ->
          + lowWatermark = "None"
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + values = (watermark.value for watermark in watermarks[nodeid])
          + lowWatermark = Math.min.apply(null, values)
          + if lowWatermark <= -9223372036854776000
          + lowWatermark = "No Watermark"
          + return lowWatermark
          +
          +.filter "watermarksByNode", ->
          + (watermarks, nodeid) ->
          + arr = []
          + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
          + arr = watermarks[nodeid]
          — End diff –

          No, because initially there are no watermarks, so we must check whether they exist. Nonetheless, I have refactored this function and moved it to jobs.ctrl.coffee.

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726419

          — Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee —
          @@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])

          .value 'flinkConfig', {
          jobServer: ''

          - # jobServer: 'http://localhost:8081/'
          + # jobServer: 'http://localhost:8081/'
          — End diff –

          I updated it because I had the same problem with Sublime. It was easier for my workflow to indent it like this.

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726454

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          — End diff –

          Fixed.

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726588

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          — End diff –

          I have not fixed this, will try to see if I can find a more elegant solution.
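
As far as the diff above shows, the `if index >= $scope.plan.nodes.length - 1` check resolves the deferred once the response for the last node arrives, which need not be after the responses for the other nodes. One possible alternative is to create one promise per node and wait for all of them; in the AngularJS code this would presumably be done with `$q.all`. The sketch below is TypeScript rather than the dashboard's CoffeeScript. The types `PlanNode`, `SubtaskWatermark` and `MetricsFetcher` are assumptions made for illustration, and `getMetrics` stands in for `MetricsService.getMetrics`, whose real signature is not shown in this thread.

```typescript
// Hypothetical shapes; the dashboard's real data structures are not shown here.
interface PlanNode { id: string; parallelism: number; }
interface SubtaskWatermark { id: string; value: number; }
type MetricsFetcher = (
  jobId: string,
  nodeId: string,
  metricIds: string[],
) => Promise<{ values: { [metricId: string]: number } }>;

const WATERMARK_SUFFIX = ".currentLowWatermark";

// Fetch the per-subtask watermarks of a single node.
async function getNodeWatermarks(
  jobId: string,
  node: PlanNode,
  getMetrics: MetricsFetcher, // stand-in for MetricsService.getMetrics
): Promise<SubtaskWatermark[]> {
  // One metric id per parallel subtask: "0.currentLowWatermark", "1....", ...
  const metricIds: string[] = [];
  for (let i = 0; i < node.parallelism; i++) {
    metricIds.push(i + WATERMARK_SUFFIX);
  }
  const data = await getMetrics(jobId, node.id, metricIds);
  return Object.keys(data.values).map((key) => ({
    id: key.replace(WATERMARK_SUFFIX, ""),
    value: data.values[key],
  }));
}

// Resolve only after every node's request has completed, in any order.
async function getWatermarks(
  jobId: string,
  nodes: PlanNode[],
  getMetrics: MetricsFetcher,
): Promise<{ [nodeId: string]: SubtaskWatermark[] }> {
  const perNode = await Promise.all(
    nodes.map((node) => getNodeWatermarks(jobId, node, getMetrics)),
  );
  const watermarks: { [nodeId: string]: SubtaskWatermark[] } = {};
  nodes.forEach((node, i) => {
    watermarks[node.id] = perNode[i];
  });
  return watermarks;
}
```

Because `Promise.all` preserves the order of its inputs, the result for each node can be matched back by index without relying on the order in which responses arrive.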

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726602

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          + deferred.resolve(watermarks)
          + deferred.promise
          +
          + getLowWatermarks = (watermarks)->
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726645

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -81,6 +78,51 @@ angular.module('flinkApp')
          JobsService.stopJob($stateParams.jobid).then (data) ->
          {}

          + loadJob = ()->
          + JobsService.loadJob($stateParams.jobid).then (data) ->
          + $scope.job = data
          + $scope.vertices = data.vertices
          + $scope.plan = data.plan
          + MetricsService.setupMetrics($stateParams.jobid, data.vertices)
          +
          + getWatermarks = (nodes)->
          + deferred = $q.defer()
          + watermarks = {}
          + jid = $scope.job.jid
          + angular.forEach nodes, (node, index) =>
          + metricIds = []
          + for num in [0..node.parallelism - 1]
          + metricIds.push(num + ".currentLowWatermark")
          + MetricsService.getMetrics(jid, node.id, metricIds).then (data) ->
          + values = []
          + for key, value of data.values
          + values.push(id: key.replace('.currentLowWatermark', ''), value: value)
          + watermarks[node.id] = values
          + if index >= $scope.plan.nodes.length - 1
          + deferred.resolve(watermarks)
          + deferred.promise
          +
          + getLowWatermarks = (watermarks)->
          + lowWatermarks = []
          + for k,v of watermarks
          + minValue = Math.min.apply(null,(watermark.value for watermark in v))
          + lowWatermarks[k] = if minValue <= -9223372036854776000 || v.length == 0 then 'None' else minValue
          — End diff –

          Fixed.
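
For readers following the `getLowWatermarks` hunk above: the constant `-9223372036854776000` is how JavaScript prints -2^63 (Java's `Long.MIN_VALUE`), which the diff treats as "no watermark yet". A minimal TypeScript sketch of the same idea follows. The names `WatermarkSample`, `lowWatermark` and `lowWatermarksByNode` are made up for this illustration and do not come from the pull request, and the sketch returns a plain object where the diff assigns string keys on an array.

```typescript
// -2^63 as a double; JavaScript prints this value as -9223372036854776000.
const NO_WATERMARK = -Math.pow(2, 63);

// Hypothetical shape of one per-subtask watermark entry.
interface WatermarkSample { id: string; value: number; }

// The low watermark of a node is the minimum over its subtasks.
// An empty list or a sentinel minimum is reported as "None".
function lowWatermark(samples: WatermarkSample[]): number | "None" {
  if (samples.length === 0) {
    return "None"; // Math.min over nothing would otherwise yield Infinity
  }
  let min = Infinity;
  for (const sample of samples) {
    min = Math.min(min, sample.value);
  }
  return min <= NO_WATERMARK ? "None" : min;
}

// Apply lowWatermark to every node id in the map.
function lowWatermarksByNode(
  watermarks: { [nodeId: string]: WatermarkSample[] },
): { [nodeId: string]: number | "None" } {
  const result: { [nodeId: string]: number | "None" } = {};
  for (const nodeId of Object.keys(watermarks)) {
    result[nodeId] = lowWatermark(watermarks[nodeId]);
  }
  return result;
}
```

Checking for the empty list before taking the minimum avoids comparing `Infinity` against the sentinel, which is why the diff needs its extra `v.length == 0` condition.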

          githubbot ASF GitHub Bot added a comment -

          Github user nellboy commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3366#discussion_r102726782

          — Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee —
          @@ -318,3 +360,7 @@ angular.module('flinkApp')
          loadMetrics() if $scope.nodeid

          # --------------------------------------
          +
          +.controller 'JobPlanWatermarksController', ($scope) ->
          — End diff –

          This controller is no longer empty, but we still needed to register it as a controller, so yes, we needed it.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3366

          Thanks for addressing the comments. I added small refactorings on top of your changes here (https://github.com/uce/flink/tree/watermarks) and will merge this later today. Thanks again for this contribution; it is a really nice feature!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3366

          vpernin Vladislav Pernin added a comment -

          What is the official way of monitoring backpressure?
          The REST API is no longer documented and returns a "deprecated" response.

          Shouldn't it be part of the metrics system?

          Zentol Chesnay Schepler added a comment -

          Vladislav Pernin Backpressure monitoring should still be done through the REST API or the backpressure tab in the web frontend. The "status" of the REST response is "deprecated" if the query caused the backpressure sampling to be started. Since sampling takes a while, the last, and therefore outdated, backpressure information is returned in the meantime.
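
Given that explanation, a client could simply ask again after a short pause until the status is no longer "deprecated". The TypeScript sketch below illustrates that retry loop under stated assumptions: `fetchOnce` and `wait` are hypothetical callbacks supplied by the caller (the sketch does not assume any particular endpoint or HTTP library), and only the `status` field mentioned in the comment above is inspected.

```typescript
// Only the "status" field described above is relied on; other fields are ignored.
interface BackpressureResponse { status: string; }

// Query repeatedly until the response is no longer marked "deprecated",
// or until maxAttempts queries have been made. Returns the last response.
async function fetchFreshBackpressure(
  fetchOnce: () => Promise<BackpressureResponse>, // hypothetical: performs one REST query
  wait: (ms: number) => Promise<void>,            // hypothetical: sleeps for ms milliseconds
  maxAttempts: number = 5,
  delayMs: number = 1000,
): Promise<BackpressureResponse> {
  let last = await fetchOnce();
  let attempts = 1;
  while (last.status === "deprecated" && attempts < maxAttempts) {
    await wait(delayMs); // give the sampling triggered by the query time to finish
    last = await fetchOnce();
    attempts++;
  }
  return last;
}
```

If every attempt still reports "deprecated", the caller receives that last response and can decide whether to display the outdated values or none at all.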


            People

            • Assignee:
              Unassigned
              Reporter:
              rmetzger Robert Metzger
            • Votes:
              2
              Watchers:
              9
