  1. Flink
  2. FLINK-6188

Some setParallelism() methods can't cope with default parallelism

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.3.0, 1.2.1
    • Fix Version/s: None
    • Component/s: DataStream API
    • Labels:
      None

      Description

      Recent changes made for FLINK-5808 moved the resolution of the default parallelism from eager to lazy: the parallelism of operations that don't have an explicit parallelism is now only set when the JobGraph is generated. Some setParallelism() calls, such as SingleOutputStreamOperator.setParallelism(), cannot deal with the fact that the parallelism of an operation might be -1 (which indicates that it should take the default parallelism when the JobGraph is generated).

      We should either revert the changes, which fixed another user-facing bug for version 1.2.1, or fix the methods.

        Issue Links

          Activity

          aljoscha Aljoscha Krettek added a comment -

          This is the thrown exception:

          Caused by: java.lang.IllegalArgumentException: The parallelism of an operator must be at least 1.
          	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
          	at org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism(SingleOutputStreamOperator.java:124)
          	at org.apache.flink.streaming.api.datastream.DataStream.assignTimestampsAndWatermarks(DataStream.java:775)
          	at org.apache.flink.streaming.api.scala.DataStream.assignTimestampsAndWatermarks(DataStream.scala:736)
          	...
          

          The problem is that assignTimestampsAndWatermarks() uses setParallelism() with the parallelism of the upstream operation, which can be -1.
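
The failure mode can be reproduced without Flink. The sketch below is a simplified stand-in, not Flink's actual code: `OperatorSketch`, `strictSetParallelism`, and `lenientSetParallelism` are hypothetical names. The only facts taken from this report are that the default parallelism is represented as -1 and that the failing check requires a value of at least 1. The lenient variant is one possible shape of a fix, shown as an assumption.

```java
/** Simplified stand-in for a stream operator; hypothetical, not Flink's class. */
final class OperatorSketch {
    /** Sentinel meaning "take the default parallelism later"; -1 per the issue text. */
    static final int PARALLELISM_DEFAULT = -1;

    private int parallelism = PARALLELISM_DEFAULT;

    int getParallelism() {
        return parallelism;
    }

    /** Mirrors the failing check: rejects every value below 1, including the sentinel. */
    void strictSetParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException(
                    "The parallelism of an operator must be at least 1.");
        }
        this.parallelism = parallelism;
    }

    /** Assumed fix: accept the sentinel in addition to positive values. */
    void lenientSetParallelism(int parallelism) {
        if (parallelism != PARALLELISM_DEFAULT && parallelism < 1) {
            throw new IllegalArgumentException(
                    "The parallelism of an operator must be at least 1, "
                            + "or PARALLELISM_DEFAULT.");
        }
        this.parallelism = parallelism;
    }

    public static void main(String[] args) {
        OperatorSketch upstream = new OperatorSketch();   // no explicit parallelism
        OperatorSketch downstream = new OperatorSketch();

        // Copying the upstream parallelism works with the lenient check ...
        downstream.lenientSetParallelism(upstream.getParallelism());

        // ... but fails with the strict one, as in the stack trace above.
        try {
            downstream.strictSetParallelism(upstream.getParallelism());
        } catch (IllegalArgumentException e) {
            System.out.println("strict check failed: " + e.getMessage());
        }
    }
}
```

Any caller that forwards an upstream parallelism, as assignTimestampsAndWatermarks() does here, hits the strict check whenever the upstream operation has no explicit parallelism.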

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3616

          FLINK-6188 Correctly handle PARALLELISM_DEFAULT in stream operator

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-6188-set-parallelism

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3616.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3616


          commit 40ca699cc1f6b00d6d50b16d6833794c5362dbb1
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-03-26T10:23:01Z

          FLINK-6188 Handle PARALLELISM_DEFAULT in setParallelism()

          SingleOutputStreamOperator.setParallelism() and
          StreamTransformation.setParallelism() did not correctly handle the case of
          setting the parallelism to PARALLELISM_DEFAULT.

          commit 9107a3400af877ddfefcc0f5b16cc6b03127ec76
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-03-26T10:24:58Z

          FLINK-6188 Add TimestampAssignerTranslationTest

          The tests verify that we instantiate the correct operator and that they
          correctly pick up the parallelism from the upstream operator.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3616

          This is not completely done yet. It seems I still have to change when max parallelism is instantiated for this to work.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108943516

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -202,7 +203,17 @@ public int getParallelism() {
            * @param parallelism The new parallelism to set on this {@code StreamTransformation}
            */
           public void setParallelism(int parallelism) {
          -    Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
          +    checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
          — End diff –

          The `PARALLELISM_UNKNOWN` does not seem to be used anywhere in the codebase, apart from checking against it. Couldn't we remove it? Or am I missing something?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108944527

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -221,10 +232,19 @@ public int getMaxParallelism() {
            * @param maxParallelism Maximum parallelism for this stream transformation.
            */
           public void setMaxParallelism(int maxParallelism) {
          -    Preconditions.checkArgument(maxParallelism > 0
          -            && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
          -            "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
          -            + ". Found: " + maxParallelism);
          +    checkArgument(
          +            parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
          +            "A maximum parallelism can only be specified with an explicitly specified " +
          +            "parallelism.");
          — End diff –

          This means that we always have to specify a `parallelism` before being able to specify a `maxParallelism`. This seems a bit counter-intuitive to me, as the only constraint seems to be that `parallelism <= maxParallelism`.
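
One way to read this suggestion is a check that enforces `parallelism <= maxParallelism` only when an explicit parallelism is already known, and otherwise accepts the maximum parallelism on its own. The sketch below is a simplified stand-in under that assumption: `MaxParallelismSketch` and `checkMaxParallelism` are hypothetical names, not part of Flink, and the -1 sentinel is taken from the issue description.

```java
/** Hypothetical helper illustrating an order-independent max-parallelism check. */
final class MaxParallelismSketch {
    /** Sentinel meaning "no explicit parallelism set"; -1 per the issue text. */
    static final int PARALLELISM_DEFAULT = -1;

    /**
     * Validates a maximum parallelism against a possibly unset parallelism.
     * The comparison is skipped while the parallelism is still the sentinel.
     */
    static void checkMaxParallelism(int parallelism, int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException(
                    "The maximum parallelism must be greater than 0.");
        }
        if (parallelism != PARALLELISM_DEFAULT && maxParallelism < parallelism) {
            throw new IllegalArgumentException(
                    "The maximum parallelism must not be smaller than the parallelism."
                            + " (parallelism = " + parallelism
                            + ", max-parallelism = " + maxParallelism + ")");
        }
    }

    public static void main(String[] args) {
        checkMaxParallelism(PARALLELISM_DEFAULT, 128); // accepted: nothing to compare yet
        checkMaxParallelism(4, 128);                   // accepted: 4 <= 128
        try {
            checkMaxParallelism(256, 128);             // rejected: 256 > 128
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this, the comparison that was skipped still has to happen once the default parallelism is resolved, otherwise an invalid combination could slip through.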

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108943809

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -221,10 +232,19 @@ public int getMaxParallelism() {
            * @param maxParallelism Maximum parallelism for this stream transformation.
            */
           public void setMaxParallelism(int maxParallelism) {
          -    Preconditions.checkArgument(maxParallelism > 0
          -            && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
          -            "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
          -            + ". Found: " + maxParallelism);
          +    checkArgument(
          +            parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
          +            "A maximum parallelism can only be specified with an explicitly specified " +
          +            "parallelism.");
          +    checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
          +    checkArgument(
          +            maxParallelism >= parallelism,
          +            "The maximum parallelism must be larger than the parallelism. (parallelism = " +
          +            parallelism + " max-parallelism = " + maxParallelism);
          — End diff –

          Missing closing ")" at the end of the error message.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108943194

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java —
          @@ -50,6 +52,83 @@
          @SuppressWarnings("serial")
          public class StreamingJobGraphGeneratorTest extends TestLogger {

          — End diff –

          I would suggest adding some tests (using the same code as the existing ones) with invalid configurations and the expected exceptions.

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108948674

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -202,7 +203,17 @@ public int getParallelism() {
            * @param parallelism The new parallelism to set on this {@code StreamTransformation}
            */
           public void setParallelism(int parallelism) {
          -    Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
          +    checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
          — End diff –

          `PARALLELISM_UNKNOWN` was removed in FLINK-3980. Not sure why it was added back unless this was unintentional.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108956486

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -202,7 +203,17 @@ public int getParallelism() {
            * @param parallelism The new parallelism to set on this {@code StreamTransformation}
            */
           public void setParallelism(int parallelism) {
          -    Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
          +    checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
          — End diff –

          @greghogan I see. So I suppose we could remove it. This will simplify the checks here a bit.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108977470

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -202,7 +203,17 @@ public int getParallelism() {
            * @param parallelism The new parallelism to set on this {@code StreamTransformation}
            */
           public void setParallelism(int parallelism) {
          -    Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
          +    checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
          — End diff –

          It seems that in fact it was added here by mistake: https://github.com/apache/flink/commit/ec975aa

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3616#discussion_r108977967

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java —
          @@ -221,10 +232,19 @@ public int getMaxParallelism() {
            * @param maxParallelism Maximum parallelism for this stream transformation.
            */
           public void setMaxParallelism(int maxParallelism) {
          -    Preconditions.checkArgument(maxParallelism > 0
          -            && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
          -            "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
          -            + ". Found: " + maxParallelism);
          +    checkArgument(
          +            parallelism != ExecutionConfig.PARALLELISM_DEFAULT,
          +            "A maximum parallelism can only be specified with an explicitly specified " +
          +            "parallelism.");
          — End diff –

          Yes, it is, but the problem is that the treatment of the "default parallelism" is a bit strange. (The default parallelism is the parallelism that is set from the Flink config or by the user on the command line using the `-p` parameter.) Maybe we have to rework that before we can fix this.

          I'm starting to think that we should maybe revert the parallelism/max-parallelism changes on the release-1.2 branch and rework the whole thing properly.
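
The lazy treatment of the default parallelism discussed here can be sketched as a resolution step that runs once the job-wide default (from the config or `-p`) is known, with the max-parallelism comparison deferred to that point. This is an illustration only: `DefaultParallelismSketch`, `resolve`, and `resolveAndCheck` are hypothetical names, and how Flink actually resolves these values during graph generation is not shown in this thread.

```java
/** Hypothetical sketch of resolving a lazily set default parallelism. */
final class DefaultParallelismSketch {
    /** Sentinel meaning "no explicit parallelism set"; -1 per the issue text. */
    static final int PARALLELISM_DEFAULT = -1;

    /** Returns the operator's own parallelism, or the job-wide default if unset. */
    static int resolve(int operatorParallelism, int defaultParallelism) {
        if (defaultParallelism < 1) {
            throw new IllegalArgumentException(
                    "The default parallelism must be at least 1.");
        }
        return operatorParallelism == PARALLELISM_DEFAULT
                ? defaultParallelism
                : operatorParallelism;
    }

    /** Resolves the parallelism, then checks it against the maximum parallelism. */
    static int resolveAndCheck(
            int operatorParallelism, int defaultParallelism, int maxParallelism) {
        int effective = resolve(operatorParallelism, defaultParallelism);
        if (maxParallelism < effective) {
            throw new IllegalArgumentException(
                    "The maximum parallelism must not be smaller than the parallelism."
                            + " (parallelism = " + effective
                            + ", max-parallelism = " + maxParallelism + ")");
        }
        return effective;
    }

    public static void main(String[] args) {
        // Operator without explicit parallelism, job default 4, max parallelism 128.
        System.out.println(resolveAndCheck(PARALLELISM_DEFAULT, 4, 128));
        // Operator with explicit parallelism 2 ignores the job default.
        System.out.println(resolveAndCheck(2, 4, 128));
    }
}
```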

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3616

          @greghogan @kl0u I addressed your comments.

          aljoscha Aljoscha Krettek added a comment -

          Removing blocker status since FLINK-5808 was reverted.


            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              aljoscha Aljoscha Krettek