Details
Type: Technical Debt
Status: Closed
Priority: Major
Resolution: Not A Problem
1.19.0
None
Description
I'm not sure whether this is a bug. With minibatch join enabled, the join emits the same rows as without minibatch, but in a different order. The following case reproduces it:
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
    row(1, "1"),
    row(2, "2"),
    row(3, "3"),
    row(4, "4"),
    row(5, "5"),
    row(6, "6"),
    row(7, "7"),
    row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)
  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
       """.stripMargin
  tEnv.executeSql(ddl)
  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
       """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}
Result with minibatch join enabled:
+----+---+---+---+---+
| op | a | b | a0| b0|
+----+---+---+---+---+
| +I | 3 | 3 | 3 | 3 |
| +I | 7 | 7 | 7 | 7 |
| +I | 2 | 2 | 2 | 2 |
| +I | 5 | 5 | 5 | 5 |
| +I | 1 | 1 | 1 | 1 |
| +I | 6 | 6 | 6 | 6 |
| +I | 4 | 4 | 4 | 4 |
| +I | 8 | 8 | 8 | 8 |
+----+---+---+---+---+
When minibatch join is disabled, the result is:
+----+---+---+----+----+
| op | a | b | a0 | b0 |
+----+---+---+----+----+
| +I | 1 | 1 | 1  | 1  |
| +I | 2 | 2 | 2  | 2  |
| +I | 3 | 3 | 3  | 3  |
| +I | 4 | 4 | 4  | 4  |
| +I | 5 | 5 | 5  | 5  |
| +I | 6 | 6 | 6  | 6  |
| +I | 7 | 7 | 7  | 7  |
| +I | 8 | 8 | 8  | 8  |
+----+---+---+----+----+
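Comparing the two printed results as multisets rather than as ordered sequences shows that they contain exactly the same eight rows and differ only in emission order. The plain-Scala sketch below makes that check explicit. It does not use any Flink API: the rows are typed in by hand from the two tables, and the object and helper names (`OrderInsensitiveCompare`, `sameRowsIgnoringOrder`) are hypothetical, chosen only for illustration.

```scala
// Hypothetical, Flink-free sketch: the rows are copied by hand from the two
// printed tables as (op, a, b, a0, b0) tuples.
object OrderInsensitiveCompare {
  type JoinRow = (String, Int, String, Int, String)

  private def joinRow(i: Int): JoinRow = ("+I", i, i.toString, i, i.toString)

  // Emission order printed with minibatch join enabled.
  val withMiniBatch: Seq[JoinRow] = Seq(3, 7, 2, 5, 1, 6, 4, 8).map(joinRow)

  // Emission order printed with minibatch join disabled.
  val withoutMiniBatch: Seq[JoinRow] = (1 to 8).map(joinRow)

  // Multiset comparison: sort both sides with the implicit tuple Ordering,
  // so duplicates are still counted but their position is ignored.
  def sameRowsIgnoringOrder(x: Seq[JoinRow], y: Seq[JoinRow]): Boolean =
    x.sorted == y.sorted

  def main(args: Array[String]): Unit = {
    println(s"same order: ${withMiniBatch == withoutMiniBatch}")
    println(s"same rows ignoring order: ${sameRowsIgnoringOrder(withMiniBatch, withoutMiniBatch)}")
  }
}
```

Running `main` prints `same order: false` followed by `same rows ignoring order: true`. Sorting both sides, instead of converting them to a `Set`, keeps the comparison correct if a join ever emits the same row more than once.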
Issue Links
- is related to: FLINK-34349 Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch (Closed)