  1. Flink
  2. FLINK-5131 Fine-grained Resource Configuration
  3. FLINK-5134

Aggregate ResourceSpec for chained operators when generating job graph

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      This is part of the fine-grained resource configuration work in FLIP-6.

      During JobGraph generation, each JobVertex may contain a series of chained operators, and the resources of the JobVertex should be the aggregate of the individual resources of its chained operators.

      For memory, the aggregate is the sum over the chained operators; for CPU cores, it is the maximum over the chained operators.
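
The aggregation rule described above can be sketched as follows. This is a minimal illustration, not the code from the pull request: the class `ChainedResourceSketch`, its nested `Spec` type, and the `aggregate` helper are hypothetical names, and the spec is assumed to carry only CPU cores and a single memory value in megabytes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration only; this is not Flink's ResourceSpec class.
final class ChainedResourceSketch {

    /** Minimal resource description: CPU cores and memory in megabytes. */
    static final class Spec {
        final double cpuCores;
        final int memoryMB;

        Spec(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }

        /** Combines two specs: maximum of the CPU cores, sum of the memory. */
        Spec merge(Spec other) {
            return new Spec(Math.max(cpuCores, other.cpuCores), memoryMB + other.memoryMB);
        }
    }

    /** Aggregates the specs of all operators chained into one vertex. */
    static Spec aggregate(List<Spec> chainedOperators) {
        Spec result = new Spec(0.0, 0); // neutral element for both max and sum
        for (Spec spec : chainedOperators) {
            result = result.merge(spec);
        }
        return result;
    }

    public static void main(String[] args) {
        Spec vertex = aggregate(Arrays.asList(
                new Spec(1.0, 100), new Spec(2.0, 200), new Spec(0.5, 50)));
        // Prints "2.0 cores, 350 MB": the largest core count and the total memory.
        System.out.println(vertex.cpuCores + " cores, " + vertex.memoryMB + " MB");
    }
}
```

Starting from a zero-valued spec keeps the loop free of special cases, since zero is neutral for both the maximum of non-negative core counts and the memory sum.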

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhijiangW opened a pull request:

          https://github.com/apache/flink/pull/3455

          FLINK-5134 [runtime] Aggregate ResourceSpec for chained operators when generating JobGraph

          This is part of the fine-grained resource configuration work in 'flip-6'.

          During 'JobGraph' generation, each created 'JobVertex' may contain a series of chained operators, and the resources of the 'JobVertex' should be the aggregate of the individual resources of its chained operators.

          In addition, to avoid NullPointerExceptions and to simplify later processing, I set a default unknown 'ResourceSpec' for 'Operator' and 'StreamNode'.
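
As a rough illustration of why a default "unknown" value avoids null checks, consider the sketch below. All names (`DefaultSpecSketch`, `Spec.UNKNOWN`, `Node`) are hypothetical and the merge rule is assumed to be max for CPU cores and sum for memory; the actual classes in the pull request may differ.

```java
import java.util.Objects;

// Hypothetical sketch; the names are illustrative and not Flink's API.
final class DefaultSpecSketch {

    static final class Spec {
        /** Shared sentinel meaning "no resources were configured". */
        static final Spec UNKNOWN = new Spec(0.0, 0);

        final double cpuCores;
        final int memoryMB;

        Spec(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }

        /** Maximum of the CPU cores, sum of the memory. */
        Spec merge(Spec other) {
            return new Spec(Math.max(cpuCores, other.cpuCores), memoryMB + other.memoryMB);
        }
    }

    /** Operator-like holder whose spec is never null. */
    static final class Node {
        private Spec resources = Spec.UNKNOWN; // default instead of null

        void setResources(Spec resources) {
            this.resources = Objects.requireNonNull(resources, "resources");
        }

        Spec getResources() {
            return resources;
        }
    }

    public static void main(String[] args) {
        Node unconfigured = new Node();
        Node configured = new Node();
        configured.setResources(new Spec(1.0, 64));
        // No null check is needed before merging.
        Spec merged = unconfigured.getResources().merge(configured.getResources());
        System.out.println(merged.cpuCores + " cores, " + merged.memoryMB + " MB");
    }
}
```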

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhijiangW/flink FLILNK-5134

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3455.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3455


          commit 1692fdccbdc702a598c09ba4ce408bb25504210f
          Author: 淘江 <taojiang.wzj@alibaba-inc.com>
          Date: 2017-03-02T05:51:39Z

          FLINK-5134 [runtime] Aggregate ResourceSpec for chained operators when generating job graph

          Summary: This is a sub-task of fine-grained resource configuration. Aggregate ResourceSpec for chained operators when generating the job graph.

          Test Plan: N/A

          Reviewers: 辅机

          Subscribers: P577102

          Differential Revision: https://aone.alibaba-inc.com/code/D124138


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3455

          The code in this PR looks good. It would be nice to have a test for the merging on the `JobGraphGenerator` side, similar to the one for the `StreamingJobGraphGenerator`.

          Ideally, the test would cover a few chaining patterns, such as chained sources / sinks and possibly iterations.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3455#discussion_r104829653

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java —
          @@ -208,4 +209,36 @@ public Integer map(Integer value) throws Exception

          { assertFalse(printConfig.isChainStart()); assertTrue(printConfig.isChainEnd()); }

          +
          +// /**
          — End diff –

          According to Apache's rules, don't leave commented-out code. If you would like to remove the code, please delete it. Git logs and pull requests will show the changes.

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3455

          @StephanEwen, thank you for the review and suggestions.

          I will add the related tests for `JobGraphGenerator` and try to cover different chaining modes.

          In addition, I will remove the previous comments on the public `setResources` methods and change them to private methods, so that the tests can call them via reflection, as mentioned on Slack. What do you think?
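
Calling a private setter from a test via reflection could look roughly like the sketch below. The `Operator` class and its `setResources(int)` signature are hypothetical stand-ins chosen for brevity; only the `java.lang.reflect` calls are standard Java API.

```java
import java.lang.reflect.Method;

final class ReflectionSketch {

    /** Hypothetical operator-like class whose setter is private. */
    static final class Operator {
        private int memoryMB = 0;

        private void setResources(int memoryMB) {
            this.memoryMB = memoryMB;
        }

        int getMemoryMB() {
            return memoryMB;
        }
    }

    /** Invokes the private setResources(int) method reflectively, as a test might. */
    static void setResourcesViaReflection(Operator operator, int memoryMB) {
        try {
            Method setter = Operator.class.getDeclaredMethod("setResources", int.class);
            setter.setAccessible(true); // bypass the private modifier
            setter.invoke(operator, memoryMB);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not call setResources reflectively", e);
        }
    }

    public static void main(String[] args) {
        Operator operator = new Operator();
        setResourcesViaReflection(operator, 128);
        System.out.println(operator.getMemoryMB()); // prints 128
    }
}
```

The trade-off is that the method name is now a string, so renaming `setResources` breaks the test at run time rather than at compile time.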

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3455

          Sounds very good!

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3455

          @StephanEwen, I have submitted the modifications as we agreed. They required changes to some earlier code; please excuse the late update, as I was a little busy last week.

          I also found another potential bug in `StreamingJobGraphGeneratorTest.testChainStartEndSetting`. If this test is not the first one executed, it fails with an NPE because the `idCounter` in `StreamTransformation` is static. I would like to fix it in a separate JIRA issue, although I have already modified the test here so that `clean verify` succeeds.
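
The order dependence described above can be reproduced in miniature. The sketch below is an assumption-laden illustration, not Flink code: `Transformation` is a hypothetical class with a static counter, standing in for the static `idCounter` mentioned in the comment.

```java
final class StaticCounterSketch {

    /** Hypothetical transformation-like class with a JVM-wide id counter. */
    static final class Transformation {
        private static int idCounter = 0;

        final int id;

        Transformation() {
            this.id = ++idCounter; // shared across every instance and every test
        }
    }

    /** Builds a two-node pipeline and returns the id assigned to its first node. */
    static int firstIdOfNewPipeline() {
        Transformation source = new Transformation();
        new Transformation(); // a second node, which also advances the counter
        return source.id;
    }

    public static void main(String[] args) {
        int first = firstIdOfNewPipeline();
        int second = firstIdOfNewPipeline();
        // The same pipeline gets different ids depending on what ran before it.
        System.out.println(first + " then " + second);
    }
}
```

A test that looks up a node by a hard-coded id would therefore only find it when it runs first in the JVM; looking up nodes by a value the test itself obtained avoids the dependence.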

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3455

          Thanks, looks good. Will merge this once the CI builds have passed!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3455

          Seeing some test failures associated with this pull request.

          StreamingJobGraphGeneratorTest.testChainedResourceMerging:
          https://s3.amazonaws.com/archive.travis-ci.org/jobs/210601881/log.txt
          https://s3.amazonaws.com/archive.travis-ci.org/jobs/210601884/log.txt

          Scala API backwards compatibility check (possibly missing `@PublicEvolving` or `@Internal` annotations):
          https://s3.amazonaws.com/archive.travis-ci.org/jobs/210601883/log.txt

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3455

          @StephanEwen, yes, I already noticed that today. The test passes on my local machine, but after running it in a loop many times I found that it sometimes fails because the order of the generated job vertices may change for `IterationHead` and `Source`. I will make it stable and notify you when it passes CI.
          Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3455

          @StephanEwen, I fixed the iteration test yesterday to make it stable. I think the instability may be related to the static incremental id in `StreamTransformation`, especially when multiple tests are executed in different orders. I now use a different way to check and verify all the `JobVertex` instances from the generated `JobGraph`.
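
The comment does not say how the vertices are now verified. One common way to make such checks independent of vertex order, shown here purely as an assumption, is to index the vertices by name before asserting on them. `Vertex` and `indexByName` are hypothetical names, not part of Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OrderIndependentCheckSketch {

    /** Hypothetical stand-in for a job vertex: a name plus its merged memory. */
    static final class Vertex {
        final String name;
        final int memoryMB;

        Vertex(String name, int memoryMB) {
            this.name = name;
            this.memoryMB = memoryMB;
        }
    }

    /** Indexes vertices by name so that assertions do not depend on list order. */
    static Map<String, Vertex> indexByName(List<Vertex> vertices) {
        Map<String, Vertex> byName = new HashMap<>();
        for (Vertex vertex : vertices) {
            byName.put(vertex.name, vertex);
        }
        return byName;
    }

    public static void main(String[] args) {
        List<Vertex> oneOrder = Arrays.asList(
                new Vertex("Source", 100), new Vertex("IterationHead", 200));
        List<Vertex> otherOrder = Arrays.asList(oneOrder.get(1), oneOrder.get(0));
        // Both orderings yield the same lookups.
        System.out.println(indexByName(oneOrder).get("Source").memoryMB);
        System.out.println(indexByName(otherOrder).get("Source").memoryMB);
    }
}
```

This only works when vertex names are unique within the graph; otherwise a different key would be needed.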

          The CI build has just finished, and there are two failures. The first is due to a timeout, and the second is in the flink-dist module and seems unrelated to my pull request: https://s3.amazonaws.com/archive.travis-ci.org/jobs/211029086/log.txt

          Looking forward to your review. Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3455

          Looks good, thanks.
          Merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3455

          StephanEwen Stephan Ewen added a comment -

          Fixed via 980d072fa2546dbc10cf878cf29532b2d8bbca8a


            People

            • Assignee:
              zjwang zhijiang
              Reporter:
              zjwang zhijiang
