Affects Version/s: 0.6.0
Fix Version/s: None
There is currently no easy way for a task to process all messages from some offset all the way to the head of a stream, and then shut down. This behavior is useful when a Samza job is re-processing old data, or for Samza jobs that only run periodically.
Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream metadata required to do these operations. We should expose this to the tasks in some way, so they can decide when to shut down.
Two ways that I can think of doing this are:
1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) that has a callback that's triggered whenever the last message processed by a task is equal to metadata.newest for the SystemStreamPartition.
2. Expose the SystemStreamMetadata information through the InitableTask.init's TaskContext object.
I favor (2) right now, but we should dig into the code and think through other potential solutions.
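To make option (2) more concrete, here is a rough sketch of how a task might use metadata handed to it at init time to notice that it has caught up. This is only an illustration: PartitionMetadata and CatchUpTask are hypothetical stand-ins, not Samza classes, partitions are plain ints, and offsets are assumed to be comparable longs (real offsets may not be).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-partition stream metadata; not Samza's real class.
final class PartitionMetadata {
    // Offset of the newest message at the time the metadata was fetched.
    final long newestOffset;

    PartitionMetadata(long newestOffset) {
        this.newestOffset = newestOffset;
    }
}

// Hypothetical task that records metadata at init time and tracks catch-up per partition.
final class CatchUpTask {
    private final Map<Integer, PartitionMetadata> metadata = new HashMap<>();
    private final Map<Integer, Boolean> caughtUp = new HashMap<>();

    // Analogous to reading the metadata from a context object passed to init().
    void init(Map<Integer, PartitionMetadata> metadataByPartition) {
        metadata.putAll(metadataByPartition);
        for (Integer partition : metadataByPartition.keySet()) {
            caughtUp.put(partition, false);
        }
    }

    // Returns true once every known partition has reached its newest offset.
    // A real task would request shutdown from its coordinator at that point.
    boolean process(int partition, long offset) {
        PartitionMetadata m = metadata.get(partition);
        if (m != null && offset >= m.newestOffset) {
            caughtUp.put(partition, true);
        }
        return !caughtUp.containsValue(false);
    }
}
```

One gap in this sketch: a partition that is empty when the metadata is fetched never delivers a message, so it would never be marked as caught up. A real implementation would need to handle that case explicitly.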
Also, something else to consider is that TaskCoordinator.shutdown currently shuts down the task immediately. There is an argument to be made that it should just mean "cease to process any messages for this task instance", and that SamzaContainer should only be shut down when ALL task instances have called coordinator.shutdown (or there is an error). This would be useful in situations where a StreamTask wishes to shut down when it's fully caught up to head across all partitions. Right now, the first task instance to call shutdown (the first one caught up) would shut down the entire SamzaContainer, including partitions that might not be caught up yet.
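The alternative shutdown semantics could look roughly like the following. ShutdownTracker is a hypothetical helper written for this ticket, not existing Samza code, and task instances are identified by plain strings for simplicity. It is not thread-safe, which assumes a single-threaded event loop in the container.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: tracks which task instances have asked to shut down.
final class ShutdownTracker {
    private final Set<String> allTasks;
    private final Set<String> finished = new HashSet<>();

    ShutdownTracker(Set<String> taskNames) {
        this.allTasks = new HashSet<>(taskNames);
    }

    // Called when a task instance requests shutdown.
    // Returns true only when every task instance has requested it,
    // which is when the container itself should stop.
    boolean requestShutdown(String taskName) {
        if (!allTasks.contains(taskName)) {
            throw new IllegalArgumentException("Unknown task: " + taskName);
        }
        finished.add(taskName);
        return finished.size() == allTasks.size();
    }

    // The container would stop delivering messages to tasks that have finished.
    boolean shouldProcess(String taskName) {
        return !finished.contains(taskName);
    }
}
```

With two task instances, the first call to requestShutdown returns false and only stops message delivery to that instance; the container stops when the second instance calls it as well. Error handling (shutting down immediately on failure) is left out of the sketch.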