FLINK-31414 (Flink): Exceptions in the alignment timer are ignored


Details

    Description

      The alignment timer task used by alternating aligned checkpoints runs as a future task in the mailbox thread, so exceptions thrown from it (see SingleCheckpointBarrierHandler#registerAlignmentTimer()) are silently ignored. These exceptions should fail the task. Instead, in my test the swallowed failure caused initInputsCheckpoint to be triggered twice for the same checkpoint, and the second call failed the task with the stack trace below.
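      A minimal sketch of the mechanism described above, in plain JDK code rather than Flink code: an action wrapped in a java.util.concurrent.FutureTask stores any exception in the future instead of throwing it, so a loop that runs the task but never inspects the future keeps going as if nothing happened. The class SwallowedTimerDemo and its submit/execute/runMailboxLoop methods are hypothetical; they only model a single-threaded mailbox and are not Flink's MailboxProcessor or MailboxExecutor.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** Hypothetical toy mailbox contrasting two ways of enqueueing a timer action. */
public class SwallowedTimerDemo {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();

    // Future-style enqueue: FutureTask.run() catches any Throwable and stores it
    // in the future, so runMailboxLoop() never sees the exception.
    public FutureTask<Void> submit(Runnable action) {
        FutureTask<Void> task = new FutureTask<>(action, null);
        mailbox.add(task);
        return task;
    }

    // Fail-fast enqueue: the raw Runnable runs in the loop, so an exception
    // escapes runMailboxLoop() and "fails the task".
    public void execute(Runnable action) {
        mailbox.add(action);
    }

    // Drains the mailbox on the calling thread.
    public void runMailboxLoop() {
        Runnable next;
        while ((next = mailbox.poll()) != null) {
            next.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SwallowedTimerDemo box = new SwallowedTimerDemo();
        Runnable failingTimer = () -> {
            throw new IllegalStateException("not running");
        };

        FutureTask<Void> f = box.submit(failingTimer);
        box.runMailboxLoop(); // returns normally: the exception is hidden in f
        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("only visible via the future: " + e.getCause().getMessage());
        }

        box.execute(failingTimer);
        try {
            box.runMailboxLoop();
        } catch (IllegalStateException e) {
            System.out.println("task failed: " + e.getMessage());
        }
    }
}
```

      Running main prints the exception twice: once only because the future is explicitly inspected, and once because execute lets it escape the loop. Without the f.get() call, the first failure would leave no trace, which matches the behavior reported here.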

       

       switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: unable to send request to worker
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
              at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
              at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
              at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
              at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
              at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
              at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
              at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
              at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
              at java.lang.Thread.run(Thread.java:748)
              Suppressed: java.io.IOException: java.lang.IllegalStateException: writer not found for request start 17
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
                      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
                      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
                      at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
                      at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
                      at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
                      at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
                      at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
                      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
                      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
                      ... 3 more
              Caused by: java.lang.IllegalStateException: writer not found for request start 17
                      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
                      at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
                      ... 1 more
      Caused by: java.lang.IllegalStateException: not running
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
              at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
              ... 27 more
              [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found for request start 17] 

       

       

      See BarrierAlignmentUtil#createRegisterTimerCallback().
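      One possible direction, sketched under assumptions and not taken from Flink's code: wrap the timer action so that any exception is handed to an explicit failure handler, which then fails the task, instead of relying on someone inspecting the returned future. FailFastTimerCallback, ThrowingRunnable, failFast and the failTask handler are hypothetical names; in Flink the handler would have to be whatever mechanism actually fails the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

public final class FailFastTimerCallback {

    /** Like Runnable, but allowed to throw checked exceptions. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    private FailFastTimerCallback() {}

    /**
     * Wraps a timer action so that any Throwable (including Errors) is passed to
     * failTask instead of being left inside a future that nobody inspects.
     */
    public static Runnable failFast(ThrowingRunnable action, Consumer<Throwable> failTask) {
        return () -> {
            try {
                action.run();
            } catch (Throwable t) {
                failTask.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<Throwable> failures = new ArrayList<>();
        ThrowingRunnable timerAction = () -> {
            throw new IllegalStateException("not running");
        };

        // Even when the wrapped action runs as a future task, the failure
        // reaches the handler instead of being swallowed.
        FutureTask<Void> task = new FutureTask<>(failFast(timerAction, failures::add), null);
        task.run();

        System.out.println("failures reported: " + failures.size());
    }
}
```

      The design choice is to make failure reporting independent of how the callback is scheduled: whether it ends up in a FutureTask or runs directly, the handler is always invoked.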

       

      People: Feifan Wang
      Votes: 0
      Watchers: 4
