Flink / FLINK-18748

Savepoint is queued unexpectedly even if pendingCheckpoints is less than maxConcurrentCheckpoints


Details

    Description

      Inspired by an email on the user-zh mailing list.

      Since FLINK-17342, when a checkpoint or savepoint is triggered, CheckpointRequestDecider#chooseRequestToExecute checks whether the request can be executed. The logic is as follows:

      Preconditions.checkState(Thread.holdsLock(lock));
      // 1. A trigger is already in progress, or nothing is queued.
      if (isTriggering || queuedRequests.isEmpty()) {
         return Optional.empty();
      }

      // 2. Too many ongoing checkpoints/savepoints: only a forced request is dequeued.
      if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
         return Optional.of(queuedRequests.first())
            .filter(CheckpointTriggerRequest::isForce)
            .map(unused -> queuedRequests.pollFirst());
      }

      // 3. Check the time elapsed since the last completed checkpoint.
      long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
      if (nextTriggerDelayMillis > 0) {
         return onTooEarly(nextTriggerDelayMillis);
      }

      return Optional.of(queuedRequests.pollFirst());
      

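      However, if pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts, step 2 does not apply, so a savepoint request falls through to step 3 and still has to wait until nextTriggerDelayMillis has elapsed. This is inconsistent: when the limit has been reached, step 2 lets the savepoint through immediately (savepoints are forced requests), but when there is room for another attempt, the savepoint is delayed.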
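      To make the inconsistency concrete, here is a minimal, self-contained Java model of the three steps. It is a sketch under assumptions, not Flink code: CurrentDeciderModel, Request, enqueue and choose are hypothetical names; the pending-checkpoint count and the next-trigger delay are passed in directly instead of coming from pendingCheckpointsSizeSupplier and nextTriggerDelayMillis(lastCompletionMs); savepoints are modeled as forced requests; and step 3 only returns an empty result, without reproducing whatever else onTooEarly(...) does.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical, simplified model of the quoted decision logic. Not Flink code.
class CurrentDeciderModel {

    // A queued trigger request; savepoints are modeled as forced requests.
    static final class Request {
        final String name;
        final boolean force;

        Request(String name, boolean force) {
            this.name = name;
            this.force = force;
        }

        boolean isForce() {
            return force;
        }
    }

    private final Deque<Request> queuedRequests = new ArrayDeque<>();
    private final int maxConcurrentCheckpointAttempts;

    CurrentDeciderModel(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    void enqueue(Request request) {
        queuedRequests.addLast(request);
    }

    // pendingCheckpoints and nextTriggerDelayMillis stand in for
    // pendingCheckpointsSizeSupplier.get() and nextTriggerDelayMillis(lastCompletionMs).
    Optional<Request> choose(boolean isTriggering, int pendingCheckpoints, long nextTriggerDelayMillis) {
        // 1. A trigger is already in progress, or nothing is queued.
        if (isTriggering || queuedRequests.isEmpty()) {
            return Optional.empty();
        }
        // 2. Limit reached: only a forced request at the head is dequeued.
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Optional.of(queuedRequests.peekFirst())
                    .filter(Request::isForce)
                    .map(unused -> queuedRequests.pollFirst());
        }
        // 3. Below the limit, but too early since the last completed checkpoint:
        //    every request waits here, forced or not.
        if (nextTriggerDelayMillis > 0) {
            return Optional.empty();
        }
        return Optional.of(queuedRequests.pollFirst());
    }
}
```

      With maxConcurrentCheckpointAttempts = 1 and a savepoint at the head of the queue, choose(false, 0, 5_000L) returns an empty Optional (the savepoint stays queued), while choose(false, 1, 5_000L) returns the savepoint: in this model the request is held back only when there is room for it.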

      I think the savepoint should be triggered immediately when pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts, just as it already is when that limit has been reached.
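      One possible shape of that change, sketched on the same kind of simplified model (hypothetical names again, and not necessarily how this issue was eventually resolved in Flink): steps 1 and 2 are unchanged, and step 3 holds back only non-forced requests, so a savepoint at the head of the queue is dequeued as soon as a slot below maxConcurrentCheckpointAttempts is free.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical sketch of the proposed behavior on a simplified model. Not Flink code.
class ProposedDeciderModel {

    // A queued trigger request; savepoints are modeled as forced requests.
    static final class Request {
        final String name;
        final boolean force;

        Request(String name, boolean force) {
            this.name = name;
            this.force = force;
        }

        boolean isForce() {
            return force;
        }
    }

    private final Deque<Request> queuedRequests = new ArrayDeque<>();
    private final int maxConcurrentCheckpointAttempts;

    ProposedDeciderModel(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    void enqueue(Request request) {
        queuedRequests.addLast(request);
    }

    Optional<Request> choose(boolean isTriggering, int pendingCheckpoints, long nextTriggerDelayMillis) {
        // 1. Unchanged: a trigger is in progress, or nothing is queued.
        if (isTriggering || queuedRequests.isEmpty()) {
            return Optional.empty();
        }
        // 2. Unchanged: at the limit, only a forced request is dequeued.
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Optional.of(queuedRequests.peekFirst())
                    .filter(Request::isForce)
                    .map(unused -> queuedRequests.pollFirst());
        }
        // 3. Changed: the minimum-pause check only holds back non-forced
        //    requests; a forced request (savepoint) proceeds immediately.
        if (nextTriggerDelayMillis > 0 && !queuedRequests.peekFirst().isForce()) {
            return Optional.empty();
        }
        return Optional.of(queuedRequests.pollFirst());
    }
}
```

      In this sketch a periodic (non-forced) checkpoint is still delayed by step 3 exactly as before; only forced requests skip the wait, which keeps the change limited to the case this issue describes.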


            People

              wayland tao wang
              klion26 Congxian Qiu
              Votes: 0
              Watchers: 4
