Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
0.10.0
-
None
Description
Flink's optimizer should be able to reuse interesting properties from outside the loop. In order to do that it is sometimes necessary to append a NoOp node to the step function which recomputes the required properties.
This is currently not working for BulkIterations, because the plans with the appended NoOp nodes are not added to the overall list of candidates.
This not only leads to sub-optimal plan selection but sometimes to the rejection of valid jobs. The following job, for example, will be falsely rejected by flink.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() { @Override public Tuple1<Long> map(Long value) throws Exception { return new Tuple1<>(value); } }); DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() { @Override public Tuple1<Long> map(Long value) throws Exception { return new Tuple1<>(value); } }); DataSet<Tuple1<Long>> distinctInput = input1.distinct(); IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10); DataSet<Tuple1<Long>> iterationStep = iteration .coGroup(input2) .where(0) .equalTo(0) .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() { @Override public void coGroup( Iterable<Tuple1<Long>> first, Iterable<Tuple1<Long>> second, Collector<Tuple1<Long>> out) throws Exception { Iterator<Tuple1<Long>> it = first.iterator(); if (it.hasNext()) { out.collect(it.next()); } } }); DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep); iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());