For the following setup of chained operators:

SourceOperator -> FlatMap -> AsyncOperator
Let's assume that the input buffer of the AsyncOperator is full. We start processing a record from the SourceOperator and pass it to the FlatMap, which fans it out (multiplies it 10 times). The first multiplied record reaches the AsyncOperator and is treated specially (stored in AsyncWaitOperator#pendingStreamElementQueueEntry), and then the AsyncWaitOperator waits on (and thereby releases) the checkpoint lock (in AsyncWaitOperator#addAsyncBufferEntry). If a checkpoint is triggered now, both the SourceOperator and the FlatMap will be checkpointed under the assumption that all 10 multiplied records were processed, which is not true. Only the first one is checkpointed by the AsyncWaitOperator; the remaining 9 are not. So if we ever restore state from this checkpoint, we have lost those 9 records.
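To make the interleaving concrete, here is a minimal, single-threaded sketch. It is a toy model, not Flink code: LostRecordsModel, sourceEmit, flatMap and asyncProcess are hypothetical names, the released checkpoint lock is modelled as the single point where a checkpoint may run, and the source is assumed to count the record as consumed before emitting it, as the scenario above assumes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of SourceOperator -> FlatMap -> AsyncOperator.
// Not Flink code: "waiting on the checkpoint lock" is modelled as the one point where a
// checkpoint may run, i.e. in the middle of the FlatMap's fan-out loop.
final class LostRecordsModel {

    /** What one checkpoint persists. */
    static final class Snapshot {
        final int sourceOffset;          // records the source considers consumed
        final List<String> asyncEntries; // elements the async operator persists

        Snapshot(int sourceOffset, List<String> asyncEntries) {
            this.sourceOffset = sourceOffset;
            this.asyncEntries = asyncEntries;
        }
    }

    static final int FAN_OUT = 10;

    int sourceOffset = 0;
    final List<String> asyncQueue = new ArrayList<>(); // elements admitted to the async queue
    String pendingEntry;                               // models pendingStreamElementQueueEntry
    Snapshot checkpoint;                               // first checkpoint taken, if any

    void sourceEmit(String record) {
        sourceOffset++; // assumption (as in the report): the record already counts as consumed
        flatMap(record);
    }

    void flatMap(String record) {
        // Which copies are still to be emitted lives only in this loop, never in state.
        for (int i = 0; i < FAN_OUT; i++) {
            asyncProcess(record + "#" + i);
        }
    }

    void asyncProcess(String element) {
        pendingEntry = element; // queue full: park the element
        if (checkpoint == null) {
            // Lock released while waiting: a checkpoint runs now.
            List<String> persisted = new ArrayList<>(asyncQueue);
            persisted.add(pendingEntry);
            checkpoint = new Snapshot(sourceOffset, persisted);
        }
        asyncQueue.add(element); // capacity frees up, element is admitted
        pendingEntry = null;
    }

    public static void main(String[] args) {
        LostRecordsModel model = new LostRecordsModel();
        model.sourceEmit("r0");
        Snapshot cp = model.checkpoint;
        // On restore the source resumes after "r0", so only the persisted entries come back.
        System.out.println("source offset in checkpoint: " + cp.sourceOffset);
        System.out.println("copies recoverable: " + cp.asyncEntries.size() + " of " + FAN_OUT);
    }
}
```

Running it prints `source offset in checkpoint: 1` and `copies recoverable: 1 of 10`: the source state already points past the record, while only `r0#0` sits in the async operator's persisted state.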
A similar issue (I think a previously known one) can happen if, for example, some operator upstream of the AsyncOperator fires a processing-time timer that emits some data. In that case, however, AsyncWaitOperator#pendingStreamElementQueueEntry gets overwritten.
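The overwrite can be sketched the same way. This is again a hypothetical toy model, not Flink code: the single field stands in for pendingStreamElementQueueEntry, and upstreamTimer is an assumed callback representing an upstream processing-time timer that fires once the lock is released.

```java
// Hypothetical model, not Flink code: one pending slot in the async operator and an
// upstream processing-time timer that fires while the slot is occupied and the lock is free.
final class PendingOverwriteModel {
    String pendingEntry;     // models AsyncWaitOperator#pendingStreamElementQueueEntry
    Runnable upstreamTimer;  // hypothetical timer callback, fired at most once
    String slotAtCheckpoint; // what a checkpoint taken during the innermost wait would see

    void asyncProcess(String element) {
        pendingEntry = element; // queue full: park the element in the single slot
        Runnable timer = upstreamTimer;
        upstreamTimer = null;
        if (timer != null) {
            timer.run(); // lock released: the timer emits into this operator again
        } else {
            slotAtCheckpoint = pendingEntry; // a checkpoint runs while waiting here
        }
    }

    public static void main(String[] args) {
        PendingOverwriteModel model = new PendingOverwriteModel();
        model.upstreamTimer = () -> model.asyncProcess("timer-output");
        model.asyncProcess("A");
        System.out.println("slot seen by checkpoint: " + model.slotAtCheckpoint);
    }
}
```

This prints `slot seen by checkpoint: timer-output`; element `A` was parked in the same slot and is no longer visible to the checkpoint.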
If an upstream operator has the following pseudo code:
one would assume that the accesses/writes to stateA and stateB are atomic from the perspective of checkpoints. But again, because the AsyncWaitOperator releases the checkpoint lock, they are not.
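A minimal sketch of the non-atomic update, under the assumption that the upstream operator writes stateA, emits a record, and then writes stateB. NonAtomicStateModel, processElement and collect are hypothetical stand-ins, not the operator from this report and not Flink's API.

```java
// Hypothetical upstream operator, not the code from this report: it updates stateA, emits,
// then updates stateB. The intent is that both states change together.
final class NonAtomicStateModel {
    int stateA;
    int stateB;
    int[] checkpoint; // {stateA, stateB} as persisted by a checkpoint

    // Stand-in for output.collect(...) into a chained AsyncWaitOperator with a full queue:
    // the checkpoint lock is released, so a checkpoint runs in the middle of processElement.
    void collect(int value) {
        checkpoint = new int[] {stateA, stateB};
    }

    void processElement(int value) {
        stateA += value;
        collect(value);
        stateB += value;
    }

    public static void main(String[] args) {
        NonAtomicStateModel op = new NonAtomicStateModel();
        op.processElement(5);
        System.out.println("checkpointed stateA=" + op.checkpoint[0] + ", stateB=" + op.checkpoint[1]);
    }
}
```

This prints `checkpointed stateA=5, stateB=0`, a combination the operator itself never intends to expose, even though both fields end up as 5 once processElement returns.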