BEAM-14497

Python Reshuffle holds elements

Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Unresolved
    • Component: sdk-py-core

    Description

      Python Reshuffle holds elements while the pipeline is running and appears to release them in a single batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation:

      "the trigger used with {@link Reshuffle} which triggers on every element and never buffers state."

      Here is a working example:

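      import time

      import apache_beam as beam
      from apache_beam import Pipeline

      def test(p: Pipeline):
        class SlowProcessFn(beam.DoFn):
          def process(self, element):
            # Simulate slow upstream work: one element every 0.5 s.
            time.sleep(0.5)
            yield element

        result = (p
          | beam.Create(range(100))
          | beam.ParDo(SlowProcessFn())
          | beam.Reshuffle()  # HERE: commenting this out restores per-element output
          | beam.Map(lambda x: print(x, time.time())))
        return result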
      

      Tested on the local runner and the Flink runner (1.14): all elements are printed only after about 50 seconds (100 elements × 0.5 s each). With Reshuffle commented out, one element is printed every half second.
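
To compare the two runs without reading the printed timestamps by eye, a small helper along these lines could be applied to the `time.time()` values collected from the output. It is only a sketch: `summarize_arrivals`, `looks_batched`, and the `tolerance` threshold are hypothetical names and values chosen for illustration, not part of Beam.

```python
from statistics import median


def summarize_arrivals(timestamps):
    """Return count, total span, and median gap for a list of arrival times (seconds)."""
    ts = sorted(timestamps)
    gaps = [later - earlier for earlier, later in zip(ts, ts[1:])]
    return {
        "count": len(ts),
        "span": ts[-1] - ts[0] if ts else 0.0,
        "median_gap": median(gaps) if gaps else 0.0,
    }


def looks_batched(timestamps, expected_gap=0.5, tolerance=0.5):
    """Heuristic (assumption): output is 'batched' when the median gap between
    arrivals is well below the per-element processing time upstream."""
    summary = summarize_arrivals(timestamps)
    return summary["count"] > 1 and summary["median_gap"] < expected_gap * tolerance


if __name__ == "__main__":
    # Synthetic timestamps standing in for the two observed behaviors.
    streamed = [i * 0.5 for i in range(100)]          # one element every 0.5 s
    batched = [50.0 + i * 0.001 for i in range(100)]  # everything at ~50 s
    print("streamed:", summarize_arrivals(streamed), looks_batched(streamed))
    print("batched: ", summarize_arrivals(batched), looks_batched(batched))
```

The synthetic lists only mimic the pattern described above; real runs would supply the timestamps printed by the final `beam.Map` step.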

      This behavior causes problems when a downstream PTransform performs a time-sensitive operation, for example receiving a list of updated files as input and reading them with the filebasedsource.ReadAllFiles transform. Because ReadAll contains a Reshuffle, the actual read is blocked.
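
The effect on a time-sensitive downstream step can be illustrated with a toy model in plain Python. This is not Beam code and says nothing about how Reshuffle is implemented; the stage functions and the simulated clock are assumptions made purely to show how buffering delays the first downstream element.

```python
def slow_source(n, cost, clock):
    """Yield n elements, advancing a simulated clock by `cost` seconds per element."""
    for i in range(n):
        clock["now"] += cost
        yield i


def per_element(upstream):
    """Forward each element as soon as it is produced (the documented Java behavior)."""
    for item in upstream:
        yield item


def buffered(upstream):
    """Hold every element until upstream is exhausted, then release them together
    (the behavior reported here for Python)."""
    held = list(upstream)
    yield from held


def first_output_time(stage, n=100, cost=0.5):
    """Simulated time at which the downstream step receives its first element."""
    clock = {"now": 0.0}
    output = stage(slow_source(n, cost, clock))
    next(output)
    return clock["now"]


if __name__ == "__main__":
    print("per-element:", first_output_time(per_element))
    print("buffered:   ", first_output_time(buffered))
```

In this model the per-element stage delivers its first element after a single processing step, while the buffered stage delivers nothing until all 100 upstream elements are done, which matches the 50-second delay seen in the example pipeline.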

            People

              Assignee: Unassigned
              Reporter: Yi Hu (yihu)