Details
- Bug
- Status: Open
- Major
- Resolution: Unresolved
- 1.19.0
- None
Description
// Add it in CalcItCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
    changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
    changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
    changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
    changelogRow("-D", java.lang.Integer.valueOf(1), "99")
  )
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
    s"""
       |CREATE TABLE t1 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
    s"""
       |CREATE TABLE t2 (
       |  a int,
       |  b string
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'bounded' = 'false'
       |)
     """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}
Output:
+----+-------------+-----------------+-------------+---------+
| op |           a |               b |          a0 |      b0 |
+----+-------------+-----------------+-------------+---------+
| +U |           1 |               1 |           1 |      99 |
| +U |           1 |              99 |           1 |      99 |
| -U |           1 |               1 |           1 |      99 |
| -D |           1 |              99 |           1 |      99 |
+----+-------------+-----------------+-------------+---------+
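One way to make the oddity in this output concrete is to compare the op column of the join result with the op column of the source rows. The sketch below is a standalone illustration, not part of Flink or of the test above: `ChangelogPairCheck` and `unpairedUpdates` are hypothetical names, and the rule it applies (every `-U` is immediately followed by a `+U`, and every `+U` is immediately preceded by a `-U`) is an assumption about what a well-formed retract-style changelog might look like, not a documented guarantee.

```scala
// Hypothetical helper for illustration only; not a Flink API.
object ChangelogPairCheck {

  // Returns the indices of update ops that break the assumed pairing rule:
  // "-U" must be immediately followed by "+U", and "+U" must be
  // immediately preceded by "-U".
  def unpairedUpdates(ops: Seq[String]): Seq[Int] =
    ops.indices.filter { i =>
      ops(i) match {
        case "+U" => i == 0 || ops(i - 1) != "-U"
        case "-U" => i == ops.length - 1 || ops(i + 1) != "+U"
        case _    => false // "+I" and "-D" are not checked here
      }
    }

  def main(args: Array[String]): Unit = {
    // Op column of the rows registered in the test.
    val sourceOps = Seq("+I", "-U", "+U", "-D")
    // Op column of the join output printed above.
    val joinOps = Seq("+U", "+U", "-U", "-D")

    println(s"source violations: ${unpairedUpdates(sourceOps).mkString(", ")}")
    println(s"join violations:   ${unpairedUpdates(joinOps).mkString(", ")}")
  }
}
```

Under that assumed rule, the source sequence produces no violations, while the join output is flagged at indices 0, 1 and 2: two `+U` rows arrive with no preceding `-U`, and the `-U` is followed by a `-D` rather than a `+U`.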
Issue Links
- is related to: FLINK-34349 Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch (Closed)