Flink / FLINK-3052

Optimizer does not push properties out of bulk iterations


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.1, 1.0.0
    • Component/s: API / DataSet
    • Labels: None

    Description

      Flink's optimizer should be able to reuse interesting properties from outside the loop. In order to do that, it is sometimes necessary to append a NoOp node to the step function which recomputes the required properties.

      This is currently not working for BulkIterations, because the plans with the appended NoOp nodes are not added to the overall list of candidates.

      This not only leads to sub-optimal plan selection but sometimes also to the rejection of valid jobs. The following job, for example, is falsely rejected by Flink:

      import org.apache.flink.api.common.functions.CoGroupFunction;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.io.DiscardingOutputFormat;
      import org.apache.flink.api.java.operators.IterativeDataSet;
      import org.apache.flink.api.java.tuple.Tuple1;
      import org.apache.flink.util.Collector;

      import java.util.Iterator;

      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
          @Override
          public Tuple1<Long> map(Long value) throws Exception {
              return new Tuple1<>(value);
          }
      });

      DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
          @Override
          public Tuple1<Long> map(Long value) throws Exception {
              return new Tuple1<>(value);
          }
      });

      // distinct() hash-partitions input1 on its only field; this partitioning is an
      // interesting property that the CoGroup inside the iteration could reuse.
      DataSet<Tuple1<Long>> distinctInput = input1.distinct();

      IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);

      DataSet<Tuple1<Long>> iterationStep = iteration
              .coGroup(input2)
              .where(0)
              .equalTo(0)
              .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
                  @Override
                  public void coGroup(
                          Iterable<Tuple1<Long>> first,
                          Iterable<Tuple1<Long>> second,
                          Collector<Tuple1<Long>> out) throws Exception {
                      Iterator<Tuple1<Long>> it = first.iterator();

                      if (it.hasNext()) {
                          out.collect(it.next());
                      }
                  }
              });

      DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);

      iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());

      // Triggering execution runs the optimizer, which falsely rejects this plan.
      env.execute();
      


          People

            Assignee: Till Rohrmann (trohrmann)
            Reporter: Till Rohrmann (trohrmann)
            Votes: 0
            Watchers: 2
