Hello!
When I call `groupBy` on an `IterativeDataStream`, closing the iteration with `closeWith` throws a NullPointerException. Here is code that reproduces the issue:

    public Test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Tuple2<Long, Long>> edges = env
                .generateSequence(0, 7)
                .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Long v) throws Exception {
                        return new Tuple2<>(v, (v + 1));
                    }
                });

        IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();

        SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                        return tuple;
                    }
                })
                .split(new OutputSelector<Tuple2<Long, Long>>() {
                    @Override
                    public Iterable<String> select(Tuple2<Long, Long> tuple) {
                        List<String> output = new ArrayList<>();
                        output.add("iterate");
                        return output;
                    }
                });

        iteration.closeWith(step.select("iterate"));

        env.execute("Sandbox");
    }

Moving the `groupBy` before the iteration works around the issue. For example, this works:

    ...
    iteration = edges.groupBy(1).iterate();
    iteration.map(...)

Here is the stack trace:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
        at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
        at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
        at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)