FLINK-10615

Cancel with savepoint can fail the job

      Description
      Cancelling a job with a savepoint can cause the job to fail if failOnCheckpointingErrors is set to true in the checkpoint config.

      Analysis

      1. The checkpoint scheduler is stopped before the savepoint is taken, so that no further checkpoints are triggered afterwards.
      2. All pending checkpoints are then aborted, which disposes all of their data at the checkpoint storage location.
      3. The job fails because a task that is still writing an aborted checkpoint cannot finalize it: its file at the storage location no longer exists (see Stacktrace).
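
The sequence above can be sketched as a toy, single-threaded model. This is not Flink code: ToyStorage, cancelWithSavepoint, and the file path are hypothetical names introduced for illustration. The assumption that a task only declines the checkpoint (and the job keeps running) when failOnCheckpointingErrors is false is inferred from the description, not verified here.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class CancelWithSavepointRace {

    /** Hypothetical stand-in for checkpoint storage: it only tracks which files exist. */
    static final class ToyStorage {
        private final Set<String> files = new HashSet<>();

        void create(String path) {
            files.add(path);
        }

        /** Removes every file under the given directory prefix. */
        void disposeDirectory(String dir) {
            files.removeIf(p -> p.startsWith(dir));
        }

        /** Closing a file that was deleted in the meantime fails, as in the HDFS trace. */
        void close(String path) throws IOException {
            if (!files.contains(path)) {
                throw new IOException("File does not exist: " + path);
            }
        }
    }

    /** Returns the job state after the modeled cancel-with-savepoint sequence. */
    static String cancelWithSavepoint(boolean failOnCheckpointingErrors) {
        ToyStorage storage = new ToyStorage();
        String stateFile = "/checkpoints/job/chk-4/state"; // hypothetical path

        // A task has started writing checkpoint 4 asynchronously.
        storage.create(stateFile);

        // Steps 1 and 2: the scheduler is stopped and the pending checkpoint is
        // aborted, which disposes its data at the storage location.
        storage.disposeDirectory("/checkpoints/job/chk-4/");

        // Step 3: the task tries to finalize the checkpoint it was still writing.
        try {
            storage.close(stateFile);
            return "RUNNING";
        } catch (IOException e) {
            // Assumption: with the flag unset the task only declines the checkpoint.
            return failOnCheckpointingErrors ? "FAILING" : "RUNNING";
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelWithSavepoint(true));  // prints FAILING
        System.out.println(cancelWithSavepoint(false)); // prints RUNNING
    }
}
```

In the real system the abort and the asynchronous write run concurrently, which would explain why the reproduction needs several attempts; the model fixes the unlucky ordering to make the failure deterministic.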

      Stacktrace

      2018-10-19 13:19:49,960 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job General purpose test job (a6f8f88c8875b83c25da2a6c15aba0a9) switched from state RUNNING to FAILING.
      AsynchronousException{java.lang.Exception: Could not materialize checkpoint 4 for operator Source: Custom Source -> Timestamps/Watermarks (1/1).}
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: Custom Source -> Timestamps/Watermarks (1/1).
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
              ... 6 more
      Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs:/checkpoints/a6f8f88c8875b83c25da2a6c15aba0a9/chk-4/d74c1116-cde0-4ab3-a180-204164a0f413 in order to obtain the stream state handle
              at java.util.concurrent.FutureTask.report(FutureTask.java:122)
              at java.util.concurrent.FutureTask.get(FutureTask.java:192)
              at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
              at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
              ... 5 more
      Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs:/checkpoints/a6f8f88c8875b83c25da2a6c15aba0a9/chk-4/d74c1116-cde0-4ab3-a180-204164a0f413 in order to obtain the stream state handle
              at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
              at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:823)
              at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:752)
              at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
              ... 7 more
      Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /checkpoints/a6f8f88c8875b83c25da2a6c15aba0a9/chk-4/d74c1116-cde0-4ab3-a180-204164a0f413 (inode 955932) Holder DFSClient_NONMAPREDUCE_-869828808_46 does not have any open files.
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2673)
              at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFileInternal(FSDirWriteFileOp.java:621)
              at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.completeFile(FSDirWriteFileOp.java:601)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2717)
              at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:882)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:556)
              at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
              at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
              at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
              at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
              at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
      
              at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
              at org.apache.hadoop.ipc.Client.call(Client.java:1435)
              at org.apache.hadoop.ipc.Client.call(Client.java:1345)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
              at com.sun.proxy.$Proxy17.complete(Unknown Source)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:484)
              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
              at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
              at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
              at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
              at com.sun.proxy.$Proxy18.complete(Unknown Source)
              at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:908)
              at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:867)
              at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:850)
              at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805)
              at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
              at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
              at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
              at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
              at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
              ... 12 more
      

      How to reproduce

      1. Submit DataStreamAllroundTestProgram from flink-end-to-end-tests
      2. Invoke cancel with savepoint from the CLI:
        bin/flink cancel -s hdfs:///[...]
      3. Repeat until cancel fails with the above exception.

            People

              Unassigned Unassigned
              gjy Gary Yao