Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-11662

Discarded checkpoint can cause Tasks to fail

    XMLWordPrintableJSON

Details

    Description

      Flink's CheckpointCoordinator discards an ongoing checkpoint as soon as it receives the first decline message. Part of the discard operation is the deletion of the checkpointing directory. Depending on the underlying FileSystem implementation, concurrent write and read operation to files in the checkpoint directory can then fail (e.g. this is the case with HDFS). If there is still a local checkpointing operation running for some Task and belonging to the discarded checkpoint, then it can happen that the checkpointing operation fails (e.g. an AsyncCheckpointRunnable). Depending on the configuration of the CheckpointExceptionHandler, this can lead to a task failure and a job recovery which is caused by an already discarded checkpoint.

      2019-02-16 11:26:29.378 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
      2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 599a6ac3c371874d12ebf024978cadbc.
      2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
      org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was not running
      at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3) (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
      org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
      ... 6 common frames omitted
      Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
      at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
      ... 5 common frames omitted
      Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
      at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
      at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
      ... 7 common frames omitted
      Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist: /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043)
      at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938)
      at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:601)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169)
      
      at org.apache.hadoop.ipc.Client.call(Client.java:863)
      at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227)
      at com.sun.proxy.$Proxy5.complete(Unknown Source)
      at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
      at com.sun.proxy.$Proxy5.complete(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208)
      at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151)
      at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693)
      at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567)
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
      at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
      at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
      at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
      ... 12 common frames omitted
      

      Especially for larger jobs with sources which recover state and, hence, take some time before they accept a checkpoint request, this can lead to an unstable job which can be stuck in some restart loop.

      A workaround for this problem is to disable failTaskOnCheckpointError in the ExecutionConfig via ExecutionConfig#setFailTaskOnCheckpointError(false). With this setting checkpoint failures won't fail the owning Task.

      In order to properly solve this problem, failing local checkpoint operations belonging to a discarded checkpoint should simply be ignored. A good solution could be to centralize the checkpoint operation failure handling in the CheckpointCoordinator. The CheckpointCoordinator knows which checkpoints are still valid and, hence, can distinguish between valid and invalid checkpoint operation failures. It would, furthermore, allow to implement more sophisticated failure handling strategies such as accepting n checkpoint failures before failing a task.

      Attachments

        1. taskmanager.log
          32.52 MB
          Till Rohrmann
        2. jobmanager.log
          18.24 MB
          Till Rohrmann

        Issue Links

          Activity

            People

              yunta Yun Tang
              framst Dong Ma
              Votes:
              0 Vote for this issue
              Watchers:
              14 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m