Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Description
In a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)
What should we do now? We can of course commit all transactions up to
checkpoint 43. But should we keep waiting for `notifyCheckpointComplete(44)`? What if another checkpoint is triggered in the meantime? We could end up waiting indefinitely.
Our proposal is to shut down the task immediately after seeing the first
`notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER
`finish()`. This should be fine, as:
a) ideally no new pending transactions should be opened after
checkpoint 42
b) even if the operator/function opens some transactions for checkpoint 43
and checkpoint 44 (`FlinkKafkaProducer`), those transactions after
checkpoint 42 should be empty
After seeing 5. (`notifyCheckpointComplete(43)`), it should be good enough to:
- commit transactions from checkpoint 42 (and 43 if they were created,
depends on the user code)
- close the operator, aborting any pending transactions (for checkpoint 44 if
they were opened, depends on the user code)
If checkpoint 44 completes afterwards, it will still be valid. Ideally we
would recommend that after seeing `finish()` operators/functions should not
be opening any new transactions.
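To make the proposed rule concrete, here is a minimal sketch of the bookkeeping it implies. It is not Flink's actual implementation: the class `FinishedTaskCheckpointTracker`, its methods, and the string transaction names are all hypothetical and exist only for illustration. It assumes transactions are tracked per checkpoint ID and that a notification for checkpoint X also covers every earlier checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the proposal; names are illustrative, not Flink API. */
class FinishedTaskCheckpointTracker {
    // Pending transactions keyed by the checkpoint that pre-committed them.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();
    private boolean finished = false;
    private boolean shutDown = false;
    // Smallest checkpoint ID triggered after finish(); MAX_VALUE means "none yet".
    private long firstCheckpointAfterFinish = Long.MAX_VALUE;

    void finish() {
        finished = true;
    }

    /** Records a triggered checkpoint; transaction may be null if none was opened. */
    void triggerCheckpoint(long checkpointId, String transaction) {
        if (finished) {
            firstCheckpointAfterFinish = Math.min(firstCheckpointAfterFinish, checkpointId);
        }
        if (transaction != null) {
            pending.put(checkpointId, transaction);
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (shutDown) {
            return; // late notifications (e.g. for checkpoint 44) are ignored
        }
        // Commit everything up to and including the completed checkpoint.
        Map<Long, String> toCommit = pending.headMap(checkpointId, true);
        committed.addAll(toCommit.values());
        toCommit.clear(); // headMap is a view, so this also removes from 'pending'

        // First completed checkpoint triggered after finish(): stop waiting.
        if (finished && checkpointId >= firstCheckpointAfterFinish) {
            aborted.addAll(pending.values()); // e.g. the transaction for checkpoint 44
            pending.clear();
            shutDown = true;
        }
    }

    boolean isShutDown() { return shutDown; }
    List<String> committed() { return committed; }
    List<String> aborted() { return aborted; }
}
```

Replaying the scenario from the description (`finish()`, then checkpoints 42, 43 and 44 triggered, then `notifyCheckpointComplete(43)`) with this sketch commits the transactions for 42 and 43, aborts the one for 44, and marks the task as shut down without waiting for checkpoint 44.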