Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Description
TaskSideInputStorageManager is used to restore the side-input streams of a Samza job.
Users can process messages from the side-input stream by plugging in a UDF.
TaskSideInputStorageManager maintains the last processed side-input message offset in memory and flushes it to disk periodically.
TaskSideInputStorageManager uses AsyncRunLoop to fetch messages from the side-input streams and can receive multiple messages for the same side-input stream partition concurrently, depending on the task.max.concurrency configuration.
Currently, the offset update logic in TaskSideInputStorageManager simply overwrites the entry in the in-memory offsets map with the offset of the most recently completed message.
This can corrupt the stored offset when events complete out of order, as in the following scenario:
Consider a side-input stream with messages [0, 1, 2] in partition 0, with task.max.concurrency set to 3.
Time 0: TaskSideInputStorageManager receives message 0 and hands it over to the UDF.
Time 1: TaskSideInputStorageManager receives message 1 and hands it over to the UDF.
Time 2: TaskSideInputStorageManager receives message 2 and hands it over to the UDF.
Time 3: The UDF completes processing of message 2, and TaskSideInputStorageManager records the last processed offset for partition 0 as 2.
Time 4: The UDF completes processing of message 1, and TaskSideInputStorageManager records the last processed offset for partition 0 as 1.
Time 5: The UDF completes processing of message 0, and TaskSideInputStorageManager records the last processed offset for partition 0 as 0.
Time 6: TaskSideInputStorageManager flushes the incorrect offset map to disk (offset 0 for partition 0, although messages up to offset 2 have been processed).
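
The scenario above can be replayed with a minimal sketch. This is not Samza code: the class OffsetReplaySketch and its methods are hypothetical stand-ins that assume the bookkeeping is a plain map overwritten on every completion, as the description states. It only illustrates how the completion order decides which offset a later flush would see.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping described in this issue;
// it is NOT the actual TaskSideInputStorageManager implementation.
public class OffsetReplaySketch {
    // In-memory map: partition id -> offset of the most recently completed message.
    private final Map<Integer, Long> lastProcessedOffsets = new HashMap<>();

    // Last-write-wins update: whichever message completes last overwrites the entry.
    void onProcessingComplete(int partition, long offset) {
        lastProcessedOffsets.put(partition, offset);
    }

    // Returns the offset a flush would write for the partition, or null if nothing completed.
    Long offsetToFlush(int partition) {
        return lastProcessedOffsets.get(partition);
    }

    // Replays completions for partition 0 in the given order and returns
    // the offset that a subsequent flush would persist.
    static Long replay(long... completionOrder) {
        OffsetReplaySketch sketch = new OffsetReplaySketch();
        for (long offset : completionOrder) {
            sketch.onProcessingComplete(0, offset);
        }
        return sketch.offsetToFlush(0);
    }

    public static void main(String[] args) {
        // Completions arrive in offset order: the flushed offset is 2.
        System.out.println("in-order completion flushes: " + replay(0, 1, 2));
        // Completions arrive as in Time 3 to Time 5 above: the flushed offset is 0.
        System.out.println("out-of-order completion flushes: " + replay(2, 1, 0));
    }
}
```

With completions arriving in the order 2, 1, 0, the map ends up holding offset 0 for partition 0, which matches the value flushed at Time 6.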