Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
OperatorImplGraph builds its EndOfStreamStates and WatermarkStates objects from all of the input SSPs in the job model, and that set includes side-input SSPs. However, high-level operator tasks are never given messages from side-input SSPs, so the high-level operators should not need to track end-of-stream or watermarks for them.
As a result, end-of-stream and watermark handling includes the side-input SSPs but never updates their states. This can prevent the task from exiting properly (end-of-stream is never reached for all SSPs) and can lead to incorrectly calculated watermarks.
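The effect can be illustrated with a minimal sketch. It uses simplified stand-ins for EndOfStreamStates and WatermarkStates. The class names `EosTracker` and `WatermarkTracker`, the plain-string SSP names, and the `-1` "no watermark" sentinel are all hypothetical and are not Samza's actual API. The sketch shows how a side-input SSP that is tracked but never updated holds back both end-of-stream completion and the output watermark:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for end-of-stream tracking (not Samza's EndOfStreamStates).
// SSPs are represented as plain strings for brevity.
final class EosTracker {
  private final Set<String> pending;

  EosTracker(Set<String> trackedSsps) {
    this.pending = new HashSet<>(trackedSsps);
  }

  // Called when an end-of-stream message arrives for an SSP.
  void markEndOfStream(String ssp) {
    pending.remove(ssp);
  }

  // True only when every tracked SSP has reached end-of-stream.
  boolean allEndOfStream() {
    return pending.isEmpty();
  }
}

// Hypothetical, simplified watermark tracking (not Samza's WatermarkStates).
final class WatermarkTracker {
  static final long NO_WATERMARK = -1L; // assumed sentinel meaning "no watermark seen yet"
  private final Map<String, Long> watermarks = new HashMap<>();

  WatermarkTracker(Set<String> trackedSsps) {
    for (String ssp : trackedSsps) {
      watermarks.put(ssp, NO_WATERMARK);
    }
  }

  // Records a watermark for a tracked SSP; watermarks only move forward.
  void update(String ssp, long watermark) {
    watermarks.computeIfPresent(ssp, (k, old) -> Math.max(old, watermark));
  }

  // Output watermark is the minimum across tracked SSPs; any SSP without one holds it back.
  long outputWatermark() {
    long min = Long.MAX_VALUE;
    for (long w : watermarks.values()) {
      if (w == NO_WATERMARK) {
        return NO_WATERMARK;
      }
      min = Math.min(min, w);
    }
    return watermarks.isEmpty() ? NO_WATERMARK : min;
  }
}

class SideInputTrackingDemo {
  public static void main(String[] args) {
    // Both the regular input and the side input are tracked, mirroring the bug.
    Set<String> tracked = Set.of("input-0", "side-input-0");

    EosTracker eos = new EosTracker(tracked);
    eos.markEndOfStream("input-0");
    System.out.println(eos.allEndOfStream()); // false: side-input-0 never reaches the operators

    WatermarkTracker wm = new WatermarkTracker(tracked);
    wm.update("input-0", 100L);
    System.out.println(wm.outputWatermark()); // -1: side-input-0 never receives a watermark
  }
}
```

In this sketch, neither tracker can make progress past the side-input SSP because nothing ever updates it. That matches the shutdown and watermark symptoms described in the issue.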
We currently have tests that use partitionBy and side inputs, but they use only a single partition, so RunLoop is able to shut down the task (RunLoop doesn't check side inputs when determining whether the task has reached the end of all streams). Normally, OperatorImpl shuts down the task when the high-level API is used. I think the fix is to change OperatorImpl to ignore side-input SSPs so that it does shut down the task.
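The proposed fix amounts to filtering side-input SSPs out of the input set before the end-of-stream and watermark states are built. The following minimal sketch illustrates that filtering step. The helper `OperatorInputs.excludeSideInputs` is hypothetical, not an existing Samza method, and SSPs are again represented as plain strings:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the proposed fix: when building end-of-stream and
// watermark state for high-level operators, keep only the SSPs the operators actually receive.
final class OperatorInputs {
  private OperatorInputs() {}

  // Returns every input SSP that is not a side-input SSP.
  static Set<String> excludeSideInputs(Set<String> allInputSsps, Set<String> sideInputSsps) {
    return allInputSsps.stream()
        .filter(ssp -> !sideInputSsps.contains(ssp))
        .collect(Collectors.toSet());
  }
}

class ExcludeSideInputsDemo {
  public static void main(String[] args) {
    Set<String> all = Set.of("input-0", "side-input-0");
    Set<String> side = Set.of("side-input-0");

    // Only the regular input remains to be tracked for end-of-stream and watermarks.
    Set<String> tracked = OperatorInputs.excludeSideInputs(all, side);
    System.out.println(tracked); // [input-0]
  }
}
```

With only `input-0` tracked, its end-of-stream message alone would be enough for end-of-stream tracking to complete in this sketch, and its watermark alone would determine the output watermark.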