Details
-
Bug
-
Status: Closed
-
Critical
-
Resolution: Fixed
-
1.7.0, 1.8.0
Description
Flink's CheckpointCoordinator discards an ongoing checkpoint as soon as it receives the first decline message. Part of the discard operation is the deletion of the checkpointing directory. Depending on the underlying FileSystem implementation, concurrent write and read operation to files in the checkpoint directory can then fail (e.g. this is the case with HDFS). If there is still a local checkpointing operation running for some Task and belonging to the discarded checkpoint, then it can happen that the checkpointing operation fails (e.g. an AsyncCheckpointRunnable). Depending on the configuration of the CheckpointExceptionHandler, this can lead to a task failure and a job recovery which is caused by an already discarded checkpoint.
2019-02-16 11:26:29.378 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc. 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 599a6ac3c371874d12ebf024978cadbc. 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc. org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was not running at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3) (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED. org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) ... 6 common frames omitted Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) ... 5 common frames omitted Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326) at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767) at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50) ... 7 common frames omitted Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist: /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043) at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938) at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169) at org.apache.hadoop.ipc.Client.call(Client.java:863) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227) at com.sun.proxy.$Proxy5.complete(Unknown Source) at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at com.sun.proxy.$Proxy5.complete(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208) at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151) at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693) at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312) ... 12 common frames omitted
Especially for larger jobs with sources which recover state and, hence, take some time before they accept a checkpoint request, this can lead to an unstable job which can be stuck in some restart loop.
A workaround for this problem is to disable failTaskOnCheckpointError in the ExecutionConfig via ExecutionConfig#setFailTaskOnCheckpointError(false). With this setting checkpoint failures won't fail the owning Task.
In order to properly solve this problem, failing local checkpoint operations belonging to a discarded checkpoint should simply be ignored. A good solution could be to centralize the checkpoint operation failure handling in the CheckpointCoordinator. The CheckpointCoordinator knows which checkpoints are still valid and, hence, can distinguish between valid and invalid checkpoint operation failures. It would, furthermore, allow to implement more sophisticated failure handling strategies such as accepting n checkpoint failures before failing a task.
Attachments
Attachments
Issue Links
- is caused by
-
FLINK-5820 Extend State Backend Abstraction to support Global Cleanup Hooks
- Closed
- is duplicated by
-
FLINK-12209 Refactor failure handling with CheckpointFailureManager
- Closed
- relates to
-
FLINK-10855 CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
- Open
-
FLINK-10615 Cancel with savepoint can fail the job
- Closed
-
FLINK-12194 Enable setting failOnCheckpointingErrors in DataStreamAllroundTestProgram
- Resolved
-
FLINK-23916 Add documentation for how to configure the CheckpointFailureManager
- Closed
-
FLINK-10724 Refactor failure handling in check point coordinator
- Closed
- links to
Hi framst , I think there are some other issues about the same problem.
And there is an issue https://issues.apache.org/jira/browse/FLINK-10724 to refactor the error handling after the issue has been resolved, we can handle this problem elegantly.