Details
-
Sub-task
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
None
Description
Fix bug for Semi/Anti WindowJoin.
//代码占位符 @Test def testSemiJoinIN(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE L.a IN ( |SELECT a FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) """.stripMargin util.verifyRelPlan(sql) } @Test def testSemiExist(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE EXISTS ( |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a) """.stripMargin util.verifyRelPlan(sql) } @Test def testAntiJoinNotExist(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE NOT EXISTS ( |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a) """.stripMargin util.verifyRelPlan(sql) } @Test def testAntiJoinNotIN(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE L.a NOT IN ( |SELECT a FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) """.stripMargin util.verifyRelPlan(sql) }
Now run the above sql, an `ArrayIndexOutOfBoundsException` would be thrown out.
Attachments
Issue Links
- relates to
-
FLINK-19606 Implement streaming window join operator
- Closed
- links to