Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-2491 Support Checkpoints After Tasks Finished
  3. FLINK-23473

Do not create transaction in TwoPhaseCommitSinkFunction after finish()




      In a scenario where:
      1. task/operator received `finish()`
      2. checkpoint 42 triggered (not yet completed)
      3. checkpoint 43 triggered (not yet completed)
      4. checkpoint 44 triggered (not yet completed)
      5. notifyCheckpointComplete(43)

      And what should we do now? We can of course commit all transactions until
      checkpoint 43. But should we keep waiting for `notyifyCheckpointComplete(44)`? What if in the meantime another checkpoint is triggered? We could end up waiting indefinitely.

      Our proposal is to shutdown the task immediately after seeing first
      `notifyCheckpointComplete(X)`, where X is any triggered checkpoint AFTER
      `finish()`. This should be fine, as:
      a) ideally there should be no new pending transactions opened after
      checkpoint 42
      b) even if operator/function is opening some transactions for checkpoint 43
      and checkpoint 44 (`FlinkKafkaProducer`), those transactions after
      checkpoint 42 should be empty

      After seeing 5. (notifyCheckpointComplete(43)) It should be good enough to:

      • commit transactions from checkpoint 42, (and 43 if they were created,
        depends on the user code)
      • close operator, aborting any pending transactions (for checkpoint 44 if
        they were opened, depends on the user code)

      If checkpoint 44 completes afterwards, it will still be valid. Ideally we
      would recommend that after seeing `finish()` operators/functions should not
      be opening any new transactions.


        Issue Links



              ym Yuan Mei
              dwysakowicz Dawid Wysakowicz
              0 Vote for this issue
              3 Start watching this issue