Details
Type: Bug
Status: In Progress
Priority: P1
Resolution: Unresolved
Affects Version/s: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.35.0
Description
`WriteToFiles` does not appear to be working correctly. After running some tests, my conclusion is that once the condition for a destination is met, it is not checked again until a new condition (one not seen before) is met. As a result, elements are distributed to the wrong files.
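The reported symptom can be modeled in plain Python. The sketch below is a hypothetical model, not Beam's implementation: `expected_routing` evaluates the destination function for every element, while `sticky_routing` only switches destination when the function returns a value it has not returned before. It assumes elements arrive in order, which Beam does not guarantee, and the helper names are illustrative only.

```python
# Hypothetical model (NOT Beam's implementation) contrasting the expected
# per-element routing with the "sticky" routing described in this report.
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def expected_routing(elements: Iterable[str],
                     destination: Callable[[str], str]) -> Dict[str, List[str]]:
    """Evaluate the destination function independently for every element."""
    files: Dict[str, List[str]] = defaultdict(list)
    for element in elements:
        files[destination(element)].append(element)
    return dict(files)


def sticky_routing(elements: Iterable[str],
                   destination: Callable[[str], str]) -> Dict[str, List[str]]:
    """Mimic the reported symptom: the current destination only changes when
    the function returns a destination that has not been seen before."""
    files: Dict[str, List[str]] = defaultdict(list)
    seen = set()
    current = None
    for element in elements:
        dest = destination(element)
        if dest not in seen:
            seen.add(dest)
            current = dest
        files[current].append(element)
    return dict(files)


def in_or_out(n: str) -> str:
    # Same condition as Example code 1.
    return "in" if n in ["17"] else "out"


numbers = [str(x) for x in range(100)]
expected = expected_routing(numbers, in_or_out)
observed_model = sticky_routing(numbers, in_or_out)
print(expected["in"])
print(observed_model["out"][-1], observed_model["in"][:3])
```

Under this model, Example 1 yields "out" = 0 to 16 and "in" = 17 to 99, and Example 2 yields "even" = 0 only, which matches what the report describes.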
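Example code 1:

    import apache_beam as beam
    from apache_beam.io import fileio

    with beam.Pipeline() as p:
        (p
         | "Create" >> beam.Create(range(100))
         | beam.Map(lambda x: str(x))
         | fileio.WriteToFiles(
             path="./dynamic/",
             # Only the element "17" should go to the "in" destination.
             destination=lambda n: "in" if n in ["17"] else "out",
             sink=fileio.TextSink(),
             file_naming=fileio.destination_prefix_naming("test")))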
The expected result is a file named "in-xyz" containing only "17" and another named "out-xyz" containing all the other numbers. Instead, "out" contains the numbers 0 to 16, and once the "17" condition is met, all the remaining numbers go to "in". So "out" holds 0 to 16 and "in" holds 17 onward, which is wrong.
Changing the number reproduces the same behavior.
____________________
Example code 2:

    def odd_even(x):
        value = "even" if int(x) % 2 == 0 else "odd"
        print(value, x)
        return value

    (p
     | "Create" >> beam.Create(range(100))
     | beam.Map(lambda x: str(x))
     | fileio.WriteToFiles(
         path="./dynamic/",
         destination=odd_even,
         sink=fileio.TextSink(),
         file_naming=fileio.destination_prefix_naming("test")))
The printed output shows that `odd_even` returns the right value, but the destination is still wrong: "even" receives only 0 and "odd" receives all the other numbers, because the condition changed with element "1".
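To check an output directory for misrouted elements, a small helper along these lines could be used. `find_misrouted` is hypothetical (not part of Beam) and assumes each output file name starts with `<destination>-`, as with the "in-xyz"/"out-xyz" names described above; it reads every matching file and reports lines whose destination does not match the file they were written to.

```python
# Hypothetical helper (not part of Beam): checks that every line written by
# WriteToFiles landed in a file whose name starts with the destination that
# the destination function returns for that line.
import os
from typing import Callable, Dict, Iterable, List


def find_misrouted(directory: str,
                   destinations: Iterable[str],
                   destination_fn: Callable[[str], str]) -> Dict[str, List[str]]:
    """Return {file_name: [lines whose destination does not match the file]}.

    Assumes output file names start with "<destination>-".
    """
    destinations = list(destinations)
    misrouted: Dict[str, List[str]] = {}
    for name in sorted(os.listdir(directory)):
        # Find which destination (if any) this file belongs to.
        owner = next((d for d in destinations if name.startswith(f"{d}-")), None)
        if owner is None:
            continue  # not an output file for any known destination
        with open(os.path.join(directory, name)) as handle:
            # Strip newlines and skip blank lines.
            lines = [line.rstrip("\n") for line in handle if line.strip()]
        wrong = [line for line in lines if destination_fn(line) != owner]
        if wrong:
            misrouted[name] = wrong
    return misrouted
```

For Example 2, `find_misrouted("./dynamic/", ["even", "odd"], odd_even)` returns an empty dict when every element is routed correctly; with the behavior reported here, the "odd" file(s) would be listed together with the even numbers written into them.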
____________________
Example code 3:
Adding more conditions or using a different `file_naming` does not fix this:
    def test_15(n):
        three = "three" if int(n) % 3 == 0 else ""
        five = "five" if int(n) % 5 == 0 else ""
        return f"value-{three}{five}"

    def time_format():
        def _inner(window, pane, shard_index, total_shards, compression, destination):
            print(window, pane, shard_index, total_shards, compression, destination)
            return f"dest-{destination}-shards-{shard_index}-of-{total_shards}"
        return _inner

    (p
     | "Create" >> beam.Create(range(N))
     | beam.Map(lambda x: str(x))
     | fileio.WriteToFiles(
         path="./dynamic/",
         destination=test_15,
         sink=fileio.TextSink(),
         file_naming=time_format()))
Adding shards or other variables does not help either.
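For reference, the expected per-destination counts for Example 3 can be computed directly. Example 3 leaves `N` undefined, so this sketch assumes `N = 100` to match Examples 1 and 2; the function is the same logic as `test_15`, renamed `fizz_buzz_destination` here for illustration only.

```python
# Expected routing for Example 3, assuming N = 100 (N is not given in the report).
from collections import Counter


def fizz_buzz_destination(n: str) -> str:
    # Same logic as test_15 in Example code 3.
    three = "three" if int(n) % 3 == 0 else ""
    five = "five" if int(n) % 5 == 0 else ""
    return f"value-{three}{five}"


N = 100  # assumption: the report does not state N
expected_counts = Counter(fizz_buzz_destination(str(x)) for x in range(N))
for destination, count in sorted(expected_counts.items()):
    print(destination, count)
```

With N = 100, correct routing should put 53 elements in `value-`, 13 in `value-five`, 27 in `value-three`, and 7 in `value-threefive`.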
I have tested this with different SDK versions (2.27, 2.29, 2.30) and on Dataflow, DirectRunner, and InteractiveRunner.