1. The TaskCoordinators/TaskInstance setup still bugs me a bit: we've made the coordinator mutable and let it live beyond a single process loop, but we've kept the APIs the same.
What do you think about passing the TaskInstanceCoordinator to TaskInstance's constructor and eliminating the (..., coordinator: TaskCoordinators) parameter from all methods in SamzaContainer/TaskInstance? The SamzaContainer can still use the TaskCoordinators to check for commit/shutdown requests and to reset them.
I like this approach a bit more because I think it better fits how the coordinator works now. Before, we had to pass the coordinator into every method because its lifecycle only extended to one process() invocation. Now that it lives longer than that, it seems better to just give it to the TaskInstance once.
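A minimal sketch of proposal (1), assuming heavily simplified, hypothetical versions of TaskInstanceCoordinator, TaskInstance, and TaskCoordinators. The fields, the string-based stand-in for StreamTask.process(), and the "shut down once every task has asked" policy are illustrative assumptions, not Samza's actual API. What it shows is the coordinator being handed over once, through the constructor, while the container keeps the full set so it can check and reset them.

```scala
// Hypothetical, simplified types sketching proposal (1); these are not Samza's real classes.

// A mutable coordinator that now outlives a single process() call.
class TaskInstanceCoordinator {
  var commitRequested = false
  var shutdownRequested = false
  def commit(): Unit = { commitRequested = true }
  def shutdown(): Unit = { shutdownRequested = true }
  def reset(): Unit = { commitRequested = false; shutdownRequested = false }
}

// The coordinator is passed once, at construction time, so methods such as
// process() no longer need a (..., coordinator: TaskCoordinators) parameter.
class TaskInstance(val name: String, coordinator: TaskInstanceCoordinator) {
  def process(message: String): Unit = {
    // Stand-in for StreamTask.process(): the message text decides what to request.
    if (message == "commit") coordinator.commit()
    if (message == "stop") coordinator.shutdown()
  }
}

// The container keeps the full set so it can still check commit/shutdown and reset.
class TaskCoordinators(taskNames: Seq[String]) {
  val byTask: Map[String, TaskInstanceCoordinator] =
    taskNames.map(name => name -> new TaskInstanceCoordinator).toMap
  def commitRequested(task: String): Boolean = byTask(task).commitRequested
  // Assumed policy: shut down only once every task has asked to.
  def allTasksRequestedShutdown: Boolean = byTask.values.forall(_.shutdownRequested)
  def reset(task: String): Unit = byTask(task).reset()
}
```

In use, the container would build one TaskInstance per entry in byTask, call process() with no coordinator argument, and then consult commitRequested, allTasksRequestedShutdown, and reset on the shared TaskCoordinators.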
2. Another thing to think through is that having the coordinator live beyond a single process() call means a StreamTask can hold on to it and potentially do weird things, like manipulate it from another thread. In general, we haven't allowed this, which is why the ReadableCoordinator only lived for one process loop. The only place where we deviate from this (that I recall) is the MetricsRegistry in the TaskContext. In a sense, this is the same problem I describe in (1) above: we are continuing to treat the coordinator as a single-process-loop object when it isn't one anymore.
What do you think about keeping ReadableCoordinator the way it is (GC'd at the end of the process() loop) and having a separate shutdown object that the SamzaCoordinator updates? (I realize this is a different approach from the one I suggested in (1) above, but bear with me.) This approach would look at the shutdown method the coordinator was invoked with (SHUTDOWN_NOW or WAIT_FOR_ALL_TASKS) and update the shutdown object accordingly. The SamzaContainer would then check the shutdown object to decide whether it should shut down. I like this approach because it keeps the ReadableCoordinator more isolated, which is something I worry about whenever we give objects to StreamTasks.
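Proposal (2) could look roughly like the following sketch. Everything in it is hypothetical: ShutdownMethod with ShutdownNow and WaitForAllTasks stands in for SHUTDOWN_NOW and WAIT_FOR_ALL_TASKS, ShutdownState is one possible shape for the separate shutdown object, and processOnce is an illustrative run-loop step, not a real SamzaContainer method.

```scala
// Hypothetical, simplified types sketching proposal (2); these are not Samza's real classes.
sealed trait ShutdownMethod
case object ShutdownNow extends ShutdownMethod
case object WaitForAllTasks extends ShutdownMethod

// Created for exactly one process() call and garbage-collected afterwards.
// The container reads it once, so a reference a StreamTask keeps has no later effect.
class ReadableCoordinator(val taskName: String) {
  var commitRequested = false
  var shutdownRequest: Option[ShutdownMethod] = None
  def commit(): Unit = { commitRequested = true }
  def shutdown(method: ShutdownMethod): Unit = { shutdownRequest = Some(method) }
}

// Long-lived shutdown bookkeeping that only the container reads and writes.
class ShutdownState(allTasks: Set[String]) {
  private var shutdownNow = false
  private var waiting = Set.empty[String]
  def update(coordinator: ReadableCoordinator): Unit =
    coordinator.shutdownRequest match {
      case Some(ShutdownNow)     => shutdownNow = true
      case Some(WaitForAllTasks) => waiting += coordinator.taskName
      case None                  => ()
    }
  def shouldShutdown: Boolean = shutdownNow || waiting == allTasks
}

// One run-loop step: a fresh coordinator per process() call, folded into the
// shutdown state and then dropped; there is nothing to reset afterwards.
def processOnce(state: ShutdownState, task: String, request: Option[ShutdownMethod]): Unit = {
  val coordinator = new ReadableCoordinator(task)
  request.foreach(method => coordinator.shutdown(method)) // stand-in for StreamTask.process()
  state.update(coordinator)
}
```

The design point is that only ShutdownState survives across iterations, and only the container touches it; the coordinator a StreamTask sees is discarded after each call.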
3. I'm also a little worried about performance here. We're adding three new loops per process loop, and these loops have proven to be slow (especially the Scala loops). I think this is OK, though, since we're going to have to tackle the loop-performance issue holistically in the SamzaContainer anyway.
Taking the approach I outline in (2) would eliminate some of these loops, because you could simply discard the process()-local coordinator rather than iterating over all of the coordinators to reset them.
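To make the loop cost in (3) concrete, the sketch below counts coordinator visits per run-loop iteration under each approach. It assumes, hypothetically, that the three new loops are a commit pass, a shutdown pass, and a reset pass over all coordinators; Coord is a simplified stand-in, not a Samza class. This models the loop count only and is not a benchmark.

```scala
// Hypothetical, simplified coordinator used only to count work; not a Samza class.
final class Coord {
  var commitRequested = false
  var shutdownRequested = false
  def reset(): Unit = { commitRequested = false; shutdownRequested = false }
}

// Approach (1): coordinators outlive process(), so each run-loop iteration scans
// all of them. Assumed passes: commit check, shutdown check, reset.
// Returns how many coordinator visits one iteration makes.
def sharedCoordinatorIteration(coords: Array[Coord]): Int = {
  var visits = 0
  def visit(c: Coord): Coord = { visits += 1; c }
  coords.foreach(c => if (visit(c).commitRequested) { /* commit this task */ })
  coords.foreach(c => if (visit(c).shutdownRequested) { /* record a shutdown vote */ })
  coords.foreach(c => visit(c).reset())
  visits
}

// Approach (2): the container inspects only the coordinator created for the
// process() call that just ran, then drops it, so there is no reset pass.
def perProcessCoordinatorIteration(local: Coord): Int = {
  var visits = 0
  def visit(c: Coord): Coord = { visits += 1; c }
  val c = visit(local)
  if (c.commitRequested) { /* commit this task */ }
  if (c.shutdownRequested) { /* update the long-lived shutdown object */ }
  visits
}
```

With N tasks, approach (1) under this assumption costs 3N visits per iteration, while approach (2) inspects only the single coordinator created for that process() call.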