Flink / FLINK-2662

CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.9.1, 0.10.0
    • Fix Version/s: 1.0.0, 1.2.0, 1.3.0, 1.1.5
    • Component/s: Optimizer
    • Labels:
      None

      Description

      I have a Flink program which throws the exception in the jira title. Full text:

      Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
      at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
      at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
      at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
      at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
      at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
      at org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
      at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
      at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
      at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
      at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
      at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
      at org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
      at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
      at malom.Solver.main(Solver.java:66)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

      The execution plan:
      http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
      (I obtained this by commenting out the line that throws the exception)

      The code is here:
      https://github.com/ggevay/flink/tree/plan-generation-bug
      The class to run is "Solver". It needs a command line argument, which is a directory where it will write its output. (On the first run, it generates some lookup tables for a few minutes, which are then placed in /tmp/movegen.)
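      For context, a minimal sketch of how such a plan JSON can be printed with the DataSet API (hypothetical code; buildDataFlow stands in for the actual job definition and must define at least one sink). In the stack trace above, the exception is thrown from exactly this getExecutionPlan() call:

      ```
      import org.apache.flink.api.java.ExecutionEnvironment;

      public class PlanDump {
          public static void main(String[] args) throws Exception {
              ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
              buildDataFlow(env);  // define the DataSet program under test (including its sinks)

              // Compiles the program with the optimizer and returns the plan as JSON;
              // for the affected programs, this call fails with the CompilerException.
              System.out.println(env.getExecutionPlan());
          }

          private static void buildDataFlow(ExecutionEnvironment env) {
              // hypothetical placeholder for the actual job definition
          }
      }
      ```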

      Attachments

      1. Bug.java (3 kB) - Yassine Marzougui
      2. FlinkBug.scala (1 kB) - Lorenz Bühmann

        Issue Links

          Activity

          Zentol Chesnay Schepler added a comment -

          I tried reproducing the error but the shipping strategy is picked correctly on my machine.

          ggevay Gabor Gevay added a comment -

          That's strange. I have also tried to reproduce this now, and it is still giving the same error for me. Do you have any idea as to what characteristic of my machine might influence the plan generation in a way that this error happens only for me?

          Tomorrow I will try this on my work laptop as well, and report back.

          ggevay Gabor Gevay added a comment -

          It is working on my work laptop...

          Zentol Chesnay Schepler added a comment -

          Are you sure that you ran it both times against the same Flink version?

          Since I have no idea about the optimizer, this is pretty much my only idea.

          ggevay Gabor Gevay added a comment -

          Yes, because I checked out the branch referred to here in both cases.

          fhueske Fabian Hueske added a comment -

          Hi guys, did you make progress on this issue?
          Is it still reproducible?

          ggevay Gabor Gevay added a comment -

          Hi,
          Unfortunately, I haven't made any progress. It is still reproducible; I have now rebased onto the current master, and the same thing happens when I try it on my home laptop. This is the rebased branch:
          https://github.com/ggevay/flink/tree/plan-generation-bug-rebased

          Zentol Chesnay Schepler added a comment -

          I could reproduce the issue on my laptop as well now. Now to find the culprit...

          LorenzB Lorenz Bühmann added a comment -

          I do have the same problem:

          Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.
          at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
          at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:73)
          at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
          at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
          at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
          at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
          at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
          at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
          at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
          at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
          at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
          at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:184)
          at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
          at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
          at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
          at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
          at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
          at org.sansa.inference.flink.conformance.FlinkBug$.main(FlinkBug.scala:31)
          at org.sansa.inference.flink.conformance.FlinkBug.main(FlinkBug.scala)

          I tried both Flink 1.0.3 and Flink 1.1.0-SNAPSHOT.

          Any progress on it?

          waliaashish85 Ashish Walia added a comment -

          I ran into the same problem with Flink 1.0.3 when I was trying to combine two datasets (each previously obtained by unioning a bunch of files in the reduce step) using joinWithTiny and then performing further operations on the joined dataset. As a workaround, I used groupBy and reduceGroup to collect the desired records.
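          For illustration, a hypothetical sketch of that kind of workaround (not the reporter's actual code): instead of joining the two unioned DataSets with joinWithTiny, union them and pair up the records per key with groupBy plus reduceGroup:

          ```
          import org.apache.flink.api.common.functions.GroupReduceFunction;
          import org.apache.flink.api.java.DataSet;
          import org.apache.flink.api.java.ExecutionEnvironment;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.util.Collector;

          public class UnionJoinWorkaround {
              public static void main(String[] args) throws Exception {
                  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                  // Stand-ins for the two DataSets that were each built by unioning several files.
                  DataSet<Tuple2<String, Integer>> left =
                          env.fromElements(Tuple2.of("a", 1)).union(env.fromElements(Tuple2.of("b", 2)));
                  DataSet<Tuple2<String, Integer>> right =
                          env.fromElements(Tuple2.of("a", 10)).union(env.fromElements(Tuple2.of("b", 20)));

                  // Instead of: left.joinWithTiny(right).where(0).equalTo(0)...
                  left.union(right)
                      .groupBy(0)
                      .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                          @Override
                          public void reduce(Iterable<Tuple2<String, Integer>> values,
                                             Collector<Tuple2<String, Integer>> out) {
                              // Collect the matching records per key; here they are simply summed.
                              String key = null;
                              int sum = 0;
                              for (Tuple2<String, Integer> v : values) {
                                  key = v.f0;
                                  sum += v.f1;
                              }
                              out.collect(Tuple2.of(key, sum));
                          }
                      })
                      .print();
              }
          }
          ```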

          fhueske Fabian Hueske added a comment -

          OK, I'll give it a shot and will try to fix this bug.
          Can any of you, Ashish Walia, Lorenz Bühmann, or Gabor Gevay, provide an (as concise as possible) example program that fails?

          Thanks, Fabian

          LorenzB Lorenz Bühmann added a comment - edited

          Fabian Hueske I attached a file. It (hopefully) contains a minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost, so please don't think about its meaning. The Flink version used was 1.1.0 via Maven.

          fhueske Fabian Hueske added a comment -

          Thanks a lot Lorenz Bühmann! This will be very helpful.

          I'll try to look into this issue soon but will be quite busy the next couple of days.

          LorenzB Lorenz Bühmann added a comment -

          No problem, just let me know if you need anything more, Fabian Hueske.

          fhueske Fabian Hueske added a comment -

          I found the root cause of this problem. The plan contains a union operator (U1) with two outputs, one of which is another union operator (U2). One of the outputs requires a partitioning shipping strategy (pushed down onto U1 from a following distinct), the other does not. In some cases the partitioning is not pushed into the inputs of U1, so the connection between U1 and U2 ends up hash-partitioned. As one of its last steps, the optimizer merges consecutive binary union operators into an n-ary union operator and checks that their connection is a simple forward connection, i.e., not a partitioned connection. This is where the translation fails.

          I have a fix for this issue that translates union operators with two (or more) outputs into multiple internal union operators with a single output each. This way we avoid multiple competing shipping strategies for union operators, and a potential partitioning is always pushed into both inputs of a union.
          Note that the fix only touches the program translation; the API does not change and programs do not need to be changed.

          I'll open a PR with the fix soon.
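          For illustration, a hypothetical minimal DataSet sketch of the plan shape just described (not taken from the reported programs; whether this exact snippet triggers the failure depends on the plan the optimizer picks). U1 is consumed both by a distinct, which pushes a partitioning requirement towards U1, and by a second union U2:

          ```
          import org.apache.flink.api.java.DataSet;
          import org.apache.flink.api.java.ExecutionEnvironment;
          import org.apache.flink.api.java.io.DiscardingOutputFormat;
          import org.apache.flink.api.java.tuple.Tuple1;

          public class UnionPlanShape {
              public static void main(String[] args) throws Exception {
                  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                  DataSet<Tuple1<String>> a = env.fromElements(Tuple1.of("a"));
                  DataSet<Tuple1<String>> b = env.fromElements(Tuple1.of("b"));
                  DataSet<Tuple1<String>> c = env.fromElements(Tuple1.of("c"));

                  DataSet<Tuple1<String>> u1 = a.union(b);              // U1 with two successors:
                  u1.distinct(0)                                        // 1) distinct pushes a partitioning towards U1
                    .output(new DiscardingOutputFormat<Tuple1<String>>());
                  u1.union(c)                                           // 2) U2, a second union consuming U1
                    .output(new DiscardingOutputFormat<Tuple1<String>>());

                  env.execute("union-plan-shape");
              }
          }
          ```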

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/2508

          FLINK-2662 [dataSet] Translate union with multiple output into separate unions with single output.

          Fixes FLINK-2662 by translating Union operators with two (or more) successors into two or more Union operators with a single successor.

          In the optimizer, union operators with two (or more) successors caused problems when these successors had different partitioning requirements and some of them were other union operators. In certain situations, the UnionMerging post pass would fail because of a non-forward shipping strategy between two subsequent union operators.

          This fix only adapts the program translation and does not change the API.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink FLINK-2662

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2508.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2508


          commit 8d91e9d0074884ac430c88c4f6ad41878a8d1dff
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2016-09-16T16:40:32Z

          FLINK-2662 [dataSet] Translate union with multiple output into separate unions with single output.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2508

          Another possibility would be to simply add a `NoOp` operator behind a union with multiple outputs with different partitioning schemes. That prevents the unions from being merged.

          Would that not be better in terms of not duplicating records (as happens when there are multiple instances of the one union operator)?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2508

          Adding a NoOp would prevent the unions from being merged, but the output of the NoOp would still need to be replicated because it needs to be served to two different operators.

          If you have something like

          ```
          1 ------------\
                         >-> U_2 -> X
          2 -\          /
              >-> U_1 -<
          3 -/          \-> Y
          ```

          Duplicating `U_1` would temporarily result in

          ```
          1 --------------\
                           >-> U_2 -> X
          2 -\ /-> U_11 --/
              X
          3 -/ \-> U_12 -------------> Y
          ```

          (the crossing X means that 2 and 3 each feed both U_11 and U_12)

          The generated plan with merged unions would be

          ```
          1 --\
               >>> X
              / /
          2 -/ /--\
              /    >-> Y
          3 -/ ---/
          ```

          (X reads 1, 2, and 3 directly; Y reads 2 and 3)

          With a NoOp added, the plan would be:

          ```
          1 -----------\
                        >-> X
          2 -\         /
              >-> NO -<
          3 -/         \-> Y
          ```

          This plan would also duplicate records (the output of the NoOp) and in addition adds serialization overhead due to the additional operator.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2508

          I think you are right.

          +1 to merge then

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2508

          Thanks, merging then

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2508

          fhueske Fabian Hueske added a comment -

          Fixed for 1.1.3 with a7f6594b6b47b91242cbb0a13ea4efc5508adcfc
          Fixed for 1.2.0 with 303f6fee99b731dd138e37513705271f97f76d72

          ymarzougui Yassine Marzougui added a comment -

          I ran into the same problem with Flink 1.1.3 when trying to perform partitionByRange(1).sortPartition(1, Order.DESCENDING) on the union of DataSets.
          I have attached a program reproducing the bug.
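          For illustration, a hypothetical minimal sketch of that pattern (not the attached program): partitionByRange followed by sortPartition applied to the union of several DataSets:

          ```
          import org.apache.flink.api.common.operators.Order;
          import org.apache.flink.api.java.DataSet;
          import org.apache.flink.api.java.ExecutionEnvironment;
          import org.apache.flink.api.java.tuple.Tuple2;

          public class UnionRangePartition {
              public static void main(String[] args) throws Exception {
                  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                  DataSet<Tuple2<String, Integer>> a = env.fromElements(Tuple2.of("a", 1));
                  DataSet<Tuple2<String, Integer>> b = env.fromElements(Tuple2.of("b", 2));
                  DataSet<Tuple2<String, Integer>> c = env.fromElements(Tuple2.of("c", 3));

                  a.union(b).union(c)                   // union of several DataSets
                   .partitionByRange(1)                 // range-partition on field 1
                   .sortPartition(1, Order.DESCENDING)  // sort within each partition
                   .print();
              }
          }
          ```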

          fhueske Fabian Hueske added a comment -

          Reopened due to the bug reported by Yassine Marzougui.

          Zentol Chesnay Schepler added a comment -

          Niels Basjes reported another instance of this issue, which can be reproduced with the following code:

          The smallest code snippet I have been able to create that reproduces this problem is below.
          Note that when using a single union this error does not happen.

          public class Main implements Serializable {
            public static void main(String[] args) throws Exception {
              System.exit(new Main().run());
            }
          
            private int run() throws IOException {
              final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          
              final DataSet<String> lines =
                         env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist")))
                  .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))))
                  .union(env.createInput(new TextInputFormat(new Path("/tmp/doesNotExist"))));
          
              List<String> allLines = new ArrayList<>();
              lines
                .rebalance()
                .output(new LocalCollectionOutputFormat<>(allLines));
          
              // execute program
              try {
                env.execute("Running");
              } catch (Exception e) {
                e.printStackTrace();
              }
              return 0;
            }
          }
          
          nielsbasjes Niels Basjes added a comment -

          I have the full reproduction application available here: https://github.com/nielsbasjes/Reproduce-FLINK-5025
          Note that it uses Flink 1.1.3

          fhueske Fabian Hueske added a comment -

          I have a patch for this issue.
          Should get into the 1.1.4 (and 1.2.0) releases.

          Cheers, Fabian

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/2848

          FLINK-2662 [optimizer] Fix computation of global properties of union operator.

          Fix computation of global properties of union operator.
          This solves the problem of invalid shipping strategy between consecutive unions.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink unionBug

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2848.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2848


          commit a226bcb13f6398988d9f4359ae371cbb1bf71465
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2016-11-21T18:06:42Z

          FLINK-2662 [optimizer] Fix computation of global properties of union operator.

          • Fixes invalid shipping strategy between consecutive unions.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2848

          Thanks, looks good +1

          If I see this correctly, the global properties logic was simply incomplete and only handled hash partitioning before?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2848

          @StephanEwen Yes, that's right.
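          As a conceptual illustration of that point (hypothetical types, not Flink's actual optimizer classes): when the global properties of a binary union's two inputs are combined, a partitioning property should survive only if both inputs carry the same one, and the check has to cover every partitioning kind rather than only hash partitioning:

          ```
          import java.util.Collections;
          import java.util.List;
          import java.util.Objects;

          class UnionGlobalProps {
              enum Partitioning { HASH, RANGE, CUSTOM, RANDOM }

              static final class Props {
                  final Partitioning kind;
                  final List<Integer> fields;
                  Props(Partitioning kind, List<Integer> fields) { this.kind = kind; this.fields = fields; }
              }

              /** Combine the global properties of a binary union's two inputs (conceptual sketch only). */
              static Props combineForUnion(Props in1, Props in2) {
                  if (in1.kind != Partitioning.RANDOM
                          && in1.kind == in2.kind
                          && Objects.equals(in1.fields, in2.fields)) {
                      return in1;  // both inputs agree, so the property survives the union
                  }
                  // Otherwise no partitioning guarantee remains on the union's output.
                  return new Props(Partitioning.RANDOM, Collections.<Integer>emptyList());
              }
          }
          ```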

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2848

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2848

          fhueske Fabian Hueske added a comment -

          Fixed for 1.1.4 with efbd293afe4348b0f199e2c66a990ae6880edcef
          Fixed for 1.2.0 with 7d91b9ec71c9b711e04a91f847f5c85d3f561da6

          ymarzougui Yassine Marzougui added a comment -

          I came across this bug again with the following code:

          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                  env.fromElements(Tuple1.of("one"))
                          .join(env.fromElements(Tuple1.of("one"))
                                  .union(env.fromElements(Tuple1.of("two")))
                                  .union(env.fromElements(Tuple1.of("three")))
                                  .union(env.fromElements(Tuple1.of("four"))), JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
                          .where(0)
                          .equalTo(0)
                          .print();
          
          fhueske Fabian Hueske added a comment -

          Which version?

          ymarzougui Yassine Marzougui added a comment -

          1.3-SNAPSHOT (Commit: 6ac5794)

          fhueske Fabian Hueske added a comment -

          Thanks!
          This seems to be a never-ending story with many special cases :-/
          I'll reopen the issue. Should be fixed for 1.2.

          fhueske Fabian Hueske added a comment -

          Another special case of this was reported.

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/3083

          FLINK-2662 [optimizer] Fix translation of broadcasted unions.

          Fix optimizer to support union with > 2 broadcasted inputs.

          1. `BinaryUnionDescriptor` is changed to compute fully replicated global property if both inputs are fully replicated.
          2. Enumeration of plans for binary union is changed to enforce local forward connection if input is fully replicated.
          3. Add a test to check that union with three inputs can be on broadcasted side of join (fully replicated property is requested and pushed to all inputs of the union).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink unionBug3

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3083.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3083


          commit a0019d386e09f727874e54b7c8ce2ca94b26f2e5
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2017-01-05T23:00:30Z

          FLINK-2662 [optimizer] Fix translation of broadcasted unions.


          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3083

          Seems there is a long story behind this issue. I will try to catch up on all the history and have a look at this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3083

          Had a quick look as well. Seems to fix the bug correctly, test is included.

          +1 from my side

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3083

          Thanks @StephanEwen. Will merge the fix.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3083

          fhueske Fabian Hueske added a comment -

          Fixed for 1.1.5 with 6566b63aa50af946ecf0d028c4e284fc6dc2a55b
          Fixed for 1.2.0 with 8c5edb2fc8e905b881401815a39941ce2aab3bbd
          Fixed for 1.3.0 with 6dffe282bc5459b9c099df9a77cc2c3f26b28ae5


            People

            • Assignee:
              fhueske Fabian Hueske
            • Reporter:
              ggevay Gabor Gevay
            • Votes:
              1
            • Watchers:
              10

              Dates

              • Created:
              • Updated:
              • Resolved:

                Development