FLINK-11662: Discarded checkpoint can cause Tasks to fail

Details

    Description

      Flink's CheckpointCoordinator discards an ongoing checkpoint as soon as it receives the first decline message. Part of the discard operation is the deletion of the checkpoint directory. Depending on the underlying FileSystem implementation, concurrent write and read operations on files in the checkpoint directory can then fail (this is the case with HDFS, for example). If a local checkpointing operation belonging to the discarded checkpoint (e.g. an AsyncCheckpointRunnable) is still running for some Task, that operation can fail. Depending on the configuration of the CheckpointExceptionHandler, this can lead to a task failure and a job recovery caused by an already discarded checkpoint.

      2019-02-16 11:26:29.378 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
      2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 599a6ac3c371874d12ebf024978cadbc.
      2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
      org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was not running
      at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3) (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
      org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
      ... 6 common frames omitted
      Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
      at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
      ... 5 common frames omitted
      Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
      at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
      at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
      ... 7 common frames omitted
      Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist: /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043)
      at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938)
      at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:601)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169)
      
      at org.apache.hadoop.ipc.Client.call(Client.java:863)
      at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227)
      at com.sun.proxy.$Proxy5.complete(Unknown Source)
      at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
      at com.sun.proxy.$Proxy5.complete(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208)
      at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151)
      at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693)
      at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567)
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
      at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
      at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
      at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
      ... 12 common frames omitted
      

      Especially for larger jobs with sources that recover state and, hence, take some time before they accept a checkpoint request, this can make the job unstable and leave it stuck in a restart loop.

      A workaround for this problem is to disable failTaskOnCheckpointError in the ExecutionConfig via ExecutionConfig#setFailTaskOnCheckpointError(false). With this setting, checkpoint failures won't fail the owning Task.
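      The workaround above is essentially a one-line configuration change on the job's ExecutionConfig. A minimal sketch, assuming a Flink 1.7-era streaming job (the surrounding setup is illustrative, not from this issue):

      ```java
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class CheckpointWorkaround {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.getExecutionEnvironment();
              // Enable periodic checkpointing (interval is illustrative).
              env.enableCheckpointing(60_000);
              // Workaround for FLINK-11662: do not fail the owning Task when an
              // async checkpoint operation fails, e.g. because the coordinator
              // already deleted the checkpoint directory on HDFS.
              env.getConfig().setFailTaskOnCheckpointError(false);
              // ... build and execute the job as usual
          }
      }
      ```

      Note that this trades safety for stability: genuine, repeated checkpoint failures will then also go unnoticed by the failover logic.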

      In order to properly solve this problem, failing local checkpoint operations that belong to a discarded checkpoint should simply be ignored. A good solution could be to centralize checkpoint failure handling in the CheckpointCoordinator. The CheckpointCoordinator knows which checkpoints are still valid and, hence, can distinguish between valid and invalid checkpoint operation failures. It would, furthermore, make it possible to implement more sophisticated failure handling strategies, such as accepting n checkpoint failures before failing a task.
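      The proposed centralized handling could be sketched as a small policy object owned by the coordinator: failures for already-discarded checkpoints are ignored, and otherwise a task is only failed after n consecutive failures. This is a hypothetical illustration of the idea, not Flink's actual API; all names are made up for the sketch.

      ```java
      import java.util.HashSet;
      import java.util.Set;

      // Hypothetical sketch of a centralized checkpoint-failure policy.
      public class CheckpointFailurePolicy {
          private final Set<Long> discarded = new HashSet<>();
          private final int tolerableFailures;
          private int consecutiveFailures = 0;

          public CheckpointFailurePolicy(int tolerableFailures) {
              this.tolerableFailures = tolerableFailures;
          }

          /** Called by the coordinator when it discards a checkpoint. */
          public void onCheckpointDiscarded(long checkpointId) {
              discarded.add(checkpointId);
          }

          /** Returns true if the reported failure should fail the task. */
          public boolean onCheckpointFailure(long checkpointId) {
              if (discarded.contains(checkpointId)) {
                  // Failure of an already-discarded checkpoint: ignore it.
                  return false;
              }
              consecutiveFailures++;
              return consecutiveFailures > tolerableFailures;
          }

          /** Called on a successful checkpoint; resets the failure counter. */
          public void onCheckpointSuccess() {
              consecutiveFailures = 0;
          }
      }
      ```

      The key point is that only the coordinator can make the "discarded vs. still valid" distinction, since individual Tasks have no global view of checkpoint state.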

      Attachments

        1. jobmanager.log
          18.24 MB
          Till Rohrmann
        2. taskmanager.log
          32.52 MB
          Till Rohrmann

          Activity

            klion26 Congxian Qiu added a comment -

            Hi framst, I think there are some other issues about the same problem.

            There is also an issue, https://issues.apache.org/jira/browse/FLINK-10724, to refactor the error handling; after that issue has been resolved, we can handle this problem elegantly.

            framst Dong Ma added a comment - edited

            klion26 thanks for the information. I have read the design documents, but this exception is caused by the CheckpointCoordinator. Should the CheckpointCoordinator drop the checkpoint directory as a whole on a failure while knowing nothing about whether Tasks are performing a checkpoint? Should this situation be changed? sewen

            yunta Yun Tang added a comment -

            framst we already have some discussion about this problem (the CheckpointCoordinator deleting the directory that Tasks are writing to) in FLINK-10930.

            I think we should now separate this problem into FLINK-10724 and FLINK-10855. Any suggestion is welcome.


            aljoscha Aljoscha Krettek added a comment -

            This also affects 1.5.x and 1.6.x, but those versions are archived so I can't add to the "affects versions" field.

            chesnay Chesnay Schepler added a comment -

            If you really want to, you can unarchive them, set the affects version, and archive them again.
            srichter Stefan Richter added a comment -

            Merged in:
            master: b760d55


            People

              yunta Yun Tang
              framst Dong Ma
              Votes: 0
              Watchers: 14


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 10m