  1. Flink
  2. FLINK-29926

File source continuous monitoring mode ignoring files during savepoint upgrade mode


Details

    Description

      During a stateful application upgrade using the Flink Kubernetes Operator, a source created with StreamExecutionEnvironment.readFile() in FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new changes made to the same file in the monitored directory.
       
      Background: We have a fresh deployment of the application via the Kubernetes operator, with savepoint as the upgrade mode and checkpointing enabled.

      The env.readFile() source with FileProcessingMode.PROCESS_CONTINUOUSLY starts continuously monitoring the directory (an S3 prefix) for changes and checkpoints its state at the configured interval.

      2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
      ...
      ...
      2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
      ...
      ...
      2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 1667817365000.
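
The "Ignoring ..." entries in the log above suggest that the source compares each file's modification time with the highest modification time it has already processed. The following is a minimal sketch of that check, assuming the comparison is "less than or equal" (which matches the log, where a file whose mod time equals the global mod time is ignored). The class and method names are hypothetical and are not Flink's API.

```java
// Hypothetical stand-in for the monitoring source's filter; not Flink's actual class.
final class ModTimeFilter {
    // Highest modification time already processed (epoch milliseconds).
    private long globalModificationTime;

    ModTimeFilter(long initialGlobalModTime) {
        this.globalModificationTime = initialGlobalModTime;
    }

    // Assumption: a file is skipped unless it was modified strictly after the stored time.
    boolean shouldIgnore(long fileModificationTime) {
        return fileModificationTime <= globalModificationTime;
    }

    // Record a processed file so that the unchanged file is not picked up again.
    void markProcessed(long fileModificationTime) {
        globalModificationTime = Math.max(globalModificationTime, fileModificationTime);
    }

    public static void main(String[] args) {
        ModTimeFilter filter = new ModTimeFilter(Long.MIN_VALUE);
        long modTime = 1667817365000L; // mod time taken from the log above

        System.out.println("ignored on first scan: " + filter.shouldIgnore(modTime));
        filter.markProcessed(modTime);
        System.out.println("ignored on next scan:  " + filter.shouldIgnore(modTime));
    }
}
```

Under this assumption, an unchanged file is ignored on every scan after the first, and only a file with a later modification time is processed again.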

      We then upgrade the application using the Kubernetes operator. As a result, the application takes a savepoint using the "cancel with savepoint" suspend mechanism.
      Doing so calls the source's cancel method, which in turn sets globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.

      2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
      ...
      2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
      ....
      2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Closed File Monitoring Source for path: s3://test-app/

      As a result, globalModificationTime changes from 1667817365000 to Long.MAX_VALUE (9223372036854775807), and that value is stored in the savepoint state.

      Once the application restarts with the new changes, the env.readFile() source restores the previous state, in which globalModificationTime = Long.MAX_VALUE, and ignores every change made to the file after the upgrade.

      2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Restoring state for the ContinuousFileMonitoringFunction
      ....
      2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction retrieved a global mod time of 9223372036854775807
      ....
      2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
      ....
      2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
      ...
      ...
      2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
      ...
      2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
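
The sequence described in this issue (process a file, cancel, take the savepoint, restore, see a newer file) can be reproduced with a small self-contained model. This is only a sketch of the reported behaviour, not Flink code: the class `MonitoringSourceModel` and its methods are hypothetical, and the timestamps are the ones from the logs above.

```java
// Hypothetical model of the behaviour reported in this issue; not Flink code.
final class MonitoringSourceModel {
    private long globalModificationTime = Long.MIN_VALUE;

    // Returns true if the file would be processed, false if it would be ignored.
    boolean offer(long fileModTime) {
        if (fileModTime <= globalModificationTime) {
            return false;
        }
        globalModificationTime = fileModTime;
        return true;
    }

    // As described in the issue: cancel overwrites the stored time with a sentinel.
    void cancel() {
        globalModificationTime = Long.MAX_VALUE;
    }

    // The savepoint stores whatever value the field holds at that moment.
    long snapshotState() {
        return globalModificationTime;
    }

    static MonitoringSourceModel restore(long savedModTime) {
        MonitoringSourceModel model = new MonitoringSourceModel();
        model.globalModificationTime = savedModTime;
        return model;
    }

    public static void main(String[] args) {
        MonitoringSourceModel source = new MonitoringSourceModel();
        source.offer(1667817365000L);          // file seen before the upgrade

        source.cancel();                       // cancel runs before the savepoint
        long saved = source.snapshotState();
        System.out.println("saved == Long.MAX_VALUE: " + (saved == Long.MAX_VALUE));

        MonitoringSourceModel restored = MonitoringSourceModel.restore(saved);
        boolean processed = restored.offer(1667821399000L); // file changed after the upgrade
        System.out.println("updated file processed after restore: " + processed);
    }
}
```

In this model, no finite modification time can exceed the restored value, so every later change to the file is ignored, which matches the repeated "Ignoring ..." entries above.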

      Cause: The issue appears to be due to the reassignment of globalModificationTime to Long.MAX_VALUE during cancel:
      https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389
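
To illustrate the suspected cause, the sketch below shows one way the sentinel could be kept out of the saved state: stop scanning through a separate flag and leave the stored modification time untouched on cancel. This is an assumption made for illustration only. It is not the linked Flink code, nor a fix known to have been adopted, and all names are hypothetical.

```java
// Hypothetical mitigation sketch; names are illustrative and this is not Flink code.
final class SeparatedStopFlagModel {
    private long globalModificationTime = Long.MIN_VALUE;
    private volatile boolean running = true;

    // Returns true if the file would be processed, false if it would be ignored.
    boolean offer(long fileModTime) {
        if (!running || fileModTime <= globalModificationTime) {
            return false;
        }
        globalModificationTime = fileModTime;
        return true;
    }

    // Cancel only flips the flag; the last real modification time is preserved.
    void cancel() {
        running = false;
    }

    long snapshotState() {
        return globalModificationTime;
    }

    static SeparatedStopFlagModel restore(long savedModTime) {
        SeparatedStopFlagModel model = new SeparatedStopFlagModel();
        model.globalModificationTime = savedModTime;
        return model;
    }

    public static void main(String[] args) {
        SeparatedStopFlagModel source = new SeparatedStopFlagModel();
        source.offer(1667817365000L);

        source.cancel();
        long saved = source.snapshotState();
        System.out.println("saved mod time: " + saved);

        SeparatedStopFlagModel restored = SeparatedStopFlagModel.restore(saved);
        System.out.println("updated file processed after restore: "
                + restored.offer(1667821399000L));
    }
}
```

With the stop signal kept separate from the persisted value, a savepoint taken after cancel would carry the last real modification time, so a file changed after the upgrade would still compare as newer.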
       
       


          People

            Assignee: Unassigned
            Reporter: Avinash (avks)