Flink / FLINK-1986

Group by fails on iterative data streams


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed

    Description

      Hello!

      When I call `groupBy` on an IterativeDataStream, `closeWith` throws a NullPointerException. The following code reproduces the issue:

      public Test() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
      
          DataStream<Tuple2<Long, Long>> edges = env
                  .generateSequence(0, 7)
                  .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                      @Override
                      public Tuple2<Long, Long> map(Long v) throws Exception {
                          return new Tuple2<>(v, (v + 1));
                      }
                  });
      
          IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
      
          // groupBy is applied to the iterative stream itself (inside the loop)
          SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
                  .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                      @Override
                      public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                          return tuple;
                      }
                  })
                  .split(new OutputSelector<Tuple2<Long, Long>>() {
                      @Override
                      public Iterable<String> select(Tuple2<Long, Long> tuple) {
                          List<String> output = new ArrayList<>();
                          output.add("iterate");
                          return output;
                      }
                  });
      
          // Throws the NullPointerException shown in the stack trace below
          iteration.closeWith(step.select("iterate"));
      
          env.execute("Sandbox");
      }
      

      Moving the `groupBy` before the iteration avoids the issue; for example, this works:

      ... iteration = edges.groupBy(1).iterate();
      iteration.map(...)
      

      Here is the stack trace:

      Exception in thread "main" java.lang.NullPointerException
      	at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
      	at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
      	at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
      	at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:601)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
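The stack trace shows the exception escaping from `StreamGraph.addIterationTail`, called from `IterativeDataStream.closeWith`; the report does not say which reference is null there. As a purely hypothetical illustration (this is not Flink's code, and `IterationTailModel` and its methods are names invented for this sketch), the toy model below shows one common way such an NPE arises: a graph builder keeps iteration heads in a map keyed by iteration ID, and the tail-registration step unboxes the lookup result without checking for a missing entry. The guarded variant reports the missing head with a descriptive `IllegalStateException` instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model for illustration only; NOT Flink's StreamGraph.
// It shows how an unchecked map lookup while adding an iteration tail
// surfaces as a bare NullPointerException.
final class IterationTailModel {

    // Maps an iteration ID to the ID of the node acting as its iteration head.
    private final Map<Integer, Integer> iterationHeads = new HashMap<>();

    // Registers headNodeId as the head of the given iteration.
    void addIterationHead(int iterationId, int headNodeId) {
        iterationHeads.put(iterationId, headNodeId);
    }

    // Unguarded lookup of the head the tail would connect to: if no head was
    // registered, get() returns null and auto-unboxing it to int throws
    // NullPointerException.
    int addIterationTailUnchecked(int iterationId) {
        int headNodeId = iterationHeads.get(iterationId);
        return headNodeId;
    }

    // Guarded lookup: a missing head is reported with a descriptive message.
    int addIterationTailChecked(int iterationId) {
        Integer headNodeId = iterationHeads.get(iterationId);
        if (headNodeId == null) {
            throw new IllegalStateException(
                "No iteration head registered for iteration " + iterationId);
        }
        return headNodeId;
    }
}
```

For example, registering a head under ID 1 and then adding a tail for ID 2 makes `addIterationTailUnchecked` throw a bare `NullPointerException`, while `addIterationTailChecked` fails with a message naming the iteration. Whether Flink's missing reference comes from `groupBy` producing a stream that no longer carries the iteration's identity is an assumption this sketch does not verify.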
      

          People

            Gyula Fora (gyfora)
            Daniel Bali (balidani)
            Votes: 0
            Watchers: 5