Flink / FLINK-3052

Optimizer does not push properties out of bulk iterations


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.1, 1.0.0
    • Component/s: API / DataSet
    • Labels:
      None

      Description

      Flink's optimizer should be able to reuse interesting properties from outside the loop. To do so, it is sometimes necessary to append to the step function a NoOp node that recomputes the required properties.

      This is currently not working for BulkIterations, because the plans with the appended NoOp nodes are not added to the overall list of candidates.
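
To make the failure mode concrete, here is a toy model of candidate enumeration. It is only a sketch under stated assumptions: `Candidate`, `enumerate`, and the `addNoOpPlans` flag are hypothetical names invented for illustration and do not correspond to classes or methods in Flink's optimizer. It shows how building the NoOp-augmented plan without adding it to the candidate list can leave the list empty.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: Candidate and enumerate are hypothetical names,
// not classes or methods of Flink's optimizer.
public class BulkIterationCandidates {

    /** A step-function plan and whether it delivers the properties required outside the loop. */
    static final class Candidate {
        final String plan;
        final boolean hasRequiredProperties;

        Candidate(String plan, boolean hasRequiredProperties) {
            this.plan = plan;
            this.hasRequiredProperties = hasRequiredProperties;
        }
    }

    /**
     * Collects the candidates for the whole iteration.
     * addNoOpPlans = false mimics the reported behavior: the NoOp-augmented
     * plan is built but never added to the list of candidates.
     */
    static List<Candidate> enumerate(List<Candidate> stepPlans, boolean addNoOpPlans) {
        List<Candidate> candidates = new ArrayList<>();
        for (Candidate step : stepPlans) {
            if (step.hasRequiredProperties) {
                // The step function already delivers the required properties.
                candidates.add(step);
            } else {
                // Append a NoOp node that recomputes the required properties.
                Candidate withNoOp = new Candidate(step.plan + " -> NoOp", true);
                if (addNoOpPlans) {
                    candidates.add(withNoOp);
                }
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<Candidate> stepPlans = new ArrayList<>();
        stepPlans.add(new Candidate("step function", false));

        // An empty candidate list corresponds to a rejected job.
        System.out.println("without NoOp plans: " + enumerate(stepPlans, false).size());
        System.out.println("with NoOp plans: " + enumerate(stepPlans, true).size());
    }
}
```

In this toy model the first call yields no candidates, which is the analogue of a valid job being rejected, while the second call yields one usable plan.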

      This leads not only to sub-optimal plan selection but sometimes also to the rejection of valid jobs. The following job, for example, is wrongly rejected by Flink.

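      // Imports required by the snippet below.
      import java.util.Iterator;

      import org.apache.flink.api.common.functions.CoGroupFunction;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.io.DiscardingOutputFormat;
      import org.apache.flink.api.java.operators.IterativeDataSet;
      import org.apache.flink.api.java.tuple.Tuple1;
      import org.apache.flink.util.Collector;

      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // Two identical inputs: the numbers 1 to 10, each wrapped in a Tuple1.
      DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
          @Override
          public Tuple1<Long> map(Long value) throws Exception {
              return new Tuple1<>(value);
          }
      });

      DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
          @Override
          public Tuple1<Long> map(Long value) throws Exception {
              return new Tuple1<>(value);
          }
      });

      // distinct() runs outside the loop, ahead of the iteration input.
      DataSet<Tuple1<Long>> distinctInput = input1.distinct();

      // Bulk iteration with at most 10 supersteps.
      IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);

      // Step function: coGroup the partial solution with input2 on field 0.
      DataSet<Tuple1<Long>> iterationStep = iteration
              .coGroup(input2)
              .where(0)
              .equalTo(0)
              .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
                  @Override
                  public void coGroup(
                          Iterable<Tuple1<Long>> first,
                          Iterable<Tuple1<Long>> second,
                          Collector<Tuple1<Long>> out) throws Exception {
                      Iterator<Tuple1<Long>> it = first.iterator();

                      // Emit the first element of the partial-solution group, if any.
                      if (it.hasNext()) {
                          out.collect(it.next());
                      }
                  }
              });

      // Close the loop and discard the result.
      DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);

      iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());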
      

            People

            • Assignee: Till Rohrmann (trohrmann)
            • Reporter: Till Rohrmann (trohrmann)
