Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
Description
Currently, the State Processor API offers no overwrite support when creating a savepoint at a given location. For applications that generate a savepoint at the same location on a periodic basis (e.g. a cron job or nightly process), this can result in an exception if a previous run already wrote a savepoint there.
This ticket proposes amending the existing `SavepointWriter` class to accept the preferred overwrite mode as an optional parameter when writing the savepoint, similar to the example below:
    SavepointWriter
        .newSavepoint(env, new HashMapStateBackend(), maxParallelism)
        .withOperator(OperatorIdentifier.forUid("uid1"), transformation1)
        .withOperator(OperatorIdentifier.forUid("uid2"), transformation2)
        .write(savepointPath, FileSystem.WriteMode.OVERWRITE);
This lines up with the underlying `FileCopyFunction` class, which currently hard-codes `FileSystem.WriteMode.NO_OVERWRITE`, as seen below:
    public final class FileCopyFunction implements OutputFormat<Path> {
        ...
        @Override
        public void writeRecord(Path sourcePath) throws IOException {
            Path destPath = new Path(path, sourcePath.getName());
            try (FSDataOutputStream os = destPath.getFileSystem()
                            .create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
                    FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
                IOUtils.copyBytes(is, os);
            }
        }
        ...
    }
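To make the requested behavior concrete, here is a minimal sketch of a copy routine that takes the write mode as a parameter instead of hard-coding it. It is not Flink code: it uses `java.nio.file` as a stand-in for Flink's `FileSystem` so that it runs without Flink on the classpath, and the names `WriteModeSketch`, `copy`, and the local `WriteMode` enum (which only mirrors the two Flink constants mentioned above) are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative stand-in only; not part of Flink.
final class WriteModeSketch {

    // Hypothetical local enum mirroring FileSystem.WriteMode.
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    // Copies source into destDir, honoring the caller's write mode.
    static void copy(Path source, Path destDir, WriteMode mode) throws IOException {
        Path dest = destDir.resolve(source.getFileName());
        if (mode == WriteMode.OVERWRITE) {
            // Replace whatever a previous run left behind.
            Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING);
        } else {
            // Throws FileAlreadyExistsException if dest already exists.
            Files.copy(source, dest);
        }
    }

    public static void main(String[] args) throws IOException {
        Path destDir = Files.createTempDirectory("savepoint");
        Path source = Files.createTempFile("state", ".bin");
        Files.write(source, new byte[] {1, 2, 3});

        copy(source, destDir, WriteMode.NO_OVERWRITE); // first run succeeds
        try {
            copy(source, destDir, WriteMode.NO_OVERWRITE); // second run fails
        } catch (FileAlreadyExistsException e) {
            System.out.println("NO_OVERWRITE rejected existing file: " + e.getFile());
        }
        copy(source, destDir, WriteMode.OVERWRITE); // periodic re-run succeeds
        System.out.println("OVERWRITE replaced the existing file");
    }
}
```

Keeping `NO_OVERWRITE` as the default when no mode is passed would leave existing callers unaffected, which is presumably why the proposal makes the parameter optional.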
An alternative solution might be to explicitly check for the existence of the file at the destination and delete it before writing, although the approach above seems much more elegant.
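For comparison, the check-and-delete alternative could look roughly like the sketch below. Again, this is an assumption-laden illustration rather than Flink code: it uses `java.nio.file` in place of Flink's `FileSystem`, and `DeleteBeforeWriteSketch` and `deleteIfPresent` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative stand-in only; not part of Flink.
final class DeleteBeforeWriteSketch {

    // Removes an existing savepoint directory (and its contents) if present.
    static void deleteIfPresent(Path savepointDir) throws IOException {
        if (!Files.exists(savepointDir)) {
            return; // nothing to clean up on the first run
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(savepointDir)) {
            // Reverse order puts children before their parent directories.
            toDelete = paths.sorted(Comparator.reverseOrder())
                    .collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }
}
```

One drawback of this approach is that every caller has to remember the cleanup step, and there is a window between the delete and the new write in which no savepoint exists at the location.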