The shutdown method of the checkpoint coordinator is not called when a Flink cluster is shutdown via SIGINT. The issue is that the checkpoint coordinator shutdown/cleanup is only called after the job enters a final state. This does not happen for regular cluster shutdown (via kill). Because we don't have proper stopping of streaming jobs, this means that every program using checkpointing is suffering from this.
I've tested this only locally for now with a custom WordCount checkpointing the current count. When stopping the process, the files still exist. Since this is the same mechanism as in a distributed setup with HDFS, this should mean that files in HDFS will be lingering around.
The problem is that the postStop method of the JM actor is not called when shutting down. The task manager components, which need to do resource cleanup register custom shutdown hooks and don't rely on a shutdown call from the task manager.
For 0.9.1 we need to make sure that the state is simply cleaned up with a shutdown hook (as in the blob manager). For 0.10 with HA we need to be more careful and not clean it up when other job manager instances need access. See
FLINK-2354 for details.