I am seeing some strange behaviour that may be a bug, or may be intended.
We are running a Flink job on a 1.10.1 cluster with 1 JobManager and 2 TaskManagers, at parallelism 4. The job itself is simple:
- Source: Kinesis connector reading from a single-shard stream
- CEP: ~25 CEP Keyed Pattern operators watching the event stream for different kinds of behaviour. They all have ".withinSeconds(xxxx)" applied. Nothing is set up to grow endlessly.
- Sink: Single operator writing messages to SQS (custom code)
We are seeing the checkpoint size grow constantly until the job is restarted using a savepoint/restore. The size continues to grow past the point that the ".withinSeconds(xxxx)" limits should cause old data to be discarded. The growth is also out of proportion to the general platform growth (which is actually trending down at the moment due to COVID).
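To make that expectation concrete, here is a toy model in plain Java of what I would expect from time-bounded state. It is only a sketch: the class and method names are made up, it assumes in-order events, and it does not claim to reflect how the CEP library stores partial matches internally. The point is simply that with a fixed window and a steady event rate, the buffered state should plateau rather than keep growing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: NOT Flink CEP internals. All names are hypothetical.
final class WindowedBufferSketch {
    private final long windowMillis;
    // Timestamps of buffered events, oldest first (assumes in-order arrival).
    private final Deque<Long> buffered = new ArrayDeque<>();

    WindowedBufferSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Buffer an event, then drop everything that has fallen out of the window.
    void onEvent(long timestampMillis) {
        buffered.addLast(timestampMillis);
        while (!buffered.isEmpty()
                && buffered.peekFirst() <= timestampMillis - windowMillis) {
            buffered.removeFirst();
        }
    }

    int size() {
        return buffered.size();
    }

    // Feed one event per second and return the largest buffer size seen.
    static int maxSizeAfter(int events, long windowMillis) {
        WindowedBufferSketch buffer = new WindowedBufferSketch(windowMillis);
        int max = 0;
        for (int i = 0; i < events; i++) {
            buffer.onEvent(i * 1000L);
            max = Math.max(max, buffer.size());
        }
        return max;
    }

    public static void main(String[] args) {
        // 60 s window, one event per second: both runs print 60.
        System.out.println(maxSizeAfter(100, 60_000L));
        System.out.println(maxSizeAfter(100_000, 60_000L));
    }
}
```

In this model the buffer size is the same after 100 events as after 100,000, which is the behaviour we expected from the checkpoint size once the longest window had elapsed.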
I've attached a snapshot from our monitoring dashboard below. You can see the huge drops in state_size on a savepoint/restore.
Our state configuration is as follows:
- Max Concurrent: 1
- Externalised Checkpoints: RETAIN_ON_CANCELLATION
- TTL Compaction Filter enabled: TRUE
We are worried that the CEP library may be leaking state somewhere, leaving some objects not cleaned up. Unfortunately I can't share one of these checkpoints with the community due to the sensitive nature of the data contained within, but if anyone has any suggestions for how I could analyse the checkpoints to look for leaks, please let me know.
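One content-agnostic starting point might be to compare where the bytes sit in two retained checkpoints, without opening any state files. Below is a minimal sketch in plain Java that sums file sizes under each direct child of a directory. The class name is made up, the root path is whatever is passed on the command line, and it assumes the checkpoints live on a filesystem the JVM can read locally. It only reports sizes, so it cannot say which operator owns the data.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper: reports bytes per top-level entry of a directory.
final class CheckpointSizeSketch {

    // Sum regular-file bytes under each direct child of root, sorted by name.
    static Map<String, Long> bytesPerChild(Path root) throws IOException {
        Map<String, Long> totals = new TreeMap<>();
        try (Stream<Path> children = Files.list(root)) {
            for (Path child : (Iterable<Path>) children::iterator) {
                totals.put(child.getFileName().toString(), totalBytes(child));
            }
        }
        return totals;
    }

    // Total size of all regular files at or below the given path.
    static long totalBytes(Path path) throws IOException {
        try (Stream<Path> files = Files.walk(path)) {
            long sum = 0;
            for (Path p : (Iterable<Path>) files::iterator) {
                if (Files.isRegularFile(p)) {
                    sum += Files.size(p);
                }
            }
            return sum;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: args[0] is the job's checkpoint directory.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        bytesPerChild(root).forEach(
                (name, bytes) -> System.out.println(name + "\t" + bytes));
    }
}
```

Running this against the checkpoint directory before and after a savepoint/restore would at least show which subdirectories account for the growth.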
Thanks in advance for the help