Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Problem
-
1.14.3
-
None
-
None
Description
I found that it could not trigger checkpoints despite I had enable it in environment.
I think this problem may be related to I had used a table function in my DML. When I deploy the application the task dedicated to the table function turned to be finished immediately despite the table function had declarate the property `isDeterministic` to false.
Below is my basic code to recur the issue:
// table function public class GetStreamingModelSinkFilter extends TableFunction<Integer> { private boolean status = false; private String statusRedisValue; private final String redisKeyOfStatus; private final RedisInfo redisInfo; private RedisManager redisManager; private RedisCommands<String, String> redisCommands; private long lastCheckTimestamp = 0L; private long currentTimestamp; public GetStreamingModelSinkFilter() { ...initial something } } @Override public void open(FunctionContext context) throws Exception { redisManager = new RedisManager(redisInfo); redisCommands = redisManager.getCommands(); } @Override public boolean isDeterministic() { return false; } public void eval() { if (status) { collect(1); } else { currentTimestamp = System.currentTimeMillis(); if (currentTimestamp - lastCheckTimestamp < 1000) { collect(0); } else { statusRedisValue = redisCommands.get(redisKeyOfStatus); if (Objects.equals(statusRedisValue, "1")) { status = true; collect(1); } else { lastCheckTimestamp = currentTimestamp; collect(0); } } } } @Override public void close() throws Exception { redisManager.close(); } }
Below's the DML:
INSERT INTO TEST SELECT .... T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS T(MARK) ON TRUE WHERE MARK = 1
TASKS SNAPSHOT OF THE APPLICATION