FLINK-5025: Job fails because of Optimizer bug

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.1.3
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Description

      I have a batch job that fails with the following error message when I run it:

      org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
      	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
      	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
      	at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
      	at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      	at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
      	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
      	at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
      	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
      	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      	at com.bol.reproduce.flink.Main.run(Main.java:42)
      	at com.bol.reproduce.flink.Main.main(Main.java:21)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
      

      The smallest code snippet I have been able to create that reproduces this problem is shown below.
      Note that the error does not occur when only a single union is used.

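      package com.bol.reproduce.flink;

      import java.io.IOException;
      import java.io.Serializable;
      import java.util.ArrayList;
      import java.util.List;

      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
      import org.apache.flink.api.java.io.TextInputFormat;
      import org.apache.flink.core.fs.Path;

      public class Main implements Serializable {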
        public static void main(String[] args) throws Exception {
          System.exit(new Main().run());
        }
      
        private int run() throws IOException {
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      
          final DataSet<String> lines =
                     env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
              .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
              .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));
      
          List<String> allLines = new ArrayList<>();
          lines
            .rebalance()
            .output(new LocalCollectionOutputFormat<>(allLines));
      
          // execute program
          try {
            env.execute("Running");
          } catch (Exception e) {
            e.printStackTrace();
          }
          return 0;
        }
      }
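
The exception message and the `BinaryUnionReplacer.collect` frame suggest that the optimizer rejects a plan in which a channel between two chained binary unions carries a ship strategy other than forward. In the reproducer above, the two `union` calls form such a chain and are followed by `rebalance()`. The sketch below is a simplified, self-contained model of that kind of check, written only to illustrate the failure mode. It is not Flink's code: `UnionFlattenSketch`, `Node`, `Ship`, and `flatten` are hypothetical names, and the assumption that the rebalance strategy ends up on the channel between the unions is inferred from the message, not confirmed by the report.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionFlattenSketch {

    /** Hypothetical stand-in for a ship strategy on a channel between two plan nodes. */
    enum Ship { FORWARD, REBALANCE }

    /** A toy plan node: a source has no inputs, a binary union has two. */
    static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();
        final List<Ship> ships = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        /** Adds an input together with the ship strategy of the connecting channel. */
        Node input(Node in, Ship ship) {
            inputs.add(in);
            ships.add(ship);
            return this;
        }

        boolean isUnion() {
            return !inputs.isEmpty();
        }
    }

    /** Collapses nested binary unions into the flat list of their source names. */
    static List<String> flatten(Node union) {
        List<String> sources = new ArrayList<>();
        collect(union, sources);
        return sources;
    }

    private static void collect(Node union, List<String> sources) {
        for (int i = 0; i < union.inputs.size(); i++) {
            Node in = union.inputs.get(i);
            if (in.isUnion()) {
                // Assumed rule: a channel between two unions must be a plain forward.
                if (union.ships.get(i) != Ship.FORWARD) {
                    throw new IllegalStateException(
                        "ship strategy between binary unions: " + union.ships.get(i));
                }
                collect(in, sources);
            } else {
                sources.add(in.name);
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        Node c = new Node("c");
        Node inner = new Node("union1").input(a, Ship.FORWARD).input(b, Ship.FORWARD);

        // Forward channels only: the nested unions collapse into one three-way union.
        Node ok = new Node("union2").input(inner, Ship.FORWARD).input(c, Ship.FORWARD);
        System.out.println(flatten(ok));

        // A rebalance on the channel between the two unions is rejected by the check.
        Node bad = new Node("union2").input(inner, Ship.REBALANCE).input(c, Ship.REBALANCE);
        try {
            flatten(bad);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model a single union never reaches the check, because none of its inputs is itself a union. That is consistent with the observation that the error does not occur with only one union.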
      

    People

      • Assignee: Unassigned
      • Reporter: Niels Basjes (nielsbasjes)
      • Votes: 0
      • Watchers: 2