Beam / BEAM-12554

WriteToFiles destination not changing with condition

Details

    • Type: Bug
    • Status: In Progress
    • Priority: P1
    • Resolution: Unresolved
    • Affects Version/s: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.35.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

      `WriteToFiles` does not appear to route elements correctly. From my tests, once the destination function returns a given value, its result seems to be ignored for subsequent elements until it returns a value that has not been seen before. From then on, every element is written to that new destination. This results in elements being distributed to the wrong files.

      Example code 1:

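      # Imports and pipeline setup added so the snippet is self-contained.
      import apache_beam as beam
      from apache_beam.io import fileio

      p = beam.Pipeline()
      (p | "Create" >> beam.Create(range(100))
         | beam.Map(lambda x: str(x))
         | fileio.WriteToFiles(
                    path="./dynamic/",
                    # Only "17" should go to "in"; everything else to "out".
                    destination=lambda n: "in" if n in ["17"] else "out",
                    sink=fileio.TextSink(),
                    file_naming=fileio.destination_prefix_naming("test"))
      )
      p.run().wait_until_finish()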
      

      Here, the expected result is a file named "in-xyz" containing only "17" and another named "out-xyz" containing all the other numbers. Instead, "out" contains the numbers 0 to 16, and once the condition for 17 is met, all remaining numbers go to "in". So "out" holds 0 to 16 and "in" holds 17 onward, which is wrong.

      Changing the number in the condition reproduces the same behavior.
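
      To make the difference concrete, here is a minimal pure-Python sketch that contrasts the expected routing with a toy model of the symptom described above. It does not use Beam and is not Beam's implementation. The helper names `expected_routing` and `reported_routing` are hypothetical, and the model assumes elements are processed in order.

```python
from collections import defaultdict


def expected_routing(elements, destination):
    """Group each element under the destination computed for that element."""
    files = defaultdict(list)
    for element in elements:
        files[destination(element)].append(element)
    return dict(files)


def reported_routing(elements, destination):
    """Toy model of the reported symptom (an assumption, not Beam internals):
    every element lands in the most recently first-seen destination."""
    files = defaultdict(list)
    seen = set()
    current = None
    for element in elements:
        dest = destination(element)
        if dest not in seen:
            # A destination value appears for the first time: switch to it.
            seen.add(dest)
            current = dest
        files[current].append(element)
    return dict(files)


elements = [str(x) for x in range(100)]
route = lambda n: "in" if n in ["17"] else "out"

expected = expected_routing(elements, route)
reported = reported_routing(elements, route)
print(len(expected["in"]), len(expected["out"]))  # 1 99
print(len(reported["in"]), len(reported["out"]))  # 83 17
```

      Under this model "out" ends at 16 and "in" starts at 17, which matches the files observed in Example 1.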

      ____________________
      Example code 2:

      def odd_even(x):
          value = "even" if int(x) % 2 == 0 else "odd"
          print(value, x)
          return value
      
      (p | "Create" >> beam.Create(range(100))
         | beam.Map(lambda x: str(x))
         | fileio.WriteToFiles(
                    path="./dynamic/",
                    destination=odd_even,
                    sink=fileio.TextSink(),
                    file_naming=fileio.destination_prefix_naming("test"))
      )
      

      We can see that `odd_even` returns the right value for each element, but the destination is still wrong. "even" contains only 0 and "odd" contains all the other numbers, because the destination changed at element "1".
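
      One way to check the output files without reading them by hand is a small script like the sketch below. It assumes each output file name starts with its destination string (as with the "in-xyz" and "out-xyz" names mentioned earlier), so it would not work as-is when one destination is a prefix of another. `find_misrouted` and `parity` are hypothetical names, and the demo writes its own sample files instead of running a pipeline.

```python
import glob
import os
import tempfile


def find_misrouted(output_dir, destination):
    """Return (file_name, element) pairs where the element is in the wrong file.

    Assumption: a file's name starts with the destination it was written for.
    """
    misrouted = []
    for path in sorted(glob.glob(os.path.join(output_dir, "*"))):
        if not os.path.isfile(path):
            continue  # skip any sub-directories
        name = os.path.basename(path)
        with open(path) as handle:
            for line in handle:
                element = line.rstrip("\n")
                if element and not name.startswith(destination(element)):
                    misrouted.append((name, element))
    return misrouted


def parity(x):
    # Same rule as odd_even in Example 2, without the print call.
    return "even" if int(x) % 2 == 0 else "odd"


# Demo on hand-written files that imitate a wrongly split output.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "even-demo"), "w") as f:
        f.write("0\n")
    with open(os.path.join(tmp, "odd-demo"), "w") as f:
        f.write("1\n2\n3\n")
    print(find_misrouted(tmp, parity))  # [('odd-demo', '2')]
```

      After running Example 2, calling `find_misrouted("./dynamic/", parity)` should return an empty list if routing were correct.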

      ____________________
      Example code 3:

      Using more conditions or a different `file_naming` doesn't fix this:

      def test_15(n):
          three = "three" if int(n) % 3 == 0 else ""
          five = "five" if int(n) % 5 == 0 else ""
          return f"value-{three}{five}"
      
      def time_format():
          def _inner(window, pane, shard_index, total_shards, compression, destination):
              print(window, pane, shard_index, total_shards, compression, destination)
              return f"dest-{destination}-shards-{shard_index}-of-{total_shards}"
          return _inner
      
      (p | "Create" >> beam.Create(range(N))
         | beam.Map(lambda x: str(x))
         | fileio.WriteToFiles(
                    path="./dynamic/",
                    destination=test_15,
                    sink=fileio.TextSink(),
                    file_naming=time_format())
      )
      

      Adding shards or other variables doesn't help either.
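
      As a baseline for comparing the output of Example 3, the sketch below counts how many elements each destination should receive. The report does not state the value of `N`, so `N = 100` is an assumption here, chosen to match Examples 1 and 2. `destination_15` is a hypothetical name for a copy of the logic in `test_15`.

```python
from collections import Counter


def destination_15(n):
    # Same logic as test_15 in Example 3.
    three = "three" if int(n) % 3 == 0 else ""
    five = "five" if int(n) % 5 == 0 else ""
    return f"value-{three}{five}"


N = 100  # assumption: not given in the report; matches Examples 1 and 2
expected_counts = Counter(destination_15(str(x)) for x in range(N))
for dest, count in sorted(expected_counts.items()):
    print(dest, count)
# value- 53
# value-five 13
# value-three 27
# value-threefive 7
```

      Any output in which a destination's line count differs from these numbers indicates misrouted elements.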

      I have tested this with several SDK versions (2.27, 2.29, 2.30) and with Dataflow, DirectRunner, and InteractiveRunner.

            People

              Assignee: Yi Hu (yihu)
              Reporter: Inigo San Jose Visiers (Inigosj)
              Votes: 0
              Watchers: 6

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 3h 20m