Details
Type: Improvement
Status: Open
Priority: P3
Resolution: Unresolved
Description
I think the existing FileIO.writeDynamic is a bit too easy to misuse: something like the following looks correct and compiles:
FileIO.writeDynamic()
  .by(...)
  .withNaming(new SerializableFunction[String, FileNaming] {
    override def apply(str: String): FileNaming =
      FileIO.Write.relativeFileNaming(
        StaticValueProvider.of("some/directory"),
        new FileNaming {
          override def getFilename(
              window: BoundedWindow,
              pane: PaneInfo,
              numShards: Int,
              shardIndex: Int,
              compression: Compression): String = "some_filename.txt"
        })
  })
  .via(...)
  .to("gs://some/bucket") // Beam wraps the naming in relativeFileNaming again here
However, for dynamic writes, if `outputDirectory` is set (via `.to("...")`), Beam also wraps the provided `fileNamingFn` in `FileIO.Write.relativeFileNaming(...)` under the hood, so it ends up as a nested `relativeFileNaming` function (see https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1243).
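To make the nesting concrete, here is a minimal pure-Scala model of it. `Naming`, `relative` and `resolveDynamic` are hypothetical stand-ins for `FileNaming`, `FileIO.Write.relativeFileNaming` and the wrapping done when a dynamic write is expanded. Path joining is a naive string concatenation, not Beam's `FileSystems`/`ResourceId` resolution, so the exact path Beam produces may differ; the point is only that both prefixes get applied.

```scala
// Hypothetical, simplified model of Beam's FileNaming: only the shard index is kept.
trait Naming {
  def filename(shardIndex: Int): String
}

object NestedNamingDemo {
  // Stand-in for FileIO.Write.relativeFileNaming: prefixes the inner name with a
  // base directory (naive join, NOT Beam's ResourceId resolution).
  def relative(baseDirectory: String, inner: Naming): Naming =
    new Naming {
      def filename(shardIndex: Int): String =
        baseDirectory.stripSuffix("/") + "/" + inner.filename(shardIndex)
    }

  // Stand-in for what a dynamic write does when .to(outputDirectory) is set:
  // the user's naming is wrapped once more.
  def resolveDynamic(outputDirectory: Option[String], userNaming: Naming): Naming =
    outputDirectory.fold(userNaming)(dir => relative(dir, userNaming))

  // The user's naming from the example above, already wrapped by the user.
  val userNaming: Naming =
    relative("some/directory", new Naming {
      def filename(shardIndex: Int): String = "some_filename.txt"
    })

  def main(args: Array[String]): Unit = {
    val effective = resolveDynamic(Some("gs://some/bucket"), userNaming)
    // Both prefixes are applied: prints gs://some/bucket/some/directory/some_filename.txt
    println(effective.filename(0))
  }
}
```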
IMO, `relativeFileNaming` should either be made private, so that it is only used internally by FileIO.Write, or a precondition should be added when a dynamic FileIO.Write is expanded, to check that `outputDirectory` is not set if the provided `fileNamingFn` is already relative.
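The second option could look roughly like the sketch below. It is again a simplified, hypothetical model, not Beam code: because `relativeFileNaming` currently returns a plain lambda, detecting it at expansion time would need something like a marker type, modelled here as the hypothetical `RelativeNaming` case class. The check uses Scala's `require`; actual Beam code would more likely use Guava's `checkArgument`.

```scala
// Hypothetical, simplified model of Beam's FileNaming.
trait Naming {
  def filename(shardIndex: Int): String
}

// Hypothetical marker type for a naming that is already relative to a base directory.
// In Beam today, relativeFileNaming returns an unmarked lambda, so this is an assumption.
final case class RelativeNaming(baseDirectory: String, inner: Naming) extends Naming {
  def filename(shardIndex: Int): String =
    baseDirectory.stripSuffix("/") + "/" + inner.filename(shardIndex)
}

object DynamicWriteCheck {
  // Sketch of the proposed precondition, run when a dynamic write is expanded:
  // reject .to(outputDirectory) combined with an already-relative naming
  // instead of silently nesting the two.
  def resolveDynamic(outputDirectory: Option[String], userNaming: Naming): Naming = {
    require(
      !(outputDirectory.isDefined && userNaming.isInstanceOf[RelativeNaming]),
      "outputDirectory (.to(...)) must not be set when the naming function " +
        "already returns a relativeFileNaming")
    outputDirectory.fold(userNaming)(dir => RelativeNaming(dir, userNaming))
  }

  def main(args: Array[String]): Unit = {
    val plain: Naming = new Naming {
      def filename(shardIndex: Int): String = s"shard-$shardIndex.txt"
    }
    // Allowed: a plain naming plus an output directory.
    println(resolveDynamic(Some("gs://some/bucket"), plain).filename(0))
    // Rejected: an already-relative naming plus an output directory.
    val rejected = scala.util.Try(
      resolveDynamic(Some("gs://some/bucket"), RelativeNaming("some/directory", plain)))
    println(rejected.isFailure)
  }
}
```

A marker type keeps the check cheap and explicit; making `relativeFileNaming` private would avoid the need for it altogether.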
wdyt?