We can spawn any number of processes, independent of the number of machines.
I'm confused. How does this fit with job.container.count? If I start 5 processes, and I have job.container.count=5, and I maintain a 1:1 process:container mapping, are 5 containers just never going to run?
Jay Kreps, I think the phases you've outlined sound reasonable. I've been thinking through what these phases would look like with multiple SamzaContainers per process.
Phase 1: this is pretty much going to work. You'll run your job with one process, and job.container.count=1 will work as expected. A single thread will be started in your process, and things will just work; you won't even know about the threading or the container count. As soon as you start a second process for your job, though, you'll have to understand what job.container.count means, and you'll have to change it (on every process, since each process has its own config file right now).
Phase 2: Here, you'll need to understand job.container.count. You'll need to set it based on your maximum process count (or some value greater than that).
Phase 3: Go use a cluster manager. Everything continues to work the same way, except now you have to understand job.container.count.
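To make the usability problem concrete, here's the kind of per-process config file phases 1 and 2 imply (job.container.count is the real setting under discussion; the other property names are just illustrative of a typical job config):

```properties
# One copy of this file per process; must be kept in sync by hand.
job.name=my-stream-job

# Phase 1: one process, job.container.count=1 just works.
# Phase 2: must be bumped to >= the max number of processes you'll ever
#          start, and edited on every single process's config file.
job.container.count=1
```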
Phases 1 and 2 sound pretty bad, from a usability perspective. I anticipate that we'd have to answer a lot of questions on the mailing list about this.
Thinking through the design of partition-shifting, I think the main sticking point is whether Samza should have the ability to add/remove containers. Under the 1:1 process:container proposal, Samza doesn't have that ability: the container count is determined by how many processes the user starts, and it changes over time as processes (or machines) start and stop.
In all of the other proposals, Samza can ask to start a container, which very much models how things work with YARN or Mesos. The reason I think this is an important distinction is that it has a big effect on HA. If Samza can't start containers, then it has to shift partitions when a failure occurs, if we want to be HA. Samza, from the beginning, was explicitly designed not to have to shift partitions; we relied on YARN for this.
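To make the HA tradeoff concrete, here's a minimal sketch of what partition shifting means (all names here are hypothetical, not Samza APIs): when a container dies and no replacement can be started, the assignment over the survivors must be recomputed, which can move even partitions that weren't on the failed container.

```java
import java.util.*;

// Hypothetical sketch: round-robin partition assignment that must be
// recomputed whenever the set of live containers changes.
public class PartitionShifting {
    // Assign each partition to a live container, round-robin.
    static Map<Integer, Integer> assign(int partitions, List<Integer> liveContainers) {
        Map<Integer, Integer> assignment = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            assignment.put(p, liveContainers.get(p % liveContainers.size()));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Integer> containers = new ArrayList<>(Arrays.asList(0, 1, 2));
        Map<Integer, Integer> before = assign(8, containers);
        // Container 1 fails. If Samza can't start a replacement, it must
        // itself shift container 1's partitions onto 0 and 2 -- and note
        // that recomputing also moves partitions 0 and 2 never owned.
        containers.remove(Integer.valueOf(1));
        Map<Integer, Integer> after = assign(8, containers);
        System.out.println("before: " + before);
        System.out.println("after:  " + after);
    }
}
```

With a cluster manager, by contrast, the failed container is simply restarted elsewhere with the same partition assignment, and none of this machinery is needed inside Samza.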
Changing Samza to a model where it moves partitions on running containers is a very big paradigm shift. To do this right would mean re-writing large chunks of SamzaContainer, as well as samza-yarn. Honestly, if we want to go this route, then using Helix seems like a reasonable fit, since it's exactly what Helix does.
The use case you've outlined is one in which partition shifting is central to Samza's design, as it would be used by all deployments, whether on YARN, Mesos, or standalone. If we go this route, I feel like we should re-write the container to do partition shifting properly. The other proposals are all essentially ways to avoid having to do this.
Your point about making Samza more pluggable for other cluster managers (3b) is relevant. Our strategy for this was to invert the AM/JobCoordinator control, such that JobCoordinator becomes the thing that you run (not run-am.sh), and it has a ClusterManager interface, which it can use to tell the cluster manager what to do (start/stop containers). We can then plug in YARN, Mesos, or our own standalone-zk-based one. One could argue that this isn't the right approach, and it's going to be hard to write an interface that interoperates properly with more than one cluster manager (requesting hosts, defining resource requirements, etc). We've already done that with the JobRunner interface, though, and it's worked well enough for both YARN and Mesos.
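A rough sketch of what that inverted control could look like (the interface and method names below are hypothetical, not an existing Samza API): the JobCoordinator drives, and each cluster manager is just a pluggable implementation.

```java
// Hypothetical sketch of the pluggable interface described above: the
// JobCoordinator tells the cluster manager what to do, and YARN, Mesos,
// or a standalone ZK-based manager each provide an implementation.
interface ClusterManager {
    void requestContainer(int containerId); // ask for a container to start
    void stopContainer(int containerId);    // ask for a container to stop
}

// Trivial in-memory implementation, useful only to show the contract.
class InMemoryClusterManager implements ClusterManager {
    final java.util.Set<Integer> running = new java.util.HashSet<>();
    public void requestContainer(int containerId) { running.add(containerId); }
    public void stopContainer(int containerId) { running.remove(containerId); }
}

public class JobCoordinatorSketch {
    public static void main(String[] args) {
        ClusterManager cm = new InMemoryClusterManager();
        // The coordinator, not the cluster manager, decides what runs:
        for (int i = 0; i < 3; i++) cm.requestContainer(i);
        cm.stopContainer(1);
        System.out.println("running: " + ((InMemoryClusterManager) cm).running);
    }
}
```

The hard part, as noted, isn't this shape; it's whether one interface can express host affinity and resource requirements for more than one cluster manager.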
The internal struggle I'm having is whether all of the partition-shifting work is worth it, just to avoid job.container.count. If we believe that the world is going to be run in cluster-managed containers in the future, an argument could be made that we should optimize for that case. We can still support standalone jobs right now, using one of the proposals above. I agree that it is confusing to have job.container.count=10, and run just a single Java process. I also agree that using N threads in this model is confusing, and that task.opts won't work. It's janky.
I'm not trying to be particularly dogmatic about the design; I'm just concerned about practicalities. I want to do the "right" thing for the main use case. I think there's some tension here between what's "right" for standalone and what's "right" for cluster-managed. Thus far, what we've built is "right" for cluster-managed. If we support standalone as a first-class citizen and introduce partition-shifting as a first-class feature, it complicates operations for YARN and Mesos jobs: they now require ZK-based dependencies, need two layers of liveness monitoring (ZK + NM heartbeats) and two tiers of coordination (AM + JobCoordinator), etc. It also means a lot more code has to change.