FLINK-5473: setMaxParallelism() higher than 1 is possible on non-parallel operators

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      While trying out Flink 1.2, I found out that you can set a maxParallelism higher than 1 on a non-parallel operator.
      I think we should have the same semantics as the setParallelism() method.

      Also, when setting a global maxParallelism in the execution environment, it will be set as a default value for the non-parallel operator.
      When restoring a savepoint from 1.1, you have to set the maxParallelism to the parallelism of the 1.1 job. Non-parallel operators will then also get their maxParallelism set to this value, leading to an error on restore.

      So currently, users restoring from 1.1 to 1.2 have to manually set the maxParallelism to 1 for all non-parallel operators.
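
The semantics requested above can be illustrated with a small, dependency-free model. This is only a sketch and not Flink code: the class, the method `resolveMaxParallelism`, and the `NOT_SET` marker are hypothetical names. It assumes that a non-parallel operator should reject any explicit max parallelism other than 1 (mirroring how setParallelism() treats non-parallel operators) and should ignore the global default from the execution environment.

```java
public class NonParallelMaxParallelismSketch {

    /** Marker for "the user did not set a value" (hypothetical, chosen for this sketch). */
    public static final int NOT_SET = -1;

    /**
     * Resolves the max parallelism of a single operator.
     *
     * @param nonParallel            true if the operator can only run with parallelism 1
     * @param operatorMaxParallelism value set on the operator itself, or NOT_SET
     * @param globalMaxParallelism   default from the execution environment, or NOT_SET
     */
    public static int resolveMaxParallelism(
            boolean nonParallel, int operatorMaxParallelism, int globalMaxParallelism) {

        if (nonParallel) {
            // Same rule as for the parallelism itself: anything other than 1 is rejected.
            if (operatorMaxParallelism != NOT_SET && operatorMaxParallelism != 1) {
                throw new IllegalArgumentException(
                        "The max parallelism of a non-parallel operator must be 1.");
            }
            // The global default must not leak into a non-parallel operator.
            return 1;
        }
        return operatorMaxParallelism != NOT_SET ? operatorMaxParallelism : globalMaxParallelism;
    }

    public static void main(String[] args) {
        // A 1.1 job with parallelism 4 is restored with a global maxParallelism of 4.
        System.out.println(resolveMaxParallelism(true, NOT_SET, 4));  // non-parallel operator
        System.out.println(resolveMaxParallelism(false, NOT_SET, 4)); // parallel operator
    }
}
```

With this rule in place, the manual workaround of setting maxParallelism to 1 on every non-parallel operator would no longer be needed.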

          Activity

          till.rohrmann Till Rohrmann added a comment -

          1.3.0: acfeeaf5e337e56300d10a3a991e79edc827ac7a
          1.2.0: 993a2e2fa0ceecff0979a267ace7cd7b8e05d359

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3182

          rmetzger Robert Metzger added a comment -

          Aljoscha Krettek No, Till Rohrmann wants to merge it to master as well.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3182

          I've merged the PR to the `release-1.2` branch.

          aljoscha Aljoscha Krettek added a comment -

          Robert Metzger can this be closed, then?

          rmetzger Robert Metzger added a comment - Resolved for 1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/993a2e2f
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3182

          Changes look good. Travis passed. Merging this PR. Thanks for your work @StefanRRichter

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3182

          Rebased.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3182

          Thanks for the review, @tillrohrmann! I followed all of your suggestions, except for the indentation formatting.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97337263

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
          @@ -57,47 +57,51 @@

               /** Use the same log for all ExecutionGraph classes */
               private static final Logger LOG = ExecutionGraph.LOG;

          -    private final SerializableObject stateMonitor = new SerializableObject();
          +
          +    public static final int VALUE_NOT_SET = -1;
          +
          +    private final Object stateMonitor = new Object();

               private final ExecutionGraph graph;

               private final JobVertex jobVertex;

               private final ExecutionVertex[] taskVertices;

          -    private IntermediateResult[] producedDataSets;
          +    private final IntermediateResult[] producedDataSets;

               private final List<IntermediateResult> inputs;

               private final int parallelism;

          -    private final int maxParallelism;

          -    private final boolean[] finishedSubtasks;
          -    private volatile int numSubtasksInFinalState;
          -
          +
               private final SlotSharingGroup slotSharingGroup;

          -
          +
               private final CoLocationGroup coLocationGroup;

          -
          +
               private final InputSplit[] inputSplits;

          +    private final int maxParallelismConfigured;
          +
          +    private int maxParallelismDerived;
          +
          +    private volatile int numSubtasksInFinalState;
          +
               /**
                * Serialized task information which is for all sub tasks the same. Thus, it avoids to
                * serialize the same information multiple times in order to create the
                * TaskDeploymentDescriptors.
                */
          -    private final SerializedValue<TaskInformation> serializedTaskInformation;
          +    private SerializedValue<TaskInformation> serializedTaskInformation;

               private InputSplitAssigner splitAssigner;

               public ExecutionJobVertex(
                   ExecutionGraph graph,
                   JobVertex jobVertex,
                   int defaultParallelism,
          -        Time timeout) throws JobException, IOException {
          +        Time timeout) throws JobException {
          --- End diff --

          You are right, but I kept the indentation to avoid formatting changes.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97286408

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
          @@ -94,6 +95,9 @@ public OperatorChain(StreamTask<OUT, OP> containingTask) {
                   try {
                       for (int i = 0; i < outEdgesInOrder.size(); i++) {
                           StreamEdge outEdge = outEdgesInOrder.get(i);
          +
          +
          +
          --- End diff --

          One line break would probably be enough here.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97282598

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
          @@ -57,47 +57,51 @@

               /** Use the same log for all ExecutionGraph classes */
               private static final Logger LOG = ExecutionGraph.LOG;

          -    private final SerializableObject stateMonitor = new SerializableObject();
          +
          +    public static final int VALUE_NOT_SET = -1;
          +
          +    private final Object stateMonitor = new Object();

               private final ExecutionGraph graph;

               private final JobVertex jobVertex;

               private final ExecutionVertex[] taskVertices;

          -    private IntermediateResult[] producedDataSets;
          +    private final IntermediateResult[] producedDataSets;

               private final List<IntermediateResult> inputs;

               private final int parallelism;

          -    private final int maxParallelism;

          -    private final boolean[] finishedSubtasks;
          -    private volatile int numSubtasksInFinalState;
          -
          +
               private final SlotSharingGroup slotSharingGroup;

          -
          +
               private final CoLocationGroup coLocationGroup;

          -
          +
               private final InputSplit[] inputSplits;

          +    private final int maxParallelismConfigured;
          +
          +    private int maxParallelismDerived;
          +
          +    private volatile int numSubtasksInFinalState;
          +
               /**
                * Serialized task information which is for all sub tasks the same. Thus, it avoids to
                * serialize the same information multiple times in order to create the
                * TaskDeploymentDescriptors.
                */
          -    private final SerializedValue<TaskInformation> serializedTaskInformation;
          +    private SerializedValue<TaskInformation> serializedTaskInformation;

               private InputSplitAssigner splitAssigner;

               public ExecutionJobVertex(
                   ExecutionGraph graph,
                   JobVertex jobVertex,
                   int defaultParallelism,
          -        Time timeout) throws JobException, IOException {
          +        Time timeout) throws JobException {
          --- End diff --

          Method declaration parameters which are broken into multiple lines are usually indented twice.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97281937

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
          @@ -45,29 +45,37 @@

               private final Logger logger;
               private final Map<JobVertexID, ExecutionJobVertex> tasks;
          -    private final CompletedCheckpoint latest;
          +    private final Map<JobVertexID, TaskState> taskStates;
               private final boolean allowNonRestoredState;

               public StateAssignmentOperation(
                       Logger logger,
                       Map<JobVertexID, ExecutionJobVertex> tasks,
          -            CompletedCheckpoint latest,
          +            Map<JobVertexID, TaskState> taskStates,
                       boolean allowNonRestoredState) {

                   this.logger = logger;
                   this.tasks = tasks;
          -        this.latest = latest;
          +        this.taskStates = taskStates;
                   this.allowNonRestoredState = allowNonRestoredState;
               }

               public boolean assignStates() throws Exception {
          --- End diff --

          This method seems a bit lengthy. Maybe we could split it up.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97280170

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
          @@ -45,29 +45,37 @@

               private final Logger logger;
               private final Map<JobVertexID, ExecutionJobVertex> tasks;
          -    private final CompletedCheckpoint latest;
          +    private final Map<JobVertexID, TaskState> taskStates;
               private final boolean allowNonRestoredState;

               public StateAssignmentOperation(
                       Logger logger,
                       Map<JobVertexID, ExecutionJobVertex> tasks,
          -            CompletedCheckpoint latest,
          +            Map<JobVertexID, TaskState> taskStates,
                       boolean allowNonRestoredState) {

                   this.logger = logger;
                   this.tasks = tasks;
          -        this.latest = latest;
          +        this.taskStates = taskStates;
                   this.allowNonRestoredState = allowNonRestoredState;
          --- End diff --

          `Precondition` checks could be helpful here.
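
A minimal sketch of what such checks could look like in the constructor. It is not the code that was merged: the class below is a hypothetical stand-in, the map types are simplified to `String` keys and values, and `java.util.Objects.requireNonNull` is used only to keep the example free of dependencies (inside Flink, `Preconditions.checkNotNull` from `org.apache.flink.util` would be the natural choice).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class StateAssignmentPreconditionsSketch {

    // Simplified stand-ins for Map<JobVertexID, ExecutionJobVertex> and Map<JobVertexID, TaskState>.
    private final Map<String, String> tasks;
    private final Map<String, String> taskStates;
    private final boolean allowNonRestoredState;

    public StateAssignmentPreconditionsSketch(
            Map<String, String> tasks,
            Map<String, String> taskStates,
            boolean allowNonRestoredState) {

        // Fail fast at construction time instead of with an NPE deep inside assignStates().
        this.tasks = Objects.requireNonNull(tasks, "tasks must not be null");
        this.taskStates = Objects.requireNonNull(taskStates, "taskStates must not be null");
        this.allowNonRestoredState = allowNonRestoredState;
    }

    public int numberOfTaskStates() {
        return taskStates.size();
    }

    public static void main(String[] args) {
        try {
            new StateAssignmentPreconditionsSketch(
                    Collections.<String, String>emptyMap(), null, false);
        } catch (NullPointerException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```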

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97283204

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
          @@ -599,7 +602,24 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
                   boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();

                   for (IntermediateResultPartition partition : resultPartitions.values()) {
          -            producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
          +
          +            List<List<ExecutionEdge>> consumers = partition.getConsumers();
          +
          +            if(consumers.isEmpty()) {
          --- End diff --

          whitespace missing between `if` and `(`

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3182#discussion_r97283204 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java — @@ -599,7 +602,24 @@ TaskDeploymentDescriptor createDeploymentDescriptor( boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling)); + + List<List<ExecutionEdge>> consumers = partition.getConsumers(); + + if(consumers.isEmpty()) { End diff – whitespace missing between `if` and `(`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97282806

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
          @@ -212,6 +206,19 @@ public ExecutionJobVertex(
                   finishedSubtasks = new boolean[parallelism];
               }

          +    public void setMaxParallelismDerived(int maxParallelism) {
          +
          +        Preconditions.checkState(VALUE_NOT_SET == maxParallelismConfigured,
          +                "Attempt to override a configured max parallelism. Configured: " + maxParallelismConfigured
          +                        + ", argument: " + maxParallelism);
          +
          +        Preconditions.checkArgument(maxParallelism > 0
          +                        && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
          +                "Overriding max parallelism is not in valid bounds: " + maxParallelism);
          --- End diff --

          Maybe we could add the valid bounds here.
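
One way the suggestion could be realised is to put the bounds into the message itself. The following is a dependency-free sketch, not the merged change: the class name and the message wording are hypothetical, and the upper bound of `1 << 15` is an assumption made for this example (the real value lives in `KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM`).

```java
public class MaxParallelismBoundsSketch {

    // Assumed value for this sketch; the real constant is
    // KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM.
    public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    /** Returns the value unchanged if it is within (0, UPPER_BOUND_MAX_PARALLELISM]. */
    public static int checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            // The message names the valid range so the user does not have to look it up.
            throw new IllegalArgumentException(
                    "Overriding max parallelism is not in valid bounds (1.."
                            + UPPER_BOUND_MAX_PARALLELISM + "), found: " + maxParallelism);
        }
        return maxParallelism;
    }

    public static void main(String[] args) {
        try {
            checkMaxParallelism(0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```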

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3182#discussion_r97293150

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
          @@ -230,9 +237,46 @@ public int getParallelism() {
                   return parallelism;
               }

          +    /**
          +     * Returns the effective max parallelism. This value is determined in the following order of priority:
          +     * <p>
          +     * (maxParallelismConfigured) overrides (maxParallelismOverride) override (max(128, roundUp(parallelism)) / default)
          --- End diff --

          `maxParallelismOverride` => `maxParallelismDerived`?
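
The priority order in the quoted Javadoc can be sketched as a pure function. This is an illustration under stated assumptions, not the method from the PR: the class and method names are hypothetical, and `roundUp` is interpreted here as rounding up to the next power of two, which the Javadoc does not spell out.

```java
public class EffectiveMaxParallelismSketch {

    public static final int VALUE_NOT_SET = -1;

    /** Rounds a positive value up to the next power of two (assumed meaning of "roundUp"). */
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    /** Default used when neither a configured nor a derived value exists. */
    static int defaultMaxParallelism(int parallelism) {
        return Math.max(128, roundUpToPowerOfTwo(parallelism));
    }

    /** configured overrides derived, derived overrides the default. */
    public static int effectiveMaxParallelism(int configured, int derived, int parallelism) {
        if (configured != VALUE_NOT_SET) {
            return configured;
        }
        if (derived != VALUE_NOT_SET) {
            return derived;
        }
        return defaultMaxParallelism(parallelism);
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxParallelism(64, 32, 200));
        System.out.println(effectiveMaxParallelism(VALUE_NOT_SET, 32, 200));
        System.out.println(effectiveMaxParallelism(VALUE_NOT_SET, VALUE_NOT_SET, 200));
    }
}
```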

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3182#discussion_r97293150 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java — @@ -230,9 +237,46 @@ public int getParallelism() { return parallelism; } + /** + * Returns the effective max parallelism. This value is determined in the following order of priority: + * <p> + * (maxParallelismConfigured) overrides (maxParallelismOverride) override (max(128, roundUp(parallelism)) / default) — End diff – `maxParallelismOverride` => `maxParallelismDerived`?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3182

          cc @uce

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3182 cc @uce
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3182

          FLINK-5473 Limit MaxParallelism to 1 for non-parallel operators and improve choice of max parallelism without explicit configuration

          This PR limits the maximum parallelism for non-parallel operators to 1.

          Furthermore, this improves the default behaviour if the user did not explicitly specify a maximum parallelism. In particular, maximum parallelism can now be derived from savepoints, allowing users that migrate from Flink 1.1 to Flink 1.2 to keep their job unchanged.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink recover-max-para

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3182.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3182


          commit 20d0a3fc88c85a71b692c5408fc7b2fd33da8ff2
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-16T13:31:22Z

          FLINK-5473 Limit max parallelism to 1 for non-parallel operators

          commit f6081f319b7f2ef8615557743d906bc4585445f7
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-16T17:41:37Z

          FLINK-5473 Better default behaviours for unspecified maximum parallelism
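
The restore behaviour described in this pull request can be sketched roughly as follows. This is a simplified model, not the code from the PR: `Vertex` and `restoreMaxParallelism` are hypothetical names, only the two fields quoted in the review are modelled, and treating a configured value that disagrees with the savepoint as an error is an assumption of this sketch.

```java
public class DeriveMaxParallelismFromSavepointSketch {

    public static final int VALUE_NOT_SET = -1;

    /** Hypothetical, simplified vertex holding a configured and a derived max parallelism. */
    public static final class Vertex {
        private final int maxParallelismConfigured;
        private int maxParallelismDerived = VALUE_NOT_SET;

        public Vertex(int maxParallelismConfigured) {
            this.maxParallelismConfigured = maxParallelismConfigured;
        }

        public boolean isMaxParallelismConfigured() {
            return maxParallelismConfigured != VALUE_NOT_SET;
        }

        public int getMaxParallelismConfigured() {
            return maxParallelismConfigured;
        }

        public int getMaxParallelismDerived() {
            return maxParallelismDerived;
        }

        public void setMaxParallelismDerived(int maxParallelism) {
            // A value the user configured explicitly must never be overridden silently.
            if (isMaxParallelismConfigured()) {
                throw new IllegalStateException(
                        "Attempt to override a configured max parallelism. Configured: "
                                + maxParallelismConfigured + ", argument: " + maxParallelism);
            }
            this.maxParallelismDerived = maxParallelism;
        }
    }

    /** Applies the max parallelism recorded in a savepoint to a vertex. */
    public static void restoreMaxParallelism(Vertex vertex, int savepointMaxParallelism) {
        if (!vertex.isMaxParallelismConfigured()) {
            // Nothing configured: take the value from the savepoint, so a job migrated
            // from 1.1 can stay unchanged.
            vertex.setMaxParallelismDerived(savepointMaxParallelism);
        } else if (vertex.getMaxParallelismConfigured() != savepointMaxParallelism) {
            // Assumption of this sketch: a conflicting explicit value is an error.
            throw new IllegalStateException(
                    "Configured max parallelism " + vertex.getMaxParallelismConfigured()
                            + " does not match the savepoint value " + savepointMaxParallelism);
        }
    }

    public static void main(String[] args) {
        Vertex vertex = new Vertex(VALUE_NOT_SET);
        restoreMaxParallelism(vertex, 4);
        System.out.println(vertex.getMaxParallelismDerived());
    }
}
```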


          rmetzger Robert Metzger added a comment -

          That would be the best user experience, yes.

          srichter Stefan Richter added a comment -

          We could try to automatically set the old parallelism as max parallelism by default.


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              rmetzger Robert Metzger
            • Votes:
              0
              Watchers:
              6
