Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.1.3
- Fix Version/s: None
- Component/s: None
- Labels: None
Description
I have a batch job that fails with the following error when I run it:
org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
    at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at com.bol.reproduce.flink.Main.run(Main.java:42)
    at com.bol.reproduce.flink.Main.main(Main.java:21)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
The smallest code snippet I have been able to create that reproduces the problem is shown below.
Note that the error does not occur when only a single union is used.
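    import java.io.IOException;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class Main implements Serializable {

        public static void main(String[] args) throws Exception {
            System.exit(new Main().run());
        }

        private int run() throws IOException {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Two chained unions over three inputs; a single union does not trigger the error.
            final DataSet<String> lines = env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
                .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
                .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));

            List<String> allLines = new ArrayList<>();
            lines
                .rebalance()
                .output(new LocalCollectionOutputFormat<>(allLines));

            // execute program
            try {
                env.execute("Running");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0;
        }
    }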
Issue Links
- duplicates FLINK-2662 CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators." (Closed)