Right now, if a job failed before the first successful checkpoint, the CheckpointCoordinator will not reset the OperatorCoordinator state. This may leave the OperatorCoordinators in inconsistent state.
The CheckpointCoordinator should also reset the OperatorCoordinator state in this case, just like it does for the master hooks. It essentially means "reset to no checkpoint". There are two options for the fix:
- Add a reset() method to the OperatorCoordinator.
- Call resetToCheckpoint(null) on the OperatorCoordinator.