Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
None
None
None
Description
During a stateful application upgrade using the Flink Kubernetes operator, the source created by StreamExecutionEnvironment.readFile() in FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new changes made to the same file in the monitored directory.
Background: We currently have a fresh deployment of the application, managed by the Kubernetes operator, with savepoint as the upgrade mode and checkpointing enabled.
The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode starts continuously monitoring the directory (S3 prefix) for changes, and its state is checkpointed at the configured interval.
2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
...
2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
...
2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 1667817365000.
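The "Ignoring ..." line above reflects the filter rule used by ContinuousFileMonitoringFunction, as far as can be read from the log output and the linked source: a file is skipped when its modification time is less than or equal to the stored global modification time, and after each scan the global time advances to the newest forwarded file. The sketch below is a simplified, standalone model of that rule, not Flink code; the class MonitorModel and its scan() method are hypothetical, and splits, file listing and locking are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the directory-scan filter described above.
// This is not Flink code: splits, file listing and locking are left out.
class MonitorModel {
    // No file has been seen yet, so every file initially passes the filter.
    long globalModificationTime = Long.MIN_VALUE;

    // A file is skipped unless it is strictly newer than the global mod time.
    boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }

    // One scan: collect the files that pass the filter, then advance the
    // global mod time to the newest file that was forwarded.
    List<Long> scan(long... fileModTimes) {
        List<Long> forwarded = new ArrayList<>();
        for (long modTime : fileModTimes) {
            if (!shouldIgnore(modTime)) {
                forwarded.add(modTime);
            }
        }
        for (long modTime : forwarded) {
            globalModificationTime = Math.max(globalModificationTime, modTime);
        }
        return forwarded;
    }
}
```

In this model, scan(1667817365000L) forwards the file once; later scans of the unchanged file are ignored because its mod time equals the global mod time, matching the log line, while a later modification time would pass the filter again.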
Next, we upgrade the application using the Kubernetes operator. As part of the upgrade, the application takes a savepoint using the following suspend mechanism: cancel with savepoint.
In this mode, the source's cancel() method is called, which in turn sets globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.
2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667817365000 and global mod time= 1667817365000
...
2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
...
2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Closed File Monitoring Source for path: s3://test-app/
As a result, globalModificationTime changes from 1667817365000 to Long.MAX_VALUE (9223372036854775807), and this value is stored in the savepoint state.
Once the application restarts with the new changes, the env.readFile() operator restores the previous state, in which globalModificationTime = Long.MAX_VALUE, and ignores every change made to the file after the upgrade, because no modification time can exceed Long.MAX_VALUE.
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Restoring state for the ContinuousFileMonitoringFunction
...
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - ContinuousFileMonitoringFunction retrieved a global mod time of 9223372036854775807
...
2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path: s3://test-app/configs
...
2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
...
2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
...
2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy Source Thread - Source: Custom File Source (1/1)#0'] org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@ - Ignoring s3://test-app/configs/control-event-config.json, with mod time= 1667821399000 and global mod time= 9223372036854775807
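The sequence in the logs (cancel, savepoint, restore, scan) can be replayed with a small standalone model. This is a sketch under stated assumptions, not Flink code: the class UpgradeModel and its methods are hypothetical, with field names echoing ContinuousFileMonitoringFunction, and it assumes, as reported above, that cancel() runs before the savepoint snapshot.

```java
// Hypothetical model of the cancel -> savepoint -> restore sequence reported here.
// Field and method names echo ContinuousFileMonitoringFunction, but this is not Flink code.
class UpgradeModel {
    long globalModificationTime = Long.MIN_VALUE;
    boolean isRunning = true;

    // Same filter rule as the monitoring source: skip files that are not strictly newer.
    boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }

    // As reported in this issue: cancel() overwrites the mod time with a sentinel.
    void cancel() {
        globalModificationTime = Long.MAX_VALUE;
        isRunning = false;
    }

    // The savepoint captures whatever the field holds when the snapshot runs.
    long snapshotState() {
        return globalModificationTime;
    }

    // The upgraded job starts from the value stored in the savepoint.
    static UpgradeModel restore(long checkpointedModTime) {
        UpgradeModel restored = new UpgradeModel();
        restored.globalModificationTime = checkpointedModTime;
        return restored;
    }

    // Replays the sequence from the logs; returns true if the newer file is ignored.
    static boolean newerFileIgnoredAfterUpgrade() {
        UpgradeModel before = new UpgradeModel();
        before.globalModificationTime = 1667817365000L; // value checkpointed at 10:51:40
        before.cancel();                                // upgrade: cancel with savepoint
        long saved = before.snapshotState();            // 9223372036854775807
        UpgradeModel after = restore(saved);
        return after.shouldIgnore(1667821399000L);      // file modified after the upgrade
    }
}
```

Because no long value is greater than Long.MAX_VALUE, shouldIgnore() returns true for every file after the restore, which matches the repeated "Ignoring ..." lines. Restoring from the pre-cancel value 1667817365000 instead would let the modified file through.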
Cause: The issue appears to be caused by the reassignment of globalModificationTime to Long.MAX_VALUE during cancel():
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389
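One possible direction, sketched here purely as an assumption and not as the Flink project's fix, is to stop forwarding after cancellation through the isRunning flag alone, so the value written to state is never replaced by a sentinel. The class FixedModel and its methods are hypothetical; the real function also synchronizes on the checkpoint lock, which this model omits.

```java
// Hypothetical fix sketch, not the Flink project's change: cancel() stops forwarding
// through a flag, so the value written to state is never replaced by a sentinel.
class FixedModel {
    // Newest modification time actually forwarded; this is what gets checkpointed.
    long globalModificationTime = Long.MIN_VALUE;
    volatile boolean isRunning = true;

    // After cancellation nothing is forwarded, but the checkpointed value is untouched.
    boolean shouldIgnore(long modificationTime) {
        return !isRunning || modificationTime <= globalModificationTime;
    }

    // Forwards a file if it passes the filter; returns whether it was forwarded.
    boolean forward(long modificationTime) {
        if (shouldIgnore(modificationTime)) {
            return false;
        }
        globalModificationTime = Math.max(globalModificationTime, modificationTime);
        return true;
    }

    // Only flips the flag; the mod time keeps its last real value.
    void cancel() {
        isRunning = false;
    }

    long snapshotState() {
        return globalModificationTime;
    }

    static FixedModel restore(long checkpointedModTime) {
        FixedModel restored = new FixedModel();
        restored.globalModificationTime = checkpointedModTime;
        return restored;
    }
}
```

With this separation, a savepoint taken after cancel() still stores 1667817365000, and the restored source picks up the file modified at 1667821399000. The trade-off is that every place that relied on the Long.MAX_VALUE sentinel to suppress forwarding must check isRunning instead.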