Details
New Feature
Status: Closed
Minor
Resolution: Fixed
1.3.0
None
Description
I found that the expected result of a unit test case is incorrect compared with the result an RDBMS produces;
see flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
  ...
  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

  val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)

  val expected = "Hello world,BCD\n"
  val results = joinT.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
Then I took some time to learn about outer joins in relational databases. The correct result of the above case should be the following (tested in SQL Server and MySQL; the results are the same):
> select c, g from tuple3 right outer join tuple5 on a=f and b<h;
c                                g
-------------------------------- --------------------------------
NULL                             Hallo
NULL                             Hallo Welt
NULL                             Hallo Welt wie
NULL                             Hallo Welt wie gehts?
NULL                             ABC
Hello world                      BCD
NULL                             CDE
NULL                             DEF
NULL                             EFG
NULL                             FGH
NULL                             GHI
NULL                             HIJ
NULL                             IJK
NULL                             JKL
NULL                             KLM
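In other words, the join condition rightOuterJoin('a === 'd && 'b < 'h) is not equivalent to rightOuterJoin('a === 'd).where('b < 'h). A predicate inside the join condition only decides which rows match, so rows of the outer side that find no match are still emitted, padded with nulls; where() instead filters rows after the join has been computed.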
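The difference can be reproduced without Flink. The following is a minimal sketch in plain Scala collections; the row types Row3 and Row5, the three-row sample data, and the helper rightOuterJoin are hypothetical and only mimic the shape of the test tables. It is not the Table API implementation.

```scala
object OuterJoinSemantics {
  // Hypothetical row shapes: only the columns used by the query are kept.
  final case class Row3(a: Int, b: Long, c: String)
  final case class Row5(d: Int, g: String, h: Long)

  // Small illustrative sample, not the full CollectionDataSets data.
  val left: Seq[Row3] = Seq(Row3(1, 1L, "Hi"), Row3(3, 2L, "Hello world"))
  val right: Seq[Row5] =
    Seq(Row5(1, "Hallo", 1L), Row5(3, "ABC", 2L), Row5(3, "BCD", 3L))

  // Naive right outer join: every right row is kept. If no left row
  // satisfies the condition, the left side is None (SQL NULL).
  def rightOuterJoin(cond: (Row3, Row5) => Boolean): Seq[(Option[Row3], Row5)] =
    right.flatMap { r =>
      val matches = left.filter(l => cond(l, r))
      if (matches.isEmpty) Seq((Option.empty[Row3], r))
      else matches.map(l => (Option(l), r))
    }

  // Whole predicate inside the join condition: 'a === 'd && 'b < 'h
  val inJoin: Seq[(Option[String], String)] =
    rightOuterJoin((l, r) => l.a == r.d && l.b < r.h)
      .map { case (l, r) => (l.map(_.c), r.g) }

  // Only 'a === 'd in the join, 'b < 'h applied afterwards as a filter.
  // A NULL left side never satisfies b < h, so padded rows are dropped.
  val inWhere: Seq[(Option[String], String)] =
    rightOuterJoin((l, r) => l.a == r.d)
      .filter { case (l, r) => l.exists(_.b < r.h) }
      .map { case (l, r) => (l.map(_.c), r.g) }
}
```

With this sample, inJoin keeps all three right rows (two of them padded), while inWhere keeps only the single matching pair, which is the shape of the result the test currently expects.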
The problem is rooted in the code-generated JoinFunction (see DataSetJoin.translateToPlan(), line 188). If the join condition does not match, we must emit the outer row padded with nulls instead of returning from the function without emitting anything.
The code-generated JoinFunction also includes the equality predicates. These should be removed before generating the code, e.g., in DataSetJoinRule when generating the DataSetJoin with the help of JoinInfo.getRemaining().
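One way to picture the proposed fix is sketched below in plain Scala, under stated assumptions: the equality predicate is handled by keying the left side, and only the remaining predicate is evaluated per candidate pair. The names Row3, Row5, rightOuterJoin and remaining are hypothetical, and this is not the code Flink generates. The sketch checks all candidates for a right row before padding, since padding once per failed pair would emit the same outer row several times when multiple left rows share a key.

```scala
object ResidualPredicateJoin {
  // Hypothetical row shapes: only the columns used by the query are kept.
  final case class Row3(a: Int, b: Long, c: String)
  final case class Row5(d: Int, g: String, h: Long)

  def rightOuterJoin(
      left: Seq[Row3],
      right: Seq[Row5],
      remaining: (Row3, Row5) => Boolean): Seq[(Option[String], String)] = {
    // The equi-join key ('a === 'd) is resolved by grouping, so the
    // residual function never has to re-check equality.
    val leftByKey: Map[Int, Seq[Row3]] = left.groupBy(_.a)

    right.flatMap { r =>
      // Candidates share the key; only the remaining predicate is applied.
      val matched = leftByKey.getOrElse(r.d, Seq.empty[Row3]).filter(l => remaining(l, r))
      if (matched.isEmpty) Seq((Option.empty[String], r.g)) // pad outer row with NULL
      else matched.map(l => (Option(l.c), r.g))
    }
  }
}
```

Calling rightOuterJoin with remaining set to (l, r) => l.b < r.h corresponds to the condition 'a === 'd && 'b < 'h from the test case.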
More details: https://goo.gl/ngekca
Issue Links
- is duplicated by FLINK-5511 Add support for outer joins with local predicates (Closed)