Flink / FLINK-33932

Support Retry Mechanism in RocksDBStateDataTransfer


Details

    Description

      Currently, there is no retry mechanism for downloading and uploading RocksDB state files. Any transient instability (jitter) in the remote filesystem can therefore cause a checkpoint to fail. Supporting a retry mechanism in RocksDBStateDataTransfer should significantly reduce the checkpoint failure rate during the asynchronous phase.
      An example of the exception is shown below:

       
      2023-12-19 08:46:00,197 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job ffffffffa025f19e0000000000000000 at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
      org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
          at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          ... 4 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
          at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
          at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          ... 3 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
          at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
          ... 3 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.net.ConnectException: Connection timed out
          at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
          at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?]
          at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
       

      We can add a retry mechanism to the RocksDB uploader to decrease the checkpoint failure rate in the asynchronous phase.
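
      To illustrate the retry idea described above, here is a minimal sketch of a fixed-delay retry wrapper around an I/O action such as a single file upload. It is not the implementation proposed in this issue and not an existing Flink API: the class RetryingTransfer, the interface IOAction, and the parameters maxAttempts and delayMillis are hypothetical names chosen for illustration. It also assumes that any IOException (including the java.net.ConnectException in the trace above) is worth retrying.

```java
import java.io.IOException;

/** Hypothetical helper for illustration only; not part of Flink. */
final class RetryingTransfer {

    /** An I/O action that may fail with an IOException, e.g. one file upload. */
    @FunctionalInterface
    interface IOAction<T> {
        T run() throws IOException;
    }

    private RetryingTransfer() {}

    /**
     * Runs the action up to maxAttempts times, sleeping delayMillis between
     * failed attempts. Rethrows the last IOException if every attempt fails.
     */
    static <T> T callWithRetry(IOAction<T> action, int maxAttempts, long delayMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (IOException e) {
                // Assumption: every IOException is treated as transient.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

      Under these assumptions, an upload call could be wrapped as RetryingTransfer.callWithRetry(() -> upload(file), 3, 1000L), where upload is a placeholder for the actual transfer. A real implementation would also need to clean up any partially written remote file before retrying and to respect task cancellation.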

          People

            xiangyu0xf xiangyu feng
            dianer17 Guojun Li
