Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
Release Note: This changes the default chaining behavior of the AsyncWaitOperator. By default, we now break chains so that the AsyncWaitOperator is never chained after another operator.
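To make the chaining change concrete, here is a simplified, hypothetical model of how a linear pipeline is split into chains. The strategy names mirror Flink's ChainingStrategy values (ALWAYS, NEVER, HEAD). The ChainStrategy and ChainingSketch names, the chains() and demo() helpers, and the single chaining rule are assumptions for illustration only. Flink's real check also looks at parallelism, slot sharing groups, partitioning and more. Giving the async operator a HEAD-like strategy is one way to get the behavior the release note describes: it never joins the upstream chain, while operators after it can still chain onto it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's ChainingStrategy values.
enum ChainStrategy { ALWAYS, NEVER, HEAD }

// Simplified, hypothetical chaining model (not Flink's StreamingJobGraphGenerator).
final class ChainingSketch {
    // A downstream operator joins the upstream chain only if it may be chained
    // after another operator (ALWAYS) and the upstream operator accepts
    // successors in its chain (ALWAYS or HEAD).
    static boolean canChain(ChainStrategy upstream, ChainStrategy downstream) {
        return downstream == ChainStrategy.ALWAYS
                && (upstream == ChainStrategy.ALWAYS || upstream == ChainStrategy.HEAD);
    }

    // Splits a linear pipeline into chains, one "A -> B" string per chain.
    static List<String> chains(String[] names, ChainStrategy[] strategies) {
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder(names[0]);
        for (int i = 1; i < names.length; i++) {
            if (canChain(strategies[i - 1], strategies[i])) {
                current.append(" -> ").append(names[i]);
            } else {
                result.add(current.toString());
                current = new StringBuilder(names[i]);
            }
        }
        result.add(current.toString());
        return result;
    }

    // Source -> FlatMap -> AsyncWait -> Sink, with the async operator either
    // chaining normally (ALWAYS) or always starting a new chain (HEAD).
    static List<String> demo(boolean asyncStartsNewChain) {
        String[] names = {"Source", "FlatMap", "AsyncWait", "Sink"};
        ChainStrategy async = asyncStartsNewChain ? ChainStrategy.HEAD : ChainStrategy.ALWAYS;
        return chains(names, new ChainStrategy[] {
                ChainStrategy.HEAD, ChainStrategy.ALWAYS, async, ChainStrategy.ALWAYS});
    }
}
```

With ALWAYS for the async operator, demo(false) yields a single chain, "Source -> FlatMap -> AsyncWait -> Sink". With HEAD, demo(true) yields two chains, "Source -> FlatMap" and "AsyncWait -> Sink", so the async operator no longer runs inside another operator's call stack.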
Description
1. For the following setup of chained operators:
SourceOperator -> FlatMap -> AsyncOperator
Let's assume that the input buffer of the AsyncOperator is full. We start processing a record from the SourceOperator and pass it to the FlatMap, which fans it out (multiplies it 10 times). The first multiplied record reaches the AsyncOperator and is treated specially (stored in AsyncWaitOperator#pendingStreamElementQueueEntry), and then the AsyncWaitOperator waits on the checkpoint lock, releasing it (in AsyncWaitOperator#addAsyncBufferEntry). If a checkpoint is triggered now, both the SourceOperator and the FlatMap will be checkpointed under the assumption that all 10 multiplied records were processed, which is not true. Only the first one is checkpointed by the AsyncWaitOperator. The remaining 9 have not been emitted yet; they exist only inside the FlatMap's ongoing call and are part of no operator's state. So if we ever restore from this checkpoint, those 9 records are lost.
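The loss in point 1 can be reproduced in a small single-threaded model. Everything here is hypothetical: FanOutRaceSketch, waitReleasingLock() and the capacity-2 buffer are illustrative, a "checkpoint" is simply a copy of the state taken the first time the lock is released, and the source is assumed to advance its position before its record has been fully processed downstream, which is the premise of the scenario above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of SourceOperator -> FlatMap -> AsyncOperator.
// Names are illustrative; this is not Flink's implementation.
final class FanOutRaceSketch {
    long sourceOffset = 0;                                  // source state: records consumed
    final Deque<String> asyncBuffer = new ArrayDeque<>();   // async operator's bounded buffer
    final int capacity;
    String pending;                                         // stand-in for pendingStreamElementQueueEntry
    long checkpointedOffset = -1;                           // source state seen by the checkpoint
    List<String> checkpointedBuffer;                        // async state seen by the checkpoint

    FanOutRaceSketch(int capacity, List<String> alreadyBuffered) {
        this.capacity = capacity;
        asyncBuffer.addAll(alreadyBuffered);
    }

    // Models waiting on the checkpoint lock. The first time the lock is
    // released, a checkpoint snapshots all operators; after each wait, one
    // in-flight async request completes and frees a slot.
    void waitReleasingLock() {
        if (checkpointedBuffer == null) {
            checkpointedOffset = sourceOffset;
            checkpointedBuffer = new ArrayList<>(asyncBuffer);
            if (pending != null) {
                checkpointedBuffer.add(pending);
            }
        }
        asyncBuffer.poll();
    }

    void asyncProcessElement(String element) {
        pending = element;
        while (asyncBuffer.size() >= capacity) {
            waitReleasingLock();
        }
        asyncBuffer.add(element);
        pending = null;
    }

    void flatMap(String record) {
        for (int i = 0; i < 10; i++) {                      // fan out: 10 copies per input record
            asyncProcessElement(record + "#" + i);
        }
    }

    // Assumption: the source advances its position before the record has been
    // fully processed downstream.
    void sourceEmit(String record) {
        sourceOffset++;
        flatMap(record);
    }

    static FanOutRaceSketch demo() {
        FanOutRaceSketch chain = new FanOutRaceSketch(2, Arrays.asList("a", "b")); // buffer already full
        chain.sourceEmit("r");
        return chain;
    }
}
```

Starting from the full buffer ["a", "b"], the checkpoint records source position 1 but holds only r#0 of the ten copies of r; r#1 to r#9 appear in no snapshot.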
2. A similar issue (I think a previously known one) can happen if, for example, an operator upstream of the AsyncOperator fires a processing-time timer that emits some data. In that case, however, AsyncWaitOperator#pendingStreamElementQueueEntry is overwritten by the element emitted from the timer, so the element that was already pending is no longer covered by checkpoints.
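The overwrite in point 2 can be modelled the same way. In this hypothetical sketch, the pending field plays the role of AsyncWaitOperator#pendingStreamElementQueueEntry and is assumed to be read only when taking a snapshot, while the element itself is held in a local variable. PendingOverwriteSketch and whileLockReleased (a scripted list of what runs while the lock is released: a timer emitting "t", an async request completing, a checkpoint) are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the pending-entry overwrite; not Flink's implementation.
final class PendingOverwriteSketch {
    final Deque<String> buffer = new ArrayDeque<>();
    final int capacity;
    String pending;                                               // stand-in for pendingStreamElementQueueEntry
    final Deque<Runnable> whileLockReleased = new ArrayDeque<>(); // scripted events
    List<String> lastSnapshot;

    PendingOverwriteSketch(int capacity) {
        this.capacity = capacity;
    }

    // A snapshot persists the buffer plus whatever the pending field holds.
    List<String> snapshot() {
        List<String> state = new ArrayList<>(buffer);
        if (pending != null) {
            state.add(pending);
        }
        return state;
    }

    void processElement(String element) {
        pending = element;                                        // only consulted by snapshot()
        while (buffer.size() >= capacity) {
            // Lock released: run the next scripted event, or by default let
            // one async request complete and free a slot.
            Runnable event = whileLockReleased.poll();
            if (event != null) {
                event.run();
            } else {
                buffer.poll();
            }
        }
        buffer.add(element);
        pending = null;
    }

    static List<String> demo() {
        PendingOverwriteSketch op = new PendingOverwriteSketch(1);
        op.buffer.add("a");                                                // buffer already full
        op.whileLockReleased.add(() -> op.processElement("t"));           // upstream timer emits "t"
        op.whileLockReleased.add(() -> op.buffer.poll());                 // an async request completes
        op.whileLockReleased.add(() -> op.lastSnapshot = op.snapshot());  // checkpoint
        op.processElement("x");
        return op.lastSnapshot;
    }
}
```

The checkpoint taken after the timer's nested call has finished contains only "t": the nested call overwrote the field and then cleared it, so "x", which the upstream operator had already handed over, is missing from the snapshot.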
3. If an upstream operator has the following pseudocode:
    stateA = true
    output.collect(x)
    stateB = true
one would assume that the writes to stateA and stateB are atomic from the perspective of checkpoints. But again, because the AsyncWaitOperator releases the checkpoint lock, they are not: a checkpoint can run while output.collect(x) is blocked, capturing stateA = true together with the old value of stateB.
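Point 3 reduces to a torn write. In this hypothetical sketch, TornStateSketch and its demo() helper are illustrative names, the lambda passed as collect stands in for output.collect(x), and when the downstream operator releases the lock, the checkpoint is modelled as running inside that call.

```java
import java.util.function.Consumer;

// Hypothetical model of point 3; names are illustrative only.
final class TornStateSketch {
    boolean stateA;
    boolean stateB;
    String snapshot;                       // what the checkpoint captured, if it ran

    void takeCheckpoint() {
        snapshot = "stateA=" + stateA + ",stateB=" + stateB;
    }

    // The pseudocode from the issue; 'collect' stands in for output.collect(x).
    void process(String x, Consumer<String> collect) {
        stateA = true;
        collect.accept(x);
        stateB = true;
    }

    static String demo(boolean downstreamReleasesLock) {
        TornStateSketch op = new TornStateSketch();
        op.process("x", record -> {
            if (downstreamReleasesLock) {
                op.takeCheckpoint();       // checkpoint runs while collect() waits
            }
        });
        if (op.snapshot == null) {
            op.takeCheckpoint();           // otherwise it can only run once process() returns
        }
        return op.snapshot;
    }
}
```

demo(true) yields "stateA=true,stateB=false", a state the operator never meant to expose; demo(false) yields "stateA=true,stateB=true".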
Issue Links
- is related to: FLINK-16219 Make AsyncWaitOperator chainable again (Closed)
- links to