Details
- Type: Bug
- Status: Triage Needed
- Priority: P2
- Resolution: Unresolved
Description
Python Reshuffle holds elements while the pipeline is running and appears to release them in a single batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation:
"the trigger used with Reshuffle which triggers on every element and never buffers state."
Here is a working example:
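    import time

    import apache_beam as beam
    from apache_beam import Pipeline


    def test(p: Pipeline):
        class SlowProcessFn(beam.DoFn):
            def process(self, element):
                # Simulate a slow upstream step: one element every 0.5 s.
                time.sleep(0.5)
                yield element

        result = (
            p
            | beam.Create(range(100))
            | beam.ParDo(SlowProcessFn())
            | beam.Reshuffle()  # HERE
            # Print each element with the wall-clock time at which it arrives.
            | beam.Map(lambda x: print(x, time.time()))
        )
        return result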
Tested on the local runner and the Flink runner (1.14): the elements are printed only after 50 seconds (100 elements at 0.5 seconds each). With Reshuffle commented out, one element is printed every half second.
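To compare runs without eyeballing the output, the printed timestamps can be checked for batching. The following is only a sketch: `print_gaps` and `looks_batched` are hypothetical helper names, and the 0.5 s expected gap and the 50% tolerance are assumptions taken from the example above, not anything defined by Beam.

```python
import statistics
from typing import List, Sequence


def print_gaps(timestamps: Sequence[float]) -> List[float]:
    """Return the time differences between consecutive prints."""
    ordered = sorted(timestamps)
    return [later - earlier for earlier, later in zip(ordered, ordered[1:])]


def looks_batched(
    timestamps: Sequence[float],
    expected_gap: float = 0.5,  # assumption: the sleep used in SlowProcessFn
    tolerance: float = 0.5,     # assumption: "far below" means under 50% of it
) -> bool:
    """Guess whether the elements were released together.

    If the median gap between prints is far smaller than the upstream
    delay, the elements most likely arrived in one burst.
    """
    gaps = print_gaps(timestamps)
    if not gaps:
        return False  # a single timestamp says nothing about batching
    return statistics.median(gaps) < expected_gap * tolerance
```

Feeding it the second column of the printed output from a run with Reshuffle and a run without should give different answers if the behavior described here is present.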
This behavior causes problems when a downstream PTransform involves a time-sensitive operation, such as receiving a list of updated files from the input and reading them with the filebasedsource.ReadAllFiles transform. Because ReadAll contains a Reshuffle, the actual read is blocked.
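The effect on a time-sensitive consumer can be modeled without Beam at all. The sketch below uses plain Python generators; it is not how Reshuffle is implemented, and all names (`slow_source`, `buffering_stage`, `per_element_stage`, `arrival_times`) are hypothetical. It only illustrates that a buffering stage delays the first downstream element until the slow upstream step has finished.

```python
import time
from typing import Callable, Iterable, Iterator, List


def slow_source(n: int, delay: float) -> Iterator[int]:
    """Emit n elements, one every `delay` seconds (stands in for SlowProcessFn)."""
    for i in range(n):
        time.sleep(delay)
        yield i


def buffering_stage(elements: Iterable[int]) -> Iterator[int]:
    """Model of the reported Python behavior: hold everything, then release."""
    buffered = list(elements)  # drains the whole upstream before emitting
    yield from buffered


def per_element_stage(elements: Iterable[int]) -> Iterator[int]:
    """Model of the documented Java behavior: pass each element through at once."""
    for element in elements:
        yield element


def arrival_times(
    stage: Callable[[Iterable[int]], Iterator[int]],
    n: int = 5,
    delay: float = 0.02,
) -> List[float]:
    """Seconds since start at which each element reaches the consumer."""
    start = time.perf_counter()
    return [time.perf_counter() - start for _ in stage(slow_source(n, delay))]
```

With the defaults, `arrival_times(per_element_stage)` shows the first element after roughly one delay, while `arrival_times(buffering_stage)` shows every element only after all five delays have elapsed. A downstream read placed behind the buffering stage would wait just as long.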
Issue Links
- blocks: BEAM-14315 Update fileio.MatchContinuously to allow reading already read files with a new timestamp (In Progress)