FLINK-27462

could not trigger checkpoint when using table function


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.14.3
    • Fix Version/s: None
    • Component/s: API / Core
    • Labels: None

    Description

      I found that checkpoints are never triggered, even though I enabled checkpointing in the execution environment.

      I suspect the problem is related to the table function I use in my DML. After I deploy the application, the task dedicated to the table function switches to finished immediately, even though the table function overrides `isDeterministic` to return false.

      Below is a minimal version of my code that reproduces the issue:

       

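      // table function
      import java.util.Objects;
      import org.apache.flink.table.functions.FunctionContext;
      import org.apache.flink.table.functions.TableFunction;
      // RedisInfo, RedisManager and RedisCommands come from the application's own code and Redis client (not shown)
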
      public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
          private boolean status = false;
          private String statusRedisValue;
          private final String redisKeyOfStatus;
          private final RedisInfo redisInfo;
          private RedisManager redisManager;
          private RedisCommands<String, String> redisCommands;
          private long lastCheckTimestamp = 0L;
          private long currentTimestamp;
      
          public GetStreamingModelSinkFilter() {
              // ... initial something (elided in the report)
          }
      
          @Override
          public void open(FunctionContext context) throws Exception {
              redisManager = new RedisManager(redisInfo);
              redisCommands = redisManager.getCommands();
          }
      
          @Override
          public boolean isDeterministic() {
              return false;
          }
      
          public void eval() {
              if (status) {
                  collect(1);
              } else {
                  currentTimestamp = System.currentTimeMillis();
                  if (currentTimestamp - lastCheckTimestamp < 1000) {
                      collect(0);
                  } else {
                      statusRedisValue = redisCommands.get(redisKeyOfStatus);
                      if (Objects.equals(statusRedisValue, "1")) {
                          status = true;
                          collect(1);
                      } else {
                          lastCheckTimestamp = currentTimestamp;
                          collect(0);
                      }
                  }
      
              }
          }
      
          @Override
          public void close() throws Exception {
              redisManager.close();
          }
      } 
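
The branching in `eval()` above can be easier to follow outside of Flink. The following is a minimal sketch, not the reporter's code: `ThrottledStatusGate`, `statusLookup`, `clock` and `minIntervalMillis` are hypothetical names, with the lookup standing in for `redisCommands.get(redisKeyOfStatus)` and the clock for `System.currentTimeMillis()`. It returns the value that `eval()` would pass to `collect(...)`.

```java
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-alone version of the eval() logic from the report.
final class ThrottledStatusGate {
    private final Supplier<String> statusLookup; // stands in for the Redis GET
    private final LongSupplier clock;            // stands in for System.currentTimeMillis()
    private final long minIntervalMillis;        // 1000 in the report
    private boolean status = false;
    private long lastCheckTimestamp = 0L;

    ThrottledStatusGate(Supplier<String> statusLookup, LongSupplier clock, long minIntervalMillis) {
        this.statusLookup = statusLookup;
        this.clock = clock;
        this.minIntervalMillis = minIntervalMillis;
    }

    // Returns 1 once the flag has been observed as "1" (and forever after), otherwise 0.
    int evaluate() {
        if (status) {
            return 1; // latched: no further lookups
        }
        long now = clock.getAsLong();
        if (now - lastCheckTimestamp < minIntervalMillis) {
            return 0; // throttled: last failed lookup was too recent
        }
        if (Objects.equals(statusLookup.get(), "1")) {
            status = true;
            return 1;
        }
        lastCheckTimestamp = now; // remember the failed lookup
        return 0;
    }
}
```

Injecting the clock and the lookup keeps the sketch testable without a Redis instance; the behavior per call is otherwise the same as in the table function.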

      Below is the DML:

       

       

      INSERT INTO TEST SELECT .... T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS T(MARK) ON TRUE WHERE MARK = 1
      

      TASKS SNAPSHOT OF THE APPLICATION
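
One possible explanation, which this report does not confirm, is that Flink 1.14 by default does not trigger checkpoints once any task of the job has finished, and the task for the argument-less table function finishes immediately. The configuration fragment below is a sketch under that assumption: it turns on the `execution.checkpointing.checkpoints-after-tasks-finish.enabled` option through the Java API. The class name and the 10 second interval are illustrative only, and the Flink streaming dependency must be on the classpath.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical setup class; only the configuration calls matter here.
public class CheckpointAfterFinishSetup {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Assumption: the missing checkpoints are caused by the finished task.
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(10_000L); // illustrative interval in milliseconds

        // The table environment and the INSERT statement would be created from env here.
    }
}
```

If checkpoints still do not appear with this option enabled, the finished task is probably not the cause and the assumption above does not hold.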

       

      Attachments

        1. image-2022-05-01-11-22-03-127.png
          48 kB
          Spongebob
        2. image-2022-05-01-11-23-07-063.png
          19 kB
          Spongebob
        3. image-2022-05-01-11-23-24-938.png
          30 kB
          Spongebob

          People

            Assignee: Unassigned
            Reporter: Spongebob (SpongebobZ)