Flink / FLINK-8638

Job restart when Checkpoint On Barrier failed


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.3.2, 1.4.0
    • None
    • None
    • None

    Description

      The stack trace below comes from a snapshotState call that writes state to HDFS. snapshotState failed because of HDFS disk problems, so triggerCheckpointOnBarrier failed and threw an exception, which caused the application to restart. On restart, however, Flink has to recover from the most recent completed checkpoint and then catch up on the backlog of data, which can introduce significant delays. We think that when StreamTask's triggerCheckpointOnBarrier (including triggerCheckpoint at the source) fails, the application should not restart. Instead, it should keep running, mark the checkpoint as failed, and notify the JobManager that this checkpoint failed.
      A checkpoint-failure alarm would then let developers or users know about the situation and take appropriate action, while the Flink job keeps running the whole time.

       

      java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
      at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      at java.lang.Thread.run(Thread.java:745)
      
      Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
      ... 8 more
      
      Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
      at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
      at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
      at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
      at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
      at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
      ... 13 more
      
      Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
      at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
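      The proposal above can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not Flink code: `Coordinator`, `Task`, `Snapshotter` and the alarm threshold are hypothetical names for illustration and are not Flink's `StreamTask` or `CheckpointCoordinator` APIs. A snapshot failure on a barrier is caught inside the task and the checkpoint is declined instead of the exception propagating (which is what fails the task and restarts the job today); the coordinator raises an alarm after a configurable number of consecutive failures.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DeclineOnFailureSketch {

    /** Hypothetical stand-in for the JobManager side: records outcomes and raises alarms. */
    static final class Coordinator {
        final List<Long> completed = new ArrayList<>();
        final List<Long> failed = new ArrayList<>();
        final List<String> alarms = new ArrayList<>();
        private final int alarmThreshold;
        private int consecutiveFailures = 0;

        Coordinator(int alarmThreshold) {
            this.alarmThreshold = alarmThreshold;
        }

        void acknowledge(long checkpointId) {
            completed.add(checkpointId);
            consecutiveFailures = 0; // a successful checkpoint resets the failure streak
        }

        void decline(long checkpointId, Throwable cause) {
            failed.add(checkpointId);
            consecutiveFailures++;
            if (consecutiveFailures >= alarmThreshold) {
                alarms.add("checkpoint " + checkpointId + " failed (" + consecutiveFailures
                        + " in a row): " + rootCause(cause).getMessage());
            }
        }
    }

    /** Hypothetical hook for the operator's snapshotState work (e.g. writing to HDFS). */
    interface Snapshotter {
        void snapshot(long checkpointId) throws Exception;
    }

    /** Hypothetical task whose barrier handler declines the checkpoint instead of rethrowing. */
    static final class Task {
        private final Snapshotter snapshotter;
        private final Coordinator coordinator;

        Task(Snapshotter snapshotter, Coordinator coordinator) {
            this.snapshotter = snapshotter;
            this.coordinator = coordinator;
        }

        void triggerCheckpointOnBarrier(long checkpointId) {
            try {
                snapshotter.snapshot(checkpointId);
                coordinator.acknowledge(checkpointId);
            } catch (Exception e) {
                // Proposed behavior: keep the failure local so the task (and the job)
                // keeps running, and report only this checkpoint as failed.
                coordinator.decline(checkpointId, e);
            }
        }
    }

    /** Walks the cause chain, e.g. down to the HDFS IOException at the bottom of the trace above. */
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(2);
        // Checkpoints 2 and 3 fail as if HDFS had lost all datanodes.
        Task task = new Task(id -> {
            if (id == 2 || id == 3) {
                throw new Exception("Could not complete snapshot " + id,
                        new IOException("All datanodes DatanodeInfoWithStorage are bad. Aborting..."));
            }
        }, coordinator);

        for (long id = 1; id <= 4; id++) {
            task.triggerCheckpointOnBarrier(id); // never throws, so processing continues
        }
        System.out.println("completed=" + coordinator.completed + " failed=" + coordinator.failed);
        coordinator.alarms.forEach(System.out::println);
    }
}
```

      Running `main` prints `completed=[1, 4] failed=[2, 3]` followed by a single alarm for checkpoint 3, and the loop never stops. The trade-off is that while checkpoints keep failing, any later crash must recover from an older completed checkpoint, which is why the alarm is part of the proposal.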

       

          People

            Assignee: Unassigned
            Reporter: Ryan Tao (insomnia)
            Votes: 0
            Watchers: 2
