  Flink / FLINK-10740 FLIP-27: Refactor Source Interface / FLINK-16986

Enhance the OperatorEvent handling guarantee during checkpointing.


Details

    Description

      When the CheckpointCoordinator takes a checkpoint, the steps are performed in the following order:

      1. The CheckpointCoordinator triggers a checkpoint on each OperatorCoordinator.
      2. Each OperatorCoordinator takes a snapshot.
      3. Right after taking the snapshot, the CheckpointCoordinator sends a CHECKPOINT_FIN marker through the OperatorContext.
      4. Once the OperatorContext sees the CHECKPOINT_FIN marker, it waits until all previously sent events are acknowledged and suspends the event gateway to the operators: any subsequent OperatorEvents sent from the OperatorCoordinator to the operators are buffered instead of being sent out.
      5. The CheckpointCoordinator waits until all OperatorCoordinators have finished steps 2-4, and then triggers the task snapshots.
      6. The suspension of the event gateway to an operator can be lifted after all subtasks of that operator have finished their task checkpoints.
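The ordering in steps 1-5 can be sketched with `CompletableFuture` chaining. This is a minimal, hypothetical model, not Flink's `CheckpointCoordinator`: the `Coordinator` interface and its `snapshot`/`checkpointFin` methods are illustrative stand-ins for an OperatorCoordinator together with its OperatorContext, and `checkpointFin` is assumed to complete only once step 4 has finished. What it shows is that task snapshots are triggered only after every coordinator has both taken its snapshot and drained its gateway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the ordering in steps 1-5 (not Flink's CheckpointCoordinator). */
class CheckpointOrderingSketch {

    /** Hypothetical stand-in for an OperatorCoordinator plus its OperatorContext. */
    interface Coordinator {
        /** Step 2: take the coordinator snapshot. */
        CompletableFuture<byte[]> snapshot(long checkpointId);

        /** Steps 3-4: completes once earlier events are acked and the gateway is suspended. */
        CompletableFuture<Void> checkpointFin(long checkpointId);
    }

    static CompletableFuture<List<byte[]>> triggerCheckpoint(
            long checkpointId, List<Coordinator> coordinators, Runnable triggerTaskSnapshots) {
        List<CompletableFuture<byte[]>> perCoordinator = new ArrayList<>();
        for (Coordinator c : coordinators) {
            // Steps 1-3: trigger the snapshot; send CHECKPOINT_FIN only after it completes.
            perCoordinator.add(c.snapshot(checkpointId)
                    .thenCompose(state -> c.checkpointFin(checkpointId).thenApply(ignored -> state)));
        }
        // Step 5: task snapshots start only after every coordinator has finished steps 2-4.
        return CompletableFuture.allOf(perCoordinator.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    triggerTaskSnapshots.run();
                    List<byte[]> states = new ArrayList<>();
                    for (CompletableFuture<byte[]> f : perCoordinator) {
                        states.add(f.join());
                    }
                    return states;
                });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> ack = new CompletableFuture<>();
        Coordinator c = new Coordinator() {
            public CompletableFuture<byte[]> snapshot(long id) {
                log.add("snapshot-" + id);
                return CompletableFuture.completedFuture(new byte[0]);
            }

            public CompletableFuture<Void> checkpointFin(long id) {
                log.add("fin-" + id);
                return ack; // stays pending until earlier events are acked
            }
        };
        CompletableFuture<List<byte[]>> done =
                triggerCheckpoint(7, List.of(c), () -> log.add("task-snapshots"));
        System.out.println(log);           // [snapshot-7, fin-7]
        ack.complete(null);                // earlier events acked, gateway suspended
        System.out.println(log);           // [snapshot-7, fin-7, task-snapshots]
        System.out.println(done.isDone()); // true
    }
}
```

Because nothing here uses an async executor, every dependent stage runs in the thread that completes `ack`, which keeps the sketch deterministic.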

      The mechanism above guarantees that all OperatorEvents sent before the OperatorCoordinator snapshot is taken are handled by the operator before its task snapshot is taken.
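The guarantee rests on the gateway behavior in steps 4 and 6. The following single-threaded sketch models one coordinator-to-operator gateway; the class `SuspendableEventGateway` and its methods are hypothetical, events are plain unique strings, and ACKs are delivered by direct method calls. One modeling choice worth noting: it starts buffering as soon as CHECKPOINT_FIN is seen, so no event sent after the coordinator snapshot can reach the operator while earlier events are still waiting for their ACKs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, single-threaded model of the OperatorContext event gateway to one operator.
 * Not a Flink API; event names stand in for OperatorEvents and must be unique.
 */
class SuspendableEventGateway {
    private final int parallelism;                              // number of subtasks of the operator
    private final List<String> delivered = new ArrayList<>();   // events actually sent to subtasks
    private final Queue<String> buffered = new ArrayDeque<>();  // events held back while suspended
    private final Set<String> unacked = new HashSet<>();        // sent but not yet acknowledged
    private final Set<Integer> checkpointedSubtasks = new HashSet<>();
    private CompletableFuture<Void> drained = CompletableFuture.completedFuture(null);
    private boolean suspended = false;

    SuspendableEventGateway(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Coordinator -> operator: deliver now, or buffer if the gateway is suspended. */
    void sendEvent(String event) {
        if (suspended) {
            buffered.add(event);
        } else {
            delivered.add(event);
            unacked.add(event);
        }
    }

    /** Operator -> coordinator acknowledgement of a delivered event. */
    void ack(String event) {
        unacked.remove(event);
        if (unacked.isEmpty()) {
            drained.complete(null); // no-op if already complete
        }
    }

    /** Step 4: suspend the gateway; the future completes once all earlier events are acked. */
    CompletableFuture<Void> onCheckpointFin() {
        suspended = true;
        checkpointedSubtasks.clear();
        drained = new CompletableFuture<>();
        if (unacked.isEmpty()) {
            drained.complete(null);
        }
        return drained;
    }

    /** Step 6: lift the suspension once every subtask has finished its task checkpoint. */
    void onSubtaskCheckpointComplete(int subtask) {
        checkpointedSubtasks.add(subtask);
        if (suspended && checkpointedSubtasks.size() == parallelism) {
            suspended = false;
            while (!buffered.isEmpty()) {
                sendEvent(buffered.poll()); // flush in the original order
            }
        }
    }

    List<String> delivered() { return delivered; }

    boolean isSuspended() { return suspended; }

    public static void main(String[] args) {
        SuspendableEventGateway gw = new SuspendableEventGateway(2);
        gw.sendEvent("split-1");
        CompletableFuture<Void> ready = gw.onCheckpointFin(); // CHECKPOINT_FIN arrives
        gw.sendEvent("split-2");                              // buffered, not delivered
        System.out.println(ready.isDone());                   // false: split-1 not acked yet
        gw.ack("split-1");
        System.out.println(ready.isDone());                   // true: step 5 may proceed
        gw.onSubtaskCheckpointComplete(0);
        System.out.println(gw.delivered());                   // [split-1]
        gw.onSubtaskCheckpointComplete(1);
        System.out.println(gw.delivered());                   // [split-1, split-2]
    }
}
```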

      An operator can use this mechanism to determine whether an OperatorEvent it sent to the OperatorCoordinator is included in the upcoming checkpoint. To do so, it asks the OperatorCoordinator to ACK that OperatorEvent. If the ACK is received before the operator takes its next snapshot, that OperatorEvent must have been handled and checkpointed by the OperatorCoordinator: an ACK sent after the coordinator snapshot would have been buffered by the suspended gateway until the task checkpoint completed.
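On the operator side, the ACK pattern amounts to tracking which sent events are still unconfirmed when the task snapshot is taken. The sketch below assumes the coordinator echoes a per-event id in its ACK; `PendingEventTracker`, the id scheme, and the method names are hypothetical. What the operator should do with unconfirmed events (for example, keeping them in its own state so they can be re-sent after recovery) is a design choice the issue does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical operator-side tracker for events sent to the coordinator (not a Flink API). */
class PendingEventTracker {
    private final Map<Long, String> unconfirmed = new LinkedHashMap<>(); // id -> payload, no ACK yet
    private long nextId = 0;

    /** Records an outgoing event; the returned id is what the coordinator is assumed to echo. */
    long send(String payload) {
        long id = nextId++;
        unconfirmed.put(id, payload);
        return id;
    }

    /**
     * Handles the coordinator's ACK. If this runs before the next task snapshot, the event was
     * handled before the coordinator snapshot and is therefore part of the checkpoint.
     */
    void onAck(long id) {
        unconfirmed.remove(id);
    }

    /** Called when taking the task snapshot: ids not confirmed to be in the coordinator checkpoint. */
    Set<Long> snapshotUnconfirmed() {
        return new TreeSet<>(unconfirmed.keySet());
    }

    public static void main(String[] args) {
        PendingEventTracker tracker = new PendingEventTracker();
        long first = tracker.send("request-split");
        tracker.send("report-progress");
        tracker.onAck(first);                              // ACK arrives before the snapshot
        System.out.println(tracker.snapshotUnconfirmed()); // [1]
    }
}
```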


            People

              Stephan Ewen (sewen)
              Jiangjie Qin (becket_qin)
