  1. Flink
  2. FLINK-5520

Disable outer joins with non-equality predicates

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Done
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Outer joins with non-equality predicates (and at least one equality predicate) compute incorrect results.

      Since this is not a very common requirement, I propose to disable this feature for the 1.2.0 release and correctly implement it for a later version.

      The fix should add checks in the Table API validation phase (to get a good error message) and in the DataSetJoinRule to prevent translation of SQL queries with non-equality predicates on outer joins.
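The description does not say how the incorrect results arise. The following is a minimal sketch of one plausible failure mode. It uses plain Scala collections rather than Flink's DataSet runtime, and it is not Flink's code. The sketch evaluates the non-equality part of the ON clause as a post-filter after an outer join on the equality key alone. All names (`OuterJoinSemantics`, `L`, `R`, `correct`, `naive`) are hypothetical.

```scala
// Illustrative only: plain Scala collections, not Flink's DataSet runtime.
object OuterJoinSemantics {
  final case class L(a: Int, b: Int) // left row
  final case class R(d: Int, e: Int) // right row

  // LEFT OUTER JOIN ON l.b = r.e AND l.a > r.d with correct semantics:
  // the whole ON condition decides what matches, and a left row without
  // any match is emitted once, padded with None.
  def correct(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches = right.filter(r => l.b == r.e && l.a > r.d)
      if (matches.isEmpty) Seq(l -> Option.empty[R])
      else matches.map(r => l -> Option(r))
    }

  // Naive strategy: outer-join on the equality key only, then apply the
  // non-equality predicate as a filter on the matched pairs.
  def naive(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val keyMatches = right.filter(r => l.b == r.e)
      if (keyMatches.isEmpty) Seq(l -> Option.empty[R])
      else keyMatches.filter(r => l.a > r.d).map(r => l -> Option(r))
    }
}
```

Consider a left row `L(1, 10)` and a right row `R(5, 10)`. The keys match but `1 > 5` fails, so `correct` emits `(L(1, 10), None)`. `naive` emits nothing and silently drops a row that a left outer join must preserve.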


          Activity

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2.0 with 9073c53f902df7b3dbbdec2a30d86030e49fe27e
          Fixed for 1.3.0 with d1301c82b85c00284d90e8f5bdac4fd86dc5b173

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3141

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3141

          Thanks for the update @lincoln-lil!
          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3141

          I think it is fine to keep this small difference for now. With FLINK-5498 and FLINK-5511 these will be gone.

          githubbot ASF GitHub Bot added a comment -

          Github user lincoln-lil commented on the issue:

          https://github.com/apache/flink/pull/3141

          @fhueske When I added more test cases for outer joins with local predicates, I found that the Table API and SQL have different limitations. For example, "SELECT c, g FROM Table3 left outer join Table5 on a = d AND e < 3" is valid, but dsFromTuple3.leftOuterJoin(dsFromTuple5, 'a === 'd && 'e < 3) is invalid. The reason is that the validation in the Join node sees the original join predicates and checks them naively, while in DataSetJoinRule the join predicates may already have been pruned according to equivalences in relational algebra.
          I have a question: should we keep the Table API consistent with SQL? If so, the validation in the Table API will become much more complex, which reminds me of the previous discussion about validating in one place or in two. Alternatively, we could keep the current implementation unchanged, because we will add support for both FLINK-5498 and FLINK-5511 soon, and then these differences will disappear.
          What do you think?
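The comment above explains why `a = d AND e < 3` passes the SQL path. The sketch below illustrates the relevant relational-algebra equivalence, using a hypothetical, simplified predicate model rather than Calcite's or Flink's classes. In a LEFT OUTER JOIN, a conjunct that references only the null-supplying (right) input can be evaluated on that input before the join. A right row that fails it could never match, and unmatched left rows are null-padded either way. A conjunct on the preserved (left) side cannot be pushed down, because it would drop left rows that must still appear.

```scala
// Hypothetical, simplified model of ON-clause conjuncts (not Calcite/Flink code).
object ConjunctPushdown {
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  // Each conjunct records which inputs its fields come from and whether it is an equality.
  final case class Conjunct(text: String, sides: Set[Side], isEquality: Boolean)

  final case class Split(
      equiKeys: Seq[Conjunct],    // field = field across both inputs
      pushToRight: Seq[Conjunct], // can be evaluated on the right input before the join
      remaining: Seq[Conjunct])   // must stay in the ON clause

  // Splits the ON-clause conjuncts of a LEFT OUTER JOIN.
  def splitLeftOuter(conjuncts: Seq[Conjunct]): Split = {
    val (equi, rest) =
      conjuncts.partition(c => c.isEquality && c.sides == Set(LeftSide, RightSide))
    // Only predicates on the null-supplying (right) side may be pushed down.
    val (rightOnly, remaining) = rest.partition(_.sides == Set(RightSide))
    Split(equi, rightOnly, remaining)
  }

  // True if, after pushdown, the ON clause contains only equality keys.
  def isEquiOnlyAfterPushdown(conjuncts: Seq[Conjunct]): Boolean = {
    val s = splitLeftOuter(conjuncts)
    s.equiKeys.nonEmpty && s.remaining.isEmpty
  }
}
```

Under this model, `a = d AND e < 3` reduces to the equi-join `a = d` plus a filter on the right input. `a = d AND b < h` and `a = d AND a < 3` do not reduce. A validator that looks only at the original predicate, as the Table API `Join` node does, would reject all three.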

          githubbot ASF GitHub Bot added a comment -

          Github user lincoln-lil commented on the issue:

          https://github.com/apache/flink/pull/3141

          @fhueske Thanks for your review, I'll update this PR according to your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96843797

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala —
          @@ -375,4 +395,62 @@ class JoinITCase(

          Assert.assertEquals(0, result)
          }
          +
          + @Test(expected = classOf[TableException])
          + def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          — End diff –

          This line can be removed

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96856944

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala —
          @@ -234,7 +249,38 @@ class JoinITCase(
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }

          - @Test
          + @Test(expected = classOf[ValidationException])
          + def testLeftJoinWithNotOnlyEquiJoin(): Unit = {
          + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
          +
          + val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
          +
          + val results = joinT.toDataSet[Row].collect()
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testFullJoinWithNotOnlyEquiJoin(): Unit = {
          + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
          +
          + val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
          +
          + val results = joinT.toDataSet[Row].collect()
          + }
          +
          +
          — End diff –

          Add tests to check that local predicates are not allowed in outer joins as well.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96843825

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala —
          @@ -375,4 +395,62 @@ class JoinITCase(

          Assert.assertEquals(0, result)
          }
          +
          + @Test(expected = classOf[TableException])
          + def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          + }
          +
          + @Test(expected = classOf[TableException])
          + def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          — End diff –

          remove

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96841502

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -454,30 +454,54 @@ case class Join(

          private def testJoinCondition(expression: Expression): Unit = {

          - def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
          - case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
          - if x.isFromLeftInput != y.isFromLeftInput => Unit
          - case x => failValidation(
          - s"Invalid non-join predicate $exp. For non-join predicates use Table#where.")
          - }
          + def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
          + case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
          + if x.isFromLeftInput != y.isFromLeftInput => true
          + case x => false
          — End diff –

          `case x` -> `case _`
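In the diff above, `checkIfJoinCondition` now returns a `Boolean` instead of calling `failValidation`, so the caller decides what to do with a non-join predicate. The following is a minimal, self-contained sketch of that pattern with `case _` applied. The expression classes are simplified, hypothetical stand-ins for Flink's internal `Expression` and `JoinFieldReference` types. `validateOuterJoin` is an illustrative helper, not the code that was merged.

```scala
// Simplified, hypothetical stand-ins for Flink's internal expression classes.
object JoinConditionCheck {
  sealed trait Expr { def children: List[Expr] }
  final case class JoinFieldReference(name: String, isFromLeftInput: Boolean) extends Expr {
    def children: List[Expr] = Nil
  }
  final case class Literal(value: Int) extends Expr { def children: List[Expr] = Nil }
  final case class EqualTo(left: Expr, right: Expr) extends Expr {
    def children: List[Expr] = List(left, right)
  }
  final case class LessThan(left: Expr, right: Expr) extends Expr {
    def children: List[Expr] = List(left, right)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def children: List[Expr] = List(left, right)
  }

  // True only if the comparison relates one field from each input.
  // `case _` signals that the unmatched value is not used.
  def checkIfJoinCondition(exp: Expr): Boolean = exp.children match {
    case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
        if x.isFromLeftInput != y.isFromLeftInput => true
    case _ => false
  }

  // Flattens a conjunction into its conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // Outer joins: every conjunct must be an equality between the two inputs,
  // so non-equi join predicates and local predicates are both rejected.
  def validateOuterJoin(condition: Expr): Either[String, Unit] = {
    val invalid = conjuncts(condition).filterNot {
      case eq: EqualTo => checkIfJoinCondition(eq)
      case _           => false
    }
    if (invalid.isEmpty) Right(())
    else Left(s"Unsupported predicate(s) in outer join: ${invalid.mkString(", ")}")
  }
}
```

With this shape, an inner join can still accept any predicate, and only the outer-join path calls `validateOuterJoin`.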

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96843888

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala —
          @@ -375,4 +395,62 @@ class JoinITCase(

          Assert.assertEquals(0, result)
          }
          +
          + @Test(expected = classOf[TableException])
          + def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          + }
          +
          + @Test(expected = classOf[TableException])
          + def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          + }
          +
          + @Test(expected = classOf[TableException])
          + def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
          +
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          + tEnv.getConfig.setNullCheck(true)
          +
          + val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
          +
          + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
          + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
          + tEnv.registerTable("Table3", ds1)
          + tEnv.registerTable("Table5", ds2)
          +
          + tEnv.sql(sqlQuery).toDataSet[Row].collect()
          +
          + val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
          — End diff –

          remove

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3141#discussion_r96856158

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala —
          @@ -83,6 +83,27 @@ class JoinITCase(
          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env, config)

          + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6"
          — End diff –

          I think this test covers the same case as the one above (`testJoinWithFilter()`).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3141

          Great, thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user lincoln-lil commented on the issue:

          https://github.com/apache/flink/pull/3141

          @fhueske FLINK-5547 makes sense to me. I'll update this PR today.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3141

          The difference between failing during validation and translation is the code line that throws the exception. A check during validation causes an exception on the line that is responsible for the error. A check during translation throws an exception much later in the user program.

          Regarding the check in the rule: IMO, each rule must ensure that it does not translate the query into an invalid plan. It makes sense, though, to have an additional safety check in the `DataSetJoin`. However, this check should be done in the constructor and not in the `translateToPlan()` method (see FLINK-5547), so that we fail fast if a rule tries to create an invalid plan.
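The following is a minimal sketch of the "check in the constructor" idea, using a hypothetical `PhysicalJoin` class rather than Flink's actual `DataSetJoin`. Because the invariant is enforced with `require` when the node is created, a rule that tries to build an invalid node fails during optimization. `translateToPlan()` can then assume it is working with a valid node.

```scala
// Hypothetical physical join node; not Flink's DataSetJoin.
object FailFastJoin {
  sealed trait JoinType
  case object InnerJoin extends JoinType
  case object LeftOuterJoin extends JoinType

  final class PhysicalJoin(val joinType: JoinType, val hasNonEquiPredicate: Boolean) {
    // Safety net in the constructor: throws IllegalArgumentException as soon
    // as a rule tries to create an invalid node, not later at translation time.
    require(
      joinType == InnerJoin || !hasNonEquiPredicate,
      s"$joinType with non-equality predicates is not supported")

    // Translation no longer needs to repeat the check.
    def translateToPlan(): String = s"join[$joinType]"
  }
}
```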

          githubbot ASF GitHub Bot added a comment -

          Github user lincoln-lil commented on the issue:

          https://github.com/apache/flink/pull/3141

          @fhueske I agree with you that the check in the validation phase of the Table API is useful for failing fast (although, IMO, it makes little difference to the user experience whether an exception is thrown in the validation or the translation phase?). However, other rules may match and generate alternative plans to execute an outer join. For instance, we have internally implemented an HbaseTableSource that extends TableSource and can be joined as a TableSourceTable, and we created a new rule that matches inner and outer joins. Although such a special TableSource is probably not suitable for merging into Flink master, in the future there could be other TableSourceTables that support different join predicates, depending on the characteristics of their physical storage.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3141

          I agree with @lincoln-lil and @shaoxuan-wang: this fix should go into the `1.1-release`, `1.2-release`, and `master` branches. We will later add support for non-equality predicates on left/right outer joins to master and include it in 1.3.0.

          Regarding the check, I agree with @wuchong. We implement these checks in `RelOptRule.matches()` to prevent an invalid plan from being constructed. Otherwise, we might end up with an invalid plan although a valid (but more expensive) alternative exists. We have additional checks in the DataSet operators to be on the safe side, but these should never be triggered; if one is, something is wrong with the optimization phase. So it's a kind of safety net. Although I don't think there are rules that generate alternative plans for an outer join with non-equality predicates, I would like to keep the pattern for consistency.
          The check in the validation phase of the Table API is also useful, because the validation happens immediately when a function on a `Table` is called. In contrast, the check in `DataSetJoin` would only happen when the translation is triggered, i.e., when the program is optimized and translated into a DataSet program. Throwing an exception early helps users, because they can identify an invalid statement much more easily.
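The following is a minimal sketch of why the restriction belongs in a rule's `matches()`. It models a planner with hypothetical, heavily simplified rules and cost values, not Calcite's `RelOptRule` or `VolcanoPlanner`. A rule whose `matches()` rejects the join never produces a candidate plan, so the planner can still pick a valid, more expensive alternative. `NestedLoopJoinRule` is purely hypothetical; as noted above, Flink may not have any such alternative rule.

```scala
// Hypothetical, heavily simplified planner model; not Calcite's API.
object RuleMatching {
  // Only the properties the rules look at.
  final case class LogicalJoin(isOuter: Boolean, hasNonEquiPredicate: Boolean)

  trait Rule {
    def name: String
    def cost: Double
    def matches(join: LogicalJoin): Boolean
  }

  // Mirrors the DataSetJoinRule restriction: no outer joins with non-equi predicates.
  object EquiOuterJoinRule extends Rule {
    val name = "EquiOuterJoin"
    val cost = 1.0
    def matches(join: LogicalJoin): Boolean = !(join.isOuter && join.hasNonEquiPredicate)
  }

  // Hypothetical, more expensive alternative that accepts any predicate.
  object NestedLoopJoinRule extends Rule {
    val name = "NestedLoopJoin"
    val cost = 10.0
    def matches(join: LogicalJoin): Boolean = true
  }

  // Chooses the cheapest plan among the rules whose matches() accepts the join.
  def plan(join: LogicalJoin, rules: Seq[Rule]): Either[String, String] = {
    val candidates = rules.filter(_.matches(join))
    if (candidates.isEmpty) Left("no valid plan")
    else Right(candidates.minBy(_.cost).name)
  }
}
```

Suppose `EquiOuterJoinRule` accepted every join and only the physical node threw an exception later. The cheaper but invalid plan would win, and the valid alternative would never be considered.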

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3141

          @wuchong @lincoln-lil, I do not think this is just for 1.2.0. Outer joins with non-equality predicates are broken on master, too. We should deliver this fix to master as well as to release 1.2.0 until we have concrete plans to fix the non-equality functionality.

          githubbot ASF GitHub Bot added a comment -

          Github user lincoln-lil commented on the issue:

          https://github.com/apache/flink/pull/3141

          Hi @wuchong, thanks for your review. I'll switch the PR to be based on 1.2.0.
          I don't think it's necessary to repeat the validation in both DataSetJoinRule.scala and operators.scala. There are already several validations in the DataSetJoin.translateToPlan function, so why don't we put this validation there as well to reduce code?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3141

          Hi @lincoln-lil, thank you for the PR. I think the issue only concerns 1.2.0, so please create a new pull request against `release-1.2`. For `master` we will fix the bug directly.

          I have taken a quick look at it. I think a better approach is to disable outer joins with non-equality predicates in [`DataSetJoinRule`](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala#L43) rather than mixing the code into the SQL translation, for example:

          ```scala
          import org.apache.calcite.plan.RelOptRuleCall
          import org.apache.calcite.rel.core.JoinRelType
          import org.apache.calcite.rel.logical.LogicalJoin
          // inside DataSetJoinRule:
          override def matches(call: RelOptRuleCall): Boolean = {
            val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
            val joinInfo = join.analyzeCondition
            // joins require an equi-condition or a conjunctive predicate with at least one equi-condition,
            // and outer joins with non-equality predicates are currently not supported
            !joinInfo.pairs().isEmpty && (join.getJoinType == JoinRelType.INNER || joinInfo.isEqui)
          }
          ```
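
          The boolean in the proposed `matches()` can be checked in isolation. The sketch below replaces Calcite's `JoinInfo` and `JoinRelType` with hypothetical stand-ins (`SimpleJoinInfo`, `JoinType`) so that it runs without Flink or Calcite on the classpath, while keeping the same condition: at least one equi-join key pair, and additional non-equality predicates only for inner joins.

          ```scala
          // Hypothetical stand-in for Calcite's JoinInfo: `pairs` are the equi-join
          // key pairs (leftKey, rightKey) extracted from the condition, and `isEqui`
          // is true only if the condition consists of nothing but those equalities.
          final case class SimpleJoinInfo(pairs: List[(Int, Int)], isEqui: Boolean)

          // Hypothetical stand-in for Calcite's JoinRelType.
          sealed trait JoinType
          object JoinType {
            case object Inner extends JoinType
            case object Left extends JoinType
            case object Right extends JoinType
            case object Full extends JoinType
          }

          // Same boolean structure as the proposed DataSetJoinRule.matches().
          def joinRuleMatches(joinType: JoinType, info: SimpleJoinInfo): Boolean =
            info.pairs.nonEmpty && (joinType == JoinType.Inner || info.isEqui)

          // Usage: conditions over left fields (a, b) and right fields (d, e).
          val equiOnly      = SimpleJoinInfo(List((0, 0)), isEqui = true)  // a = d
          val equiPlusRange = SimpleJoinInfo(List((0, 0)), isEqui = false) // a = d AND b < e
          val noEqui        = SimpleJoinInfo(Nil, isEqui = false)          // b < e

          joinRuleMatches(JoinType.Left, equiOnly)       // true
          joinRuleMatches(JoinType.Inner, equiPlusRange) // true
          joinRuleMatches(JoinType.Left, equiPlusRange)  // false: the rule does not fire
          joinRuleMatches(JoinType.Inner, noEqui)        // false
          ```

          Because `matches()` returns `false` instead of throwing, the rule simply does not fire for such joins; if no other rule can implement the join, the optimizer fails to find a plan rather than producing one that computes incorrect results.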

          Please also add checks in [`operators.scala`](https://github.com/apache/flink/blob/release-1.2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala#L455) to give a meaningful error message for Table API outer joins.
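
          A Table API check along these lines could split the join condition into conjuncts and reject outer joins whose condition contains anything other than field-to-field equalities between the two inputs. The sketch below uses a hypothetical, minimal expression tree (`Field`, `EqualTo`, `LessThan`, `And`) and a hypothetical `validateOuterJoin` helper; Flink's actual expression classes and the check added to `operators.scala` may look different.

          ```scala
          // Hypothetical, minimal expression tree (not Flink's expression classes).
          sealed trait Expr
          final case class Field(name: String) extends Expr
          final case class EqualTo(l: Expr, r: Expr) extends Expr
          final case class LessThan(l: Expr, r: Expr) extends Expr
          final case class And(l: Expr, r: Expr) extends Expr

          final class ValidationException(msg: String) extends RuntimeException(msg)

          // Flattens nested ANDs into a list of conjuncts.
          def conjuncts(e: Expr): List[Expr] = e match {
            case And(l, r) => conjuncts(l) ++ conjuncts(r)
            case other     => List(other)
          }

          // True if `e` is an equality between one left-side and one right-side field.
          def isEquiConjunct(e: Expr, leftFields: Set[String], rightFields: Set[String]): Boolean =
            e match {
              case EqualTo(Field(a), Field(b)) =>
                (leftFields(a) && rightFields(b)) || (rightFields(a) && leftFields(b))
              case _ => false
            }

          // Rejects an outer join condition at definition time: at least one
          // equi-conjunct is required, and every other conjunct is reported by name.
          def validateOuterJoin(cond: Expr, leftFields: Set[String], rightFields: Set[String]): Unit = {
            val (equi, nonEqui) = conjuncts(cond).partition(isEquiConjunct(_, leftFields, rightFields))
            if (equi.isEmpty)
              throw new ValidationException("Outer joins require at least one equality predicate.")
            if (nonEqui.nonEmpty) {
              val offending = nonEqui.mkString(", ")
              throw new ValidationException(
                s"Outer joins with non-equality predicates are not supported: $offending")
            }
          }
          ```

          For example, with left fields `a, b` and right fields `d, e`, the condition `a = d` passes, while `a = d AND b < e` is rejected with a message that names the `LessThan` conjunct.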

          githubbot ASF GitHub Bot added a comment -

          GitHub user lincoln-lil opened a pull request:

          https://github.com/apache/flink/pull/3141

          FLINK-5520 [table] Disable outer joins with non-equality predicates

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/lincoln-lil/flink FLINK-5520

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3141.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3141


          commit be877a3a9bb9d00f54fbb929e805f95af1ed1cd2
          Author: lincoln-lil <lincoln.86xy@gmail.com>
          Date: 2017-01-17T14:42:39Z

          FLINK-5520 [table] Disable outer joins with non-equality predicates



            People

            • Assignee:
              lincoln.86xy lincoln.lee
              Reporter:
              fhueske Fabian Hueske
            • Votes:
              0
              Watchers:
              4
