With a high degree of parallelism, each checkpoint ends up with n*s files (n = parallelism, s = number of stages). Writing them is fast (done from many subtasks in parallel); removing them is slow (done from the JM).
This can't be mitigated by state.backend.fs.memory-threshold because most states are tens to hundreds of MB.
Instead of deleting the files one by one, we could remove the checkpoint directory recursively.
The easiest way is to remove channelStateHandle.discard() calls and use isRecursive=true in FsCompletedCheckpointStorageLocation.disposeStorageLocation.
Note: with the current isRecursive=false there will be an exception if there are any files left under that folder.
This can be extended to other state handles in the future as well.
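
A minimal sketch of the difference between non-recursive and recursive directory removal, using only java.nio.file on a local temp directory. It is not Flink code: the class name CheckpointDirCleanup, its method names, and the file names are hypothetical. On a local file system the recursive variant still deletes entries individually; the point is only to illustrate the semantics described above (a non-recursive delete fails while files remain, a recursive one does not), which in Flink would go through FileSystem.delete(path, recursive).

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical illustration only; names are assumptions, not Flink APIs. */
class CheckpointDirCleanup {

    /**
     * Non-recursive delete, analogous to isRecursive=false.
     * Returns false if the directory could not be removed because it is not empty.
     */
    public static boolean deleteNonRecursive(Path dir) throws IOException {
        try {
            Files.delete(dir);
            return true;
        } catch (DirectoryNotEmptyException e) {
            return false;
        }
    }

    /**
     * Recursive delete, analogous to isRecursive=true.
     * Paths are sorted in reverse order so children are removed before their parents.
     */
    public static void deleteRecursive(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a checkpoint directory that still holds a few state files.
        Path chk = Files.createTempDirectory("chk-");
        for (int i = 0; i < 5; i++) {
            Files.createFile(chk.resolve("channel-state-" + i));
        }
        System.out.println("non-recursive delete succeeded: " + deleteNonRecursive(chk));
        deleteRecursive(chk);
        System.out.println("directory still exists: " + Files.exists(chk));
    }
}
```

With the recursive variant the caller no longer needs to know about every file under the directory, which is why the individual discard() calls become unnecessary in the proposal above.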