With the current approach, job will fail if dstl.dfs.upload.max-in-flight (bytes) is reached.
Unsetting the limit roughly matches the current behaviour for other backends: async phase doesn't backpressure
(state.backend.rocksdb.checkpoint.transfer.thread.num only sets the upload thread pool size which uses an unbounded queue).
Note that blocking caller in DfsWriter.persistInternal() will also block regular stream processing (because of pre-emptive writes). This may or may not be desired behaviour.
Blocking sync phase of a snapshot can also have some issues (e.g. not being able to abort the checkpoint) which should be considered.