Currently, a Samza job is defined essentially at start-up and, except for its checkpoints and state, is unchanged for the duration of the job. It would be nice to be able to alter some of these characteristics as the job proceeds.
Example use cases:
Adding new SystemStreamPartitions (SSPs) after a job has begun: Currently, via the ConfigWriter (specifically for Kafka, but available to any system), the set of input SSPs can be determined at job submission, so no one has to enumerate all of them each time. This is nice for jobs that consume a large number of expanding topics. However, after the job starts, new SSPs are not picked up without a job restart, which is what we do with production jobs here. It would be better if the AM, or user code running on the AM, could monitor the source of truth for SSPs and, when a new one is detected, either spin up a new container to handle it or assign it to an existing container.
Dynamically expand capacity when load increases: For jobs that consume live site traffic, there is often a routine, predictable rise and fall in volume (perhaps increasing during the workday and decreasing at night). Currently, one must either provision the Samza job to handle peak traffic, allocating more containers than necessary during low-volume times, or underprovision for peak and expect latency to increase during heavy traffic periods. It would be nice if the AM, or some user code on the AM, could detect increased backpressure on the SSPs and spin up new containers, moving responsibility for those SSPs to the new containers and removing it from the existing ones.
Signaling changes to global state, such as input or output schemas: The AM, or user code running on the AM, could monitor a central resource to detect when the input or output schema for a job has changed and signal this change to all the affected containers. Having this monitored from one central bit of code could lessen the impact, compared with, say, having a thousand SamzaContainers regularly pinging away at some schema registry.
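To make the first use case concrete, here is a minimal Python sketch of what AM-side code might do when it polls the source of truth for SSPs. Everything in it is an assumption for illustration: SSPs are modeled as (system, stream, partition) tuples, and `detect_new_ssps`, `assign_ssps`, and `max_ssps_per_container` are hypothetical names, not existing Samza APIs.

```python
def detect_new_ssps(known_ssps, current_ssps):
    """Return SSPs present in the source of truth but not yet consumed, sorted."""
    return sorted(set(current_ssps) - set(known_ssps))


def assign_ssps(new_ssps, assignments, max_ssps_per_container):
    """Place each new SSP on the least-loaded container, or on a new one if all are full.

    `assignments` maps a container id (int) to the list of SSPs it consumes and
    is updated in place. Returns the ids of containers that had to be created.
    """
    created = []
    for ssp in new_ssps:
        # Least-loaded existing container; the lowest id wins ties.
        target = min(assignments, key=lambda c: (len(assignments[c]), c), default=None)
        if target is None or len(assignments[target]) >= max_ssps_per_container:
            # No container has room: "spin up" a new one (hypothetical step).
            target = max(assignments, default=-1) + 1
            assignments[target] = []
            created.append(target)
        assignments[target].append(ssp)
    return created
```

The same placement logic could serve the capacity use case, with the trigger being observed backpressure rather than a newly discovered SSP.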
There are a few ways to implement this AM-Container communication:
RPC: We could introduce direct synchronous or asynchronous communication between the AM and containers. This would be the most flexible option, but we currently have no RPC framework at all (aside from the AM-RM), so it would be a significant change.
ZooKeeper: The usual drill. This would put more pressure on ZK than we already do, but is unlikely to be overly problematic.
Write state changes/commands from the AM to containers into another topic: This approach would be the most consistent with how we already handle similar bits of state, such as checkpoints and state stores. For instance, if the AM detected that a container was falling too far behind on a particular task instance (TI), it could:
Write a command to the Control log: Container N: Shutdown TI-Foo.
Wait for Container N to do so and signal by writing into the topic.
Spin up new container, M, and assign to it the SSPs for TI-Foo.
In this way, the job continues without interruption. Later, it could be scaled down as well.
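The hand-off steps above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: the control log is a Python list standing in for a topic, the AM waits synchronously rather than asynchronously, and the message fields and command names (`SHUTDOWN_TASK`, `TASK_STOPPED`, `START_TASK`) are hypothetical, not an existing Samza protocol.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ControlMessage:
    sender: str   # "AM" or a container name
    target: str   # intended recipient
    command: str  # hypothetical command name
    task: str     # task instance the command refers to


class ControlLog:
    """Append-only log standing in for the control topic."""

    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)

    def read_from(self, offset):
        return self.messages[offset:]  # a copy, so appends during iteration are safe


class Container:
    def __init__(self, name, tasks):
        self.name = name
        self.tasks = set(tasks)
        self.offset = 0  # position of the next unread control message

    def poll(self, log):
        for msg in log.read_from(self.offset):
            self.offset += 1
            if msg.target != self.name:
                continue
            if msg.command == "SHUTDOWN_TASK" and msg.task in self.tasks:
                self.tasks.discard(msg.task)
                # Signal completion by writing back into the same log.
                log.append(ControlMessage(self.name, "AM", "TASK_STOPPED", msg.task))
            elif msg.command == "START_TASK":
                self.tasks.add(msg.task)


def migrate_task(log, task, old, new):
    """AM side: stop `task` on `old`, wait for the ack, then start it on `new`."""
    log.append(ControlMessage("AM", old.name, "SHUTDOWN_TASK", task))
    old.poll(log)  # in a real job the container polls on its own schedule
    acked = any(
        m.command == "TASK_STOPPED" and m.sender == old.name and m.task == task
        for m in log.messages
    )
    if not acked:
        raise RuntimeError(f"{old.name} did not acknowledge shutdown of {task}")
    log.append(ControlMessage("AM", new.name, "START_TASK", task))
    new.poll(log)
```

Because the acknowledgement travels through the same log as the command, the log alone records the full sequence of the migration.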
A few thoughts:
The control log effectively acts as an edit log/state store for the job itself, and could either be read by other, ancillary processes or be merged so that the job can later be restored from it quickly.
The commands should be framework agnostic (hat tip: Chris Riccomini) so we are not tied to YARN by this feature.
The command vocabulary should be easily extended and deprecated as necessary.
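One possible shape for these three points, sketched in Python: commands are registered by name, old names can be kept as deprecated aliases, unknown commands are skipped, and replaying the log rebuilds the job's task assignments. The command names, the `(name, container, task)` record layout, and the `command`/`replay` helpers are all hypothetical, and nothing here refers to YARN.

```python
import warnings

HANDLERS = {}    # command name -> handler function
DEPRECATED = {}  # old command name -> replacement name


def command(name, deprecated_alias=None):
    """Register a handler for `name`, optionally keeping an old name as an alias."""
    def register(fn):
        HANDLERS[name] = fn
        if deprecated_alias:
            DEPRECATED[deprecated_alias] = name
        return fn
    return register


@command("ASSIGN_TASK", deprecated_alias="START_TASK")
def assign_task(state, container, task):
    state.setdefault(container, set()).add(task)


@command("UNASSIGN_TASK")
def unassign_task(state, container, task):
    state.get(container, set()).discard(task)


def replay(log):
    """Fold the control log into a mapping of container -> assigned tasks."""
    state = {}
    for name, container, task in log:
        if name in DEPRECATED:
            warnings.warn(f"{name} is deprecated; use {DEPRECATED[name]}", DeprecationWarning)
            name = DEPRECATED[name]
        handler = HANDLERS.get(name)
        if handler is None:
            continue  # written by a newer vocabulary; skip rather than fail
        handler(state, container, task)
    return state
```

Skipping unknown commands is a design choice made for this sketch; a stricter reader could instead fail fast when it meets a command it does not understand.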
These are all non-critical, distant requirements, but we should keep them in mind as we expand Samza in the near term.