  1. Flink
  2. FLINK-2491 Support Checkpoints After Tasks Finished
  3. FLINK-23938

Do not resume channels if the barrier is received via RPC

Details

    Description

      Currently, when all predecessors of a task finish, they notify the JobManager (JM) and emit EndOfPartition at the same time. If the JM receives the notification first, it triggers this task directly for the next checkpoint.

      In this case, the task fakes a barrier for each channel that has not yet received EndOfPartition. This is correct under the current logic, because the predecessors wait until all pending records are processed before they finish.

      However, when processing these faked barriers we should not resume the corresponding channels. Otherwise the upstream subpartition throws an exception, because it was never blocked:

      28074 55458 [Map -> Map (10/12)#0] WARN  org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
      	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
      	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
      	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
      	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
      	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
      	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
      	at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
      	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
      	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
      	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
      	at java.lang.Thread.run(Thread.java:748)
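
The failure mode can be reproduced in a toy model. The sketch below is only an illustration and does not use Flink's real classes: `ToySubpartition`, `ToyChannel`, `unblockChannels`, and the `skipFakedBarriers` flag are all hypothetical names. It assumes that only a barrier that actually travelled through the channel blocks the upstream subpartition, so resuming a channel whose barrier was faked after an RPC trigger hits the same state check as in the trace above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these classes are Flink's real ones. */
public class RpcBarrierSketch {

    /** Stand-in for an upstream subpartition that must be blocked before it is resumed. */
    static final class ToySubpartition {
        boolean blockedByCheckpoint;

        /** Emitting a barrier downstream blocks this subpartition. */
        void emitBarrier() {
            blockedByCheckpoint = true;
        }

        /** Mirrors the state check seen in the stack trace. */
        void resumeConsumption() {
            if (!blockedByCheckpoint) {
                throw new IllegalStateException("Should be blocked by checkpoint.");
            }
            blockedByCheckpoint = false;
        }
    }

    /** Stand-in for an input channel of the downstream task. */
    static final class ToyChannel {
        final ToySubpartition upstream = new ToySubpartition();
        boolean barrierArrivedInBand;

        /** The barrier really came from upstream, so upstream is blocked. */
        void onBarrierFromUpstream() {
            upstream.emitBarrier();
            barrierArrivedInBand = true;
        }

        /** The task was triggered via RPC and faked the barrier; upstream never blocked. */
        void onBarrierFakedAfterRpcTrigger() {
            barrierArrivedInBand = false;
        }
    }

    /** Resumes channels after alignment, optionally skipping channels with faked barriers. */
    static void unblockChannels(List<ToyChannel> channels, boolean skipFakedBarriers) {
        for (ToyChannel channel : channels) {
            if (skipFakedBarriers && !channel.barrierArrivedInBand) {
                continue; // nothing to resume: upstream was never blocked
            }
            channel.upstream.resumeConsumption();
            channel.barrierArrivedInBand = false;
        }
    }

    /** Returns true if unblocking completed without the state check failing. */
    static boolean runScenario(boolean skipFakedBarriers) {
        List<ToyChannel> channels = new ArrayList<>();
        ToyChannel inBand = new ToyChannel();
        ToyChannel faked = new ToyChannel();
        channels.add(inBand);
        channels.add(faked);

        inBand.onBarrierFromUpstream();
        faked.onBarrierFakedAfterRpcTrigger();

        try {
            unblockChannels(channels, skipFakedBarriers);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("resume all channels: ok=" + runScenario(false));
        System.out.println("skip faked barriers: ok=" + runScenario(true));
    }
}
```

In this model, resuming every channel fails on the channel with the faked barrier, while skipping that channel completes normally. How the real fix tracks which barriers arrived via RPC is not stated in this issue.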
      

            People

              gaoyunhaii Yun Gao