This storage is needed to support windowing.
The storage needs to have the following features:
1. Spillable key value storage (integrate with
2. Upon checkpoint, it saves a snapshot of the entire data set, tagged with the checkpointing window ID. This should be done incrementally (ManagedState) to avoid wasting space on unchanged data.
3. When recovering, it takes the recovery window ID and restores to that snapshot.
4. When a window is committed, all windows with a lower ID should be purged from the store.
5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces. Because of 2, 3 and 4, we may want to add methods to the WindowedStorage interface so that the implementation of WindowedOperator can notify the storage when a window is checkpointed, recovered, or committed.
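
To make requirements 2 through 4 concrete, here is a minimal in-memory sketch of the checkpoint, recover and commit behaviour. It is only an illustration under stated assumptions: the class name IncrementalSnapshotStore and the methods checkpoint, restore, committed and retainedSnapshots are hypothetical and are not part of the existing WindowedStorage interface or of ManagedState. It does not spill to disk, and it assumes non-null values. Each checkpoint stores only the keys changed since the previous checkpoint, and a commit folds older deltas into a base snapshot before purging them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch; names are illustrative, not the real Malhar API.
class IncrementalSnapshotStore<K, V> {
    // Current working state.
    private final Map<K, V> live = new HashMap<>();
    // Changes since the last checkpoint; Optional.empty() marks a removal.
    private Map<K, Optional<V>> dirty = new HashMap<>();
    // Compacted state as of the latest committed window.
    private final Map<K, V> base = new HashMap<>();
    // One delta per checkpointed window ID, ordered by window ID.
    private final TreeMap<Long, Map<K, Optional<V>>> deltas = new TreeMap<>();

    public void put(K key, V value) {
        live.put(key, value);
        dirty.put(key, Optional.of(value));
    }

    public void remove(K key) {
        live.remove(key);
        dirty.put(key, Optional.empty());
    }

    public V get(K key) {
        return live.get(key);
    }

    // Requirement 2: save only what changed since the previous checkpoint.
    public void checkpoint(long windowId) {
        deltas.put(windowId, dirty);
        dirty = new HashMap<>();
    }

    // Requirement 3: rebuild the state as of the given checkpointed window.
    public void restore(long windowId) {
        live.clear();
        live.putAll(base);
        for (Map<K, Optional<V>> delta : deltas.headMap(windowId, true).values()) {
            apply(delta, live);
        }
        // Checkpoints taken after the recovery window are no longer valid.
        deltas.tailMap(windowId, false).clear();
        dirty = new HashMap<>();
    }

    // Requirement 4: fold deltas up to the committed window into the base,
    // then purge them so older windows no longer occupy space.
    public void committed(long windowId) {
        Map<Long, Map<K, Optional<V>>> old = deltas.headMap(windowId, true);
        for (Map<K, Optional<V>> delta : old.values()) {
            apply(delta, base);
        }
        old.clear();
    }

    public int retainedSnapshots() {
        return deltas.size();
    }

    private static <K, V> void apply(Map<K, Optional<V>> delta, Map<K, V> target) {
        for (Map.Entry<K, Optional<V>> e : delta.entrySet()) {
            if (e.getValue().isPresent()) {
                target.put(e.getKey(), e.getValue().get());
            } else {
                target.remove(e.getKey());
            }
        }
    }
}
```

In this sketch, a committed window remains restorable because its contents live on in the base snapshot, while windows with a lower ID can no longer be restored individually. A real implementation would back these maps with spillable storage rather than heap memory.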