When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and signal that there is no more data to be consumed.
However, InMemorySystemAdmin, HdfsSystemAdmin, and the SystemStreamPartitionMetadata documentation each assign different semantics to "oldestOffset", "newestOffset", and "upcomingOffset" for bounded streams, which is problematic.
As described in SystemStreamPartitionMetadata:
Areas of concern are noted in the sub-bullets below.
In the case of HdfsSystemConsumer, messages are read from HDFS files until all have been read, and an end of stream envelope is then appended to the message buffer. The offset metadata returned per partition is:
oldestOffset: offset of beginning of first file in HDFS
newestOffset: offset of beginning of last file in HDFS
- Does NOT include end of stream in numeric offset count
upcomingOffset: null
- Per SystemStreamPartitionMetadata, "null" is meant to indicate an empty partition. However, null seems reasonable here, as "upcoming" does not make much sense in the context of bounded streams.
Further differences are seen in InMemorySystem, where an IncomingMessageEnvelope is placed at the end of the buffer similar to the HdfsSystemConsumer case above, with an object = EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
However, InMemorySystemAdmin will generate the following metadata per partition:
newestOffset: numeric index of last message in the buffer
- This is the index of the end of stream message, which does not match the semantics of the HDFS case above
upcomingOffset: newestOffset + 1
- This is reasonable, but again upcoming does not make much sense in the context of bounded streams
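To make the in-memory semantics concrete, here is a minimal, self-contained sketch. It does not use Samza classes: `InMemoryOffsetsSketch`, `END_OF_STREAM`, and the helper methods are hypothetical names that only model the buffer and the InMemorySystemAdmin behavior described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an in-memory partition buffer; not Samza code.
class InMemoryOffsetsSketch {
  // Stand-in for the end of stream envelope appended to a bounded buffer.
  static final String END_OF_STREAM = "<end-of-stream>";

  // Builds a buffer of data messages followed by the end of stream marker.
  static List<String> boundedBuffer(String... messages) {
    List<String> buffer = new ArrayList<>(Arrays.asList(messages));
    buffer.add(END_OF_STREAM);
    return buffer;
  }

  // newestOffset as described above: index of the last buffered entry,
  // which for a bounded buffer is the end of stream marker.
  static String newestOffset(List<String> buffer) {
    return buffer.isEmpty() ? null : String.valueOf(buffer.size() - 1);
  }

  // upcomingOffset as described above: newestOffset + 1.
  static String upcomingOffset(List<String> buffer) {
    return String.valueOf(buffer.size());
  }
}
```

For a buffer holding three data messages, the last data message sits at index 2, but this model reports newestOffset as 3, the index of the end of stream marker.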
tl;dr: this inconsistency blocks refactoring side input consumption to leverage RunLoop.
Side input consumption currently blocks the main container startup thread in ContainerStorageManager (CSM) until one of the following holds for each side input SSP:
- envelope from SystemConsumer returns true for envelope.isEndOfStream (the offset of the envelope is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
- envelope offset is equal to newest offset for that SSP fetched at CSM initialization
We would like to refactor this flow to leverage RunLoop, rather than using SystemConsumers directly, in order to take advantage of message dispatching, concurrency, etc.
Since RunLoop consumes end of stream messages without passing them on, the first condition (an end of stream envelope) can no longer be checked, and CSM needs a consistent way to determine that an SSP's consumption has reached "head" or "end" from the offset metadata fetched at init time. With the current InMemorySystemAdmin semantics, integration tests fail because CSM waits indefinitely for side input SSP consumption to be marked as "caught up".
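The hang can be illustrated with a small, self-contained sketch of the two conditions. This is not the actual CSM code: `CaughtUpCheckSketch`, its methods, and the string value used for the end of stream offset are hypothetical stand-ins, and offsets are modeled as numeric indexes as in the in-memory case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "caught up" check; not the CSM implementation.
class CaughtUpCheckSketch {
  // Placeholder for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  // Offsets seen when reading a bounded partition directly from the consumer:
  // one numeric offset per data message, then the end of stream offset.
  static List<String> directOffsets(int dataMessages) {
    List<String> offsets = new ArrayList<>();
    for (int i = 0; i < dataMessages; i++) {
      offsets.add(String.valueOf(i));
    }
    offsets.add(END_OF_STREAM_OFFSET);
    return offsets;
  }

  // Offsets left once a dispatcher has swallowed end of stream envelopes.
  static List<String> withoutEndOfStream(List<String> offsets) {
    List<String> remaining = new ArrayList<>(offsets);
    remaining.removeIf(END_OF_STREAM_OFFSET::equals);
    return remaining;
  }

  // Caught up if any seen envelope is end of stream (condition 1)
  // or carries the newest offset fetched at init time (condition 2).
  static boolean isCaughtUp(List<String> seenOffsets, String newestOffset) {
    for (String offset : seenOffsets) {
      if (END_OF_STREAM_OFFSET.equals(offset) || offset.equals(newestOffset)) {
        return true;
      }
    }
    return false;
  }
}
```

With three data messages and newestOffset reported as "3" (the index of the end of stream message), the check passes only while the end of stream envelope is visible. Once it is filtered out, the last offset seen is "2" and neither condition is ever met. If newestOffset were "2", the second condition alone would suffice.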
Possible (partial) reconciliation:
- Modify InMemorySystemAdmin to return "newestOffset" as the numeric index of the buffered message immediately preceding the end of stream message. This would make the semantics of "newestOffset" match that of HdfsSystemAdmin.
- Minimal semantic change in a component used primarily internally in Samza (InMemorySystem is used in the test runner)
- Semantics of "upcomingOffset" would be unchanged and left inconsistent with HdfsSystemAdmin
- Modify RunLoop to pass end of stream messages through to tasks, either directly or by invoking a callback
- EndOfStreamListenerTask today is task-wide, and only called when ALL SSPs in a task reach end of stream. Giving SSP granularity might be welcome flexibility to customers who intermingle bounded / unbounded streams in the same task.
- It is uncertain whether customers would actually benefit, and this would be a significant API change.
The first option above (modifying InMemorySystemAdmin) is cheap and minimizes changes to the externally facing Samza API, as InMemorySystem is primarily used in the internals of Samza's test runner.
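A rough sketch of the proposed "newestOffset" computation follows. It is an illustration only, not a patch against InMemorySystemAdmin: `NewestOffsetFixSketch` and `END_OF_STREAM` are hypothetical names, the buffer is modeled as a plain list, and returning null for a partition with no data messages is an assumption based on the "null indicates empty" convention mentioned earlier.

```java
import java.util.List;

// Hypothetical sketch of the proposed semantics; not Samza code.
class NewestOffsetFixSketch {
  // Stand-in for the end of stream envelope at the tail of a bounded buffer.
  static final String END_OF_STREAM = "<end-of-stream>";

  // Returns the index of the last data message, skipping a trailing
  // end of stream marker if one is present.
  static String newestOffset(List<String> buffer) {
    int last = buffer.size() - 1;
    if (last >= 0 && END_OF_STREAM.equals(buffer.get(last))) {
      last--;
    }
    // Assumption: null signals that the partition holds no data messages.
    return last >= 0 ? String.valueOf(last) : null;
  }
}
```

Under this sketch, a buffer of three data messages plus the end of stream marker yields a newestOffset of "2", which is the offset of the last envelope a RunLoop-based consumer would actually observe.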