Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0
    • Fix Version/s: None
    • Component/s: container
    • Labels:

      Description

      Samza currently supports two modes of operation out of the box: local and YARN. With local mode, a single Java process starts the JobCoordinator, creates a single container, and executes it locally. All partitions are processed within this container. With YARN, a YARN grid is required to execute the Samza job. In addition, SAMZA-375 introduces a patch to run Samza in Mesos.

      There have been several requests lately to be able to run Samza jobs without any resource manager (YARN, Mesos, etc.), but still run them in a distributed fashion.

      The goal of this ticket is to design and implement a samza-standalone module, which will:

      1. Support executing a single Samza job in one or more containers.
      2. Support failover, in cases where a machine is lost.
        Attachments

      1. DESIGN-SAMZA-516-0.md (10 kB, Chris Riccomini)
      2. DESIGN-SAMZA-516-0.pdf (139 kB, Chris Riccomini)
      3. DESIGN-SAMZA-516-1.md (15 kB, Chris Riccomini)
      4. DESIGN-SAMZA-516-1.pdf (177 kB, Chris Riccomini)
      5. DESIGN-SAMZA-516-2.md (28 kB, Chris Riccomini)
      6. DESIGN-SAMZA-516-2.pdf (293 kB, Chris Riccomini)

        Issue Links

          Activity

          criccomini Chris Riccomini added a comment -

          Linking to static partition assignment JIRA (SAMZA-41). This ticket probably subsumes that one.

          criccomini Chris Riccomini added a comment -

          My current line of thinking is to write a StreamJob implementation that orchestrates jobs via ZooKeeper. The basic flow would look something like this:

          Starting a single process.

          1. Run run-job.sh on machine-1.
          2. run-job.sh uses the supplied job config to instantiate StandaloneJobFactory, and gets a StandaloneJob.
          3. StandaloneJob connects to ZK.
          4. StandaloneJob adds itself as a process in the job group ZK directory using an ephemeral node.
          5. StandaloneJob sets itself as a watcher in the job group container assignment directory.
            1. When StandaloneJob receives container assignments, it creates a new thread, and instantiates a new SamzaContainer on the thread.
            2. When StandaloneJob loses a container assignment, it interrupts the appropriate thread, which shuts down the SamzaContainer.
          6. StandaloneJob checks if there is a JobCoordinator leader elected.
          7. If there is no leader elected, StandaloneJob tries to elect itself as leader, and assigns containers to processes in the job group.
            1. Check for the number of connected processes in the job group.
            2. Use job.container.count to create a JobCoordinator with containerCount set accordingly.
            3. Assign containers equally among all processes in the job group.
            4. Set a watcher on the job group ZK directory.
           8. Each SamzaContainer then queries the JobCoordinator via its HTTP API to bootstrap and start itself. (A rough ZK sketch of steps 3-7 follows the failover steps below.)

          Starting a second process.

          1. All of the above steps are repeated. The second run-job.sh command could be run on machine-1, or some other machine.
          2. A leader is already elected, so no leadership election happens for the JobCoordinator.
          3. When this process registers itself in ZK (step (4), above), the watcher set in step (7.4) will be notified.
          4. Watcher set in (7.4) will re-assign partitions from the first process to the second.
          5. When reassignment happens, watcher set in (5) will trigger the start of new containers in this process.

          Handling failover.

          1. The first process fails.
          2. Its ephemeral node times out of ZooKeeper.
          3. The second process is notified via ZK that the process in the job group that was the owner of the JobCoordinator has failed.
          4. The second process elects itself as JobCoordinator leader.
          5. The new JobCoordinator diffs the list of processes in the job group with the container assignments.
          6. Any containers that were assigned to the dead processes are shifted to live processes. This shift triggers the containers to start via the watcher defined in step (5.1).
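
          To make the ZK interaction concrete, here is a minimal, hypothetical sketch of steps 3-7 above (registration, leader election, and the watchers), using the plain ZooKeeper client. All znode paths and class names are illustrative, not part of a final design:

          import java.util.List;
          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.KeeperException;
          import org.apache.zookeeper.WatchedEvent;
          import org.apache.zookeeper.Watcher;
          import org.apache.zookeeper.ZooDefs;
          import org.apache.zookeeper.ZooKeeper;

          // Illustrative only: znode paths and names are placeholders.
          public class StandaloneJobZk implements Watcher {
            private static final String GROUP = "/samza/my-job/processes";     // job group directory
            private static final String LEADER = "/samza/my-job/leader";       // leader election node
            private static final String ASSIGN = "/samza/my-job/assignments";  // container assignments

            private final ZooKeeper zk;

            public StandaloneJobZk(String connect) throws Exception {
              // Longish session timeout (~1 minute) to absorb GC pauses.
              this.zk = new ZooKeeper(connect, 60000, this);
            }

            // Step 4: register this process with an ephemeral (sequential) node.
            public String register() throws Exception {
              return zk.create(GROUP + "/process-", new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }

            // Steps 6-7: become the JobCoordinator leader if no one else is.
            public boolean tryBecomeLeader() throws Exception {
              try {
                zk.create(LEADER, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                zk.getChildren(GROUP, true); // step 7.4: leader watches job group membership
                return true;
              } catch (KeeperException.NodeExistsException alreadyElected) {
                return false;
              }
            }

            // Step 5: read (and watch) the current container assignments for this process.
            public List<String> currentAssignments() throws Exception {
              return zk.getChildren(ASSIGN, true);
            }

            // Steps 5.1 / 5.2: react to assignment or membership changes.
            @Override
            public void process(WatchedEvent event) {
              if (event.getType() == Event.EventType.NodeChildrenChanged) {
                // Re-read assignments and start/stop local SamzaContainer threads accordingly;
                // if this process is the leader, re-balance containers across live processes.
              }
            }
          }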

          Notes

          • The StandaloneJob could use processes instead of threads to run containers. This would allow task.opts to be set on SamzaContainers, and would also keep container code fully isolated from the JobCoordinator, and StandaloneJob ZK logic.
          • The process:container:stream task:partition nomenclature is pretty cumbersome. We're also adding a new concept here: something above the container. Currently, in YARN, the YARN container to SamzaContainer mapping is 1:1. The approach described above breaks this model, and makes it 1:*. The benefit of this is that it means we can shift SamzaContainers amongst Java processes without stopping existing containers. It also means we don't have to change any SamzaContainer code--everything in there can remain immutable.
          • When we move the AM UI out of YARN and into JobCoordinator, you'll be able to use the UI in standalone mode, but the UI will jump from node to node as machines fail. This seems kind of annoying.
          • Does it make sense to have the JobCoordinator run as an independent process? When it's down, existing containers continue to run, but no new containers can be added (I think).
          • This design doesn't support multi-job queries. For example, if you wanted to run "SELECT member_id, COUNT FROM stream GROUP BY member_id", you basically can't, since this query requires running two jobs (one to repartition, and one to aggregate).
          • I haven't worked through the details on whether we could end up with orphaned processes that continue producing even after the coordinator thinks they're failed. This concern was voiced on the mailing list. If we can't safely rely on ZK to notify us when a container is ACTUALLY dead, then we could end up in split-brain scenarios, where we have multiple containers processing the same data.
          nickpan47 Yi Pan (Data Infrastructure) added a comment - edited

          Chris Riccomini, I like the way this isolates the logic from the SamzaContainer code. Just adding some quick thoughts on the leader election and job execution model:

           1. Is run-job.sh executed for each job on each node? Is it worth considering a daemon process that registers itself in a process_group on ZK, with each daemon then watching a jobs ZK directory for new job nodes? Once a new job node is added, all daemon processes can try to grab the ephemeral node for the job, and the one that acquires it immediately becomes the leader (i.e. the JobCoordinator). The leader will then pick the processes that will be part of this job and do the same container assignments as in Step 1.5 (see the sketch after this list).
             1. One benefit of doing this is that new job creation becomes a simple ZK write to add the job node under the jobs ZK directory. No need to run run-job.sh on each node for each new job.
             2. The other benefit is that the JobCoordinator can immediately pick up multiple registered processes for the job and do the assignment once, instead of requiring a re-assignment each time a node runs run-job.sh (as in Step 2.4).
               Downside: we need something like daemontools to run the daemon processes as a service, which brings in external dependencies.
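
          A tiny, hypothetical sketch of the "grab the job node" step (paths and class names are illustrative only):

          import java.util.List;
          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.KeeperException;
          import org.apache.zookeeper.ZooDefs;
          import org.apache.zookeeper.ZooKeeper;

          // Hypothetical: each daemon watches a jobs directory and races to create the
          // ephemeral leader node for any new job node it sees.
          public class DaemonJobWatcher {
            public static void claimNewJobs(ZooKeeper zk) throws Exception {
              List<String> jobs = zk.getChildren("/samza/jobs", true); // re-arm the watch on the jobs dir
              for (String job : jobs) {
                try {
                  zk.create("/samza/jobs/" + job + "/leader", new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                  // Won the race: this daemon acts as the JobCoordinator for this job and
                  // performs the container assignments described in Step 1.5.
                } catch (KeeperException.NodeExistsException alreadyOwned) {
                  // Another daemon already coordinates this job.
                }
              }
            }
          }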

          When we move the AM UI out of YARN and into JobCoordinator, you'll be able to use the UI in standalone mode, but the UI will jump from node to node as machines fail. This seems kind of annoying.

          How heavy would it be to run a lightweight web service on each node that keeps track of the JobCoordinator's location and redirects requests?

          criccomini Chris Riccomini added a comment -

          Is the run-job.sh executed for each job on each node?

          In the idea I posted above, yes.

          Is it worth thinking of running a daemon process that registers itself in a process_group on ZK, and then each daemon process watches a jobs ZK directory for new job nodes?

          I think there's a trade-off here. This approach would support multiple jobs, which would be nice for things like the query language (SAMZA-390). It also adds complexity for end users (the search space for where their logs are is now a lot larger--all the machines in the grid, not just the N machines they ran run-job.sh on) and for the operator (we would now need a way to move binaries to the appropriate machines).

          If you squint a bit, this proposal looks pretty similar to YARN/Mesos, but without any sophisticated scheduling. You just get full machines, and you either have machines available, or you don't. My initial inclination is that, if you want this, you should just use YARN or Mesos. I will think it through, and add it to the document, though. I currently have 3 potential solutions:

          1. Embedded YARN
          2. ZK-based approach (as described in my original post)
          3. ZK-daemon approach (as you described)

          I will try and think through them, and post the design doc.

          jkreps Jay Kreps added a comment -

          This is great.

          One gotcha I wasn't sure this covered is providing mutual exclusion for a partition. If a new process is started, it needs to get an assignment. It is important that whoever currently has the assignment has ceased consuming its prior assignment, checkpointed offsets, and flushed all results before the new process takes over that partition.

          Some random thoughts:
          0. I really really think that a simple main method stub that runs the job is what you want. Don't try to make a daemon that runs multiple jobs--that is what mesos/yarn are for.
          1. Not sure why you need any new layers here? I see this as just a wrapper around the container that handles partition changes. You could still think of this as "the container".
          2. I suspect separating the coordinator from the job as separate processes will isolate the coordinator from job GC issues but it will also add a lot of complexity--e.g. now one can fail but not the other. For stream processing I suspect long default zk session timeouts would solve the problem just as well and be much simpler.
          3. Why would you ever have more than one job for a sql query? Even if it repartitions many times can't you do that all in one job that just has a big switch statement over the inputs? Operationally I think one job per query would make things much easier. Separating into multiple jobs "isolates" them, but you can't really isolate a query from itself. Does this introduce scheduling issues or something?

          criccomini Chris Riccomini added a comment -

          Attaching half-completed draft design for those that want to review. Will continue working on design document tomorrow.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jay Kreps, just trying to get clearer on the comments regarding the number of jobs per query.

          Even if it repartitions many times can't you do that all in one job that just has a big switch statement over the inputs?

          If I have a query that involves a re-partition followed by a group-by on the re-partitioned key, the task for the first (re-partition) job takes topic1, partitioned by keyA, from a certain partition. After re-partitioning by keyB, the output is sent to another topic, topic2. The second (group-by) job can then take topic2's partitions and perform the calculation with the assurance that all messages with keyB="value1" are in the same partition. If I use a single job to implement that, each task needs to take both topics as input and, based on the input topic, perform a different part of the query logic. Is this the "big switch statement" you referred to (see the rough sketch below)? It is an interesting concept; I can see both good and bad things here:

           1. Pros:
             1. It is much easier if there is some state that needs to be shared across the whole query.
             2. It is also easier to identify the most backlogged topics across the whole query, since the task subscribes to all topics involved in all stages of the query.
           2. Cons:
             1. It would not be possible to bounce or move the "group-by" process without impacting the "re-partition" process.
             2. If different stages of the query require different numbers of containers, it won't be easy to adjust the number of containers used for "group-by" individually.
             3. It can also cause memory issues when a complex query with multiple sub-queries has to be condensed into the same task.

          Since we cannot restrict what kind of query state the user will generate, I would rather plan for the case where I can isolate the query tasks. For simple queries, the query planner can most likely put all operators in a single job.
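
          For concreteness, a rough and purely illustrative sketch of what that single-job, big-switch task could look like (the stream names, helper methods, and aggregation details are hypothetical):

          import org.apache.samza.system.IncomingMessageEnvelope;
          import org.apache.samza.system.OutgoingMessageEnvelope;
          import org.apache.samza.system.SystemStream;
          import org.apache.samza.task.MessageCollector;
          import org.apache.samza.task.StreamTask;
          import org.apache.samza.task.TaskCoordinator;

          // Illustrative only: one task handles both stages of the query,
          // branching on the input stream (the "big switch statement").
          public class RepartitionAndGroupByTask implements StreamTask {
            private static final SystemStream REPARTITIONED = new SystemStream("kafka", "topic2");

            @Override
            public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                TaskCoordinator coordinator) {
              String stream = envelope.getSystemStreamPartition().getStream();
              if ("topic1".equals(stream)) {
                // Stage 1: re-partition topic1 (keyed by keyA) onto topic2 by keyB.
                Object keyB = extractKeyB(envelope.getMessage());
                collector.send(new OutgoingMessageEnvelope(REPARTITIONED, keyB, envelope.getMessage()));
              } else {
                // Stage 2: group-by on topic2, now co-partitioned by keyB.
                updateAggregate(envelope.getKey(), envelope.getMessage());
              }
            }

            private Object extractKeyB(Object message) { /* hypothetical */ return message; }
            private void updateAggregate(Object keyB, Object message) { /* hypothetical, e.g. a KeyValueStore */ }
          }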

          Just one reminder: a 4th solution Chris Riccomini and I discussed is not to run a daemon process at all, but rather to run the JobCoordinator manually from a separate box, write the container assignment to ZK once, and start containers on all assigned boxes. Deploying more jobs would then simply mean running the JobCoordinator once for each job.

          jkreps Jay Kreps added a comment -

          Hey Yi Pan yes exactly.

          Let me try to argue that the cons are not so bad!

           1. It's true that you can't bounce the individual parts of the query. But my argument is that the user only cares about the final output, and the final output will only be produced if all subcomponents are active: if job A feeds job B, then bouncing A will effectively stop B since there will be no input, and bouncing B will effectively stop A since its output will just queue up and no final results will be produced. I think you can actually flip this argument around--if you want to restart or redeploy your job, which will be common, it will be much simpler if there is just one process to bounce, without having to think about dependencies (i.e. what happens if I change the query and restart first A then B? Then there will be a period of time where the old B is getting input from the new A...).

           2. Ah, but let me argue that this is an advantage! The problem with separately sizing A and B is that users never do a good job of sizing, so either A or B will be a bottleneck. If we give them both a shared pool of containers, then whether the A part or the B part is more active will not matter. To make this stronger and more general, I claim that if you have N containers to use, giving them all to the combined job A+B will always be at least as high throughput as splitting them up and giving some to A and some to B, no matter how accurately you estimate the split (i.e. perfect estimation would be equal, but that is unlikely).

           3. This is true. However, I think the same argument as in (2) applies. If you segregate A and B, the total memory doesn't change, but now you create a bottleneck from your division unless you divide perfectly.

          See if you come to the same conclusions I did. It is actually quite subtle. I thought about it, and I think the trade-off is that if you isolate parts of a single computation (like a query) into separate processes, the following things are true: (1) you always have fewer resources for the bottleneck component unless you estimate perfectly (in the case of perfect estimation you have the same resources), but (2) you protect one part from the other. So separating makes sense if you are writing job A and I am writing job B and they have different purposes; then separating protects us, at the cost of perhaps slightly worse utilization. But if A and B are both part of the same logical query, then you can't really protect them from each other, and we just reduce utilization.

          jkreps Jay Kreps added a comment -

          Hey Chris, to elaborate on my earlier comment (i.e. #1) now that I see the design document: why try to run multiple containers per process? I agree with your commentary that the idea of a container which holds multiple tasks is already tricky, and adding another layer is going to be too much. Presumably the goal of this is to get more parallelism? But you can also get that by just running N processes. There is some overhead to this: you can't share Kafka clients, can't share basic JVM memory usage, etc. But I think these are pretty minor overheads...

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jay Kreps, thanks for the answers! It is indeed a subtle tradeoff. Your argument about bottleneck identification and management really makes sense. My worry is that we may not be able to completely remove the need for separate jobs for a query. So I still want to throw out some of my concerns/questions here to discuss:

          1. bouncing the containers

            what happens if I change the query and restart first A then B, then there will be a period of time where the old B is getting input from the new A...

            Even with a single-job layout, we probably cannot completely remove the above possibility. Say there are 4 containers, each running a single task, with the re-partition + group-by; container.1 may take outputs from container.2. When we bounce container.2 with the new query, container.1 with the old query may start receiving outputs from the new container.2.

           2. Not enough input partitions / auto-scaling: what if the first stage of the process (re-partition) takes topic1 with a total of 2 partitions, while the second stage (group-by) needs to take topic2 with a total of 15 partitions? I guess we would have to insert a re-partition job to make sure topic1' is also divided into 15 partitions. That would also mean that if auto-scaling is used, instead of changing the partitions just for an intermediate job, we would need to change the partitions for all topics in the pipeline. The other approach is to allow non-consistent partition assignments to containers within the job, which makes it difficult to track and loses the beauty of this single-job solution: simple and homogeneous tasks.

          I will think about it further.

          jkreps Jay Kreps added a comment -

          Yeah I agree with both comments.
           1. This is actually a problem no matter what. The case you describe could probably be solved by just not doing a rolling bounce. But however you do this, you will have buffered output in Kafka, so I guess the conclusion is just that you have to think about compatibility.
          2. I think the simple case is solved by the pluggable partitioning strategies, right? But I think it is possible to construct a case where you have a very complex query some parts of which need co-partitioning and some which don't...

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jay Kreps, some further thoughts:

          2. I think the simple case is solved by the pluggable partitioning strategies, right? But I think it is possible to construct a case where you have a very complex query some parts of which need co-partitioning and some which don't...

          Exactly! I was thinking that if the partition assignment logic is not fixed in configuration, and can somehow be dynamically configured via ConfigStream or some state put in ZK by the JobCoordinator, the non-consistent partition-to-container assignment may not be an issue, since the code that interprets the configuration would be the same across all tasks.
          My feeling is that we can probably get the one-job-per-query model for free in the first stage. But I would prefer to keep the possibility open to extend the model to m-jobs-per-query later, when we see the complex query case. A pluggable partitioning strategy would satisfy that potential extension. Thanks!

          criccomini Chris Riccomini added a comment -

          One gotcha I wasn't sure if this covered is attempting to provide mutual exclusion for a partition.

          Yeah, it's not quite clear to me whether ZK ephemeral nodes are sufficient for this. This kind of boils down to the Kafka producer split-brain problem. We could make StandaloneJob kill all containers when it loses its connection to ZK, and wait until it can re-establish a connection. Do you think that's sufficient?
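
          Purely as a sketch of that "kill containers on ZK loss" idea (the class and callback names are made up):

          import org.apache.zookeeper.WatchedEvent;
          import org.apache.zookeeper.Watcher;

          // Hypothetical "fail closed" behavior: stop all local containers as soon as the
          // ZK session is in doubt, and only restart them once a fresh session (and fresh
          // assignments) have been re-established.
          public class SessionStateWatcher implements Watcher {
            private final Runnable stopAllContainers;   // supplied by the StandaloneJob
            private final Runnable rejoinAndRestart;     // re-register, re-read assignments

            public SessionStateWatcher(Runnable stopAllContainers, Runnable rejoinAndRestart) {
              this.stopAllContainers = stopAllContainers;
              this.rejoinAndRestart = rejoinAndRestart;
            }

            @Override
            public void process(WatchedEvent event) {
              switch (event.getState()) {
                case Disconnected: // connection lost; we may be partitioned, so stop producing
                case Expired:      // session gone; our ephemeral nodes are gone too
                  stopAllContainers.run();
                  break;
                case SyncConnected: // (re)connected; safe to re-register and resume
                  rejoinAndRestart.run();
                  break;
                default:
                  break;
              }
            }
          }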

          0. I really really think that a simple main method stub that runs the job is what you want. Don't try to make a daemon that runs multiple jobs--that is what mesos/yarn are for.

          I haven't fully thought it through (still a TODO for the design doc), but my instinct is the same as yours.

          Not sure why you need any new layers here? I see this as just a wrapper around the container that handles partition changes. You could still think of this as "the container".

          This is really about StreamTask shifting when a failure occurs, and how SamzaContainer works. SamzaContainers are immutable once they start. If we maintain a 1:1 relationship between process and container, as we have now, then when partitions need to be shifted, we will have to tear down the whole container, add the partitions, and start the container again. If we have a 1:N mapping, then shifting StreamTasks can be done at the container level (we can move an entire container). This means existing containers in the process can continue running without interruption.

          That said, it seems to me that the main cost of restarting a container is restoring state. If the SamzaContainer is restarted on the same machine, it seems that we could skip the state-restoration step, and simply restore state for StreamTasks that weren't previously on the machine. I'd have to think through how the container would know which tasks to restore, and which not to, but if it supported this, we could maintain a 1:1 process:container mapping.

          I suspect separating the coordinator from the job as separate processes will isolate the coordinator from job GC issues but it will also add a lot of complexity--e.g. now one can fail but not the other. For stream processing I suspect long default zk session timeouts would solve the problem just as well and be much simpler.

          There are a few irritations that come from having the coordinator in a random StandaloneJob.

          1. GCs in the job can cause ZK timeouts.
          2. The JobCoordinator UI will bounce from machine to machine as the coordinator moves around.
          3. Adds complexity to the StandaloneJob (there is a second phase of election that has to take place).

          One idea for (2) would be to have every StandaloneJob start an HTTP server. The server's job would be to redirect to the currently active job coordinator's HTTP server. This server could also be used to serve local logs, so that the JobCoordinator's UI could link to logs on all machines, just like the YARN AM UI does today.
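
          A rough sketch of that redirect server using the JDK's built-in HTTP server (the class names and the coordinator lookup are placeholders):

          import java.io.IOException;
          import java.net.InetSocketAddress;
          import com.sun.net.httpserver.HttpExchange;
          import com.sun.net.httpserver.HttpServer;

          // Illustrative only: every StandaloneJob could run this and redirect to wherever
          // the currently elected JobCoordinator's UI happens to live (e.g. looked up in ZK).
          public class CoordinatorRedirectServer {
            public static void start(int port, CoordinatorLocator locator) throws IOException {
              HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
              server.createContext("/", (HttpExchange exchange) -> {
                String target = locator.currentCoordinatorUrl(); // e.g. read from a ZK znode
                exchange.getResponseHeaders().set("Location", target + exchange.getRequestURI());
                exchange.sendResponseHeaders(302, -1); // temporary redirect, no body
                exchange.close();
              });
              server.start();
            }

            // Hypothetical lookup of the active coordinator's HTTP address.
            public interface CoordinatorLocator {
              String currentCoordinatorUrl();
            }
          }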

          I'm not convinced that it adds a lot of complexity. There is a lot of complexity in running a coordinator inside a shared JVM, properly electing it, and properly failing it over, as well. Trading all of that for simply knowing "when my coordinator is dead, I need to get it back up right away" seems pretty appealing.

          One thought here would be to stage the implementation. We could start with an isolated non-HA coordinator, commit that, then add support for HA coordinator.

          Why would you ever have more than one job for a sql query?

          I'm going to punt on this. We won't support multi-job standalone execution (aside from running two Samza jobs entirely separately). We can discuss the pros/cons of multi-job SQL queries in SAMZA-390, and so on. From a standalone perspective, either 1) SQL queries will run in a single Samza job, or 2) you'll have to use a real resource manager (YARN, Mesos, etc.) for queries.

          criccomini Chris Riccomini added a comment -

          One more comment about having a 1:N process:container mapping. I think this design might actually be quite good. With SAMZA-348, we changed yarn.container.count to job.container.count. SAMZA-375 will pick this up and use it for Mesos containers. The point of job.container.count is to specify the parallelism of the job. In samza-standalone, if you want a parallelism of 200 distinct processors (SamzaContainers), you'd say job.container.count=200. You get this level of parallelism whether you run the processors on 1 machine or 100. To me, as a developer, that seems pretty intuitive. It's just like asking for threads in a thread pool.

          It seems like the confusion here is that we're using the word "container". A more appropriate config might be "job.parallelism". In YARN/Mesos, the parallelism:process mapping is 1:1 (one Java process runs one SamzaContainer). In standalone, it'd be 1:N (one Java process runs multiple SamzaContainers).

          criccomini Chris Riccomini added a comment -

          Attaching updated design docs. Run a diff on the .MD to see changes.

          • Updated job parallelism/job hierarchy section based on Jay Kreps's comments.
          • Finished independent JobCoordinator section.
          • Added a section on the custom deployment daemon.
          • Some other minor updates.

          I will circulate for feedback before filling out the proposed solution section.

          metacret Jae Hyeon Bae added a comment -

          What will happen when ZK is down? I am a little bit risk-averse about distributed coordination based on ZK, because I have seen a lot of ZK trouble bring down a cluster. For example, Druid uses Apache Curator with the PathChildrenCache recipe; it handles ZK connection state changes and re-instates all children nodes to recover its state after a ZK interruption, which is quite painful. Even with heavy effort to make Druid resilient to ZK, it's still not fully stable yet, which might be due to a Curator bug. So we're running our own dedicated ZK, but eventually ZK will go down. If we can predict the situation, we can prepare for it.

          criccomini Chris Riccomini added a comment -

          Jae Hyeon Bae, in the current proposal, if ZK is down, then the containers would all kill themselves. The problem, from the container's perspective, is that it can't know whether ZK is down or whether there's a network partition, so it can't know whether it's safe to hang around. So the only solution seems to be to wait for some period of time and then kill itself (just as YARN behaves). I should note that if ZK is down, your Kafka cluster is going to be in extremely bad shape as well.

          When discussing this with Jay Kreps, one of the things we're planning on doing is having a fairly high (~1 minute) ZK timeout, which should absorb almost all GC-based timeouts within a container. This means we should only see a timeout that is a "real" timeout (ZK is dead, or there's a network partition). It also implies that failover will take at least a minute in the case where a machine dies.

          metacret Jae Hyeon Bae added a comment - edited

          Chris Riccomini, ZK will eventually shut down when its transaction id rolls over its 4-byte integer. Even though it will re-elect a master in a very short time, I saw Druid kill all its indexing jobs on this incident; this might be a bug in Curator, and I am not sure whether you will use it. This situation will rarely happen, but can we do better with minimal ZK dependency, such that state management is done by a gossip protocol plus some other persistent storage such as a DB or file? The ZK dependency would then be needed only for leader election among multiple masters. AFAIK, Kafka brokers and producers wouldn't be impacted by a ZK outage; only the additional coordination would become impossible. I hope Samza standalone will work like this.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jae Hyeon Bae, I generally agree that we should not use ZK as metadata storage. It should be used mainly as a distributed coordination service for leader election and distributed synchronization/consensus protocols. However, if the shared state is used as part of the consensus/coordination protocol, it should be saved to ZK. Introducing another gossip protocol that tries to share the state to reach consensus in a distributed system is going to face the same problems that ZK solves. Using a DB or file introduces a single-point-of-failure problem if there is only one copy. If there are multiple copies, again, you are back to solving the distributed consensus problem with a customized protocol, which is as hard as building another ZK.

          In summary, I would agree to reduce the amount of metadata stored on ZK, but not to implement another distributed consensus protocol for sharing the state.

          Just my 2-cents.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jae Hyeon Bae, I am also curious about the ZK behavior you described:

          ZK will eventually shut down when its transaction id rolls over its 4-byte integer.

          I assume that you were referring to https://issues.apache.org/jira/browse/ZOOKEEPER-1277? This has been fixed in ZK 3.3.5, 3.4.4, 3.5.0.

          metacret Jae Hyeon Bae added a comment -

          Yi Pan (Data Infrastructure), I can understand your concern. Using a reliable DB such as C* or AWS RDS could be good. Killing all jobs when ZK sessions are lost sounds like something we need to improve. If the main purpose of Samza standalone is easy deployment at relatively small scale, a ZK dependency is fine. But I hope Samza standalone will remove any external cluster-management dependency for deployments with hundreds of machines and thousands of containers.

          metacret Jae Hyeon Bae added a comment -

          I know, but I saw Curator lose its session for more than 15 minutes during leader re-election. The root cause was that many clients tried to reconnect to the ZK servers during leader re-election, and the ZK quorum was not able to re-form quickly due to a network thundering herd. So, if ZK is shared among many clients, the quorum can re-form very slowly when the ZK transaction id rolls over. If you haven't seen this kind of problem, one of two things is true: either I was a stupid ZK SRE or LinkedIn's ZK SREs are excellent.

          metacret Jae Hyeon Bae added a comment -

          So, since we could not bear this situation anymore, we decided to run our own dedicated ZK cluster. Now we're very happy with it, and I have less concern about ZK connection problems. But the potential risk still exists.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jae Hyeon Bae, I guess that I would have to fully understand your concern first. I apologize for any earlier comments if they were based on wrong assumptions.
          Here are some of my questions related to your concern:

          1. From your point of view, what's the ideal behavior of the Samza standalone service if the distributed coordination service (be it ZK or something else) is down? Should all the containers continue running or shut themselves down?
          2. In your targeted deployment, what's the total number of machines and containers you have in mind? And how frequently do machines and containers come up and down?

          Thanks!

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Ah, I see. Yes, I agree that if many clients come back simultaneously while ZK restarts, there is a problem. I guess the ZK client should have some exponential back-off logic built in for connection rejections from the server. The other thing to note is to avoid the "herd" effect in the leader election node, as described here: http://zookeeper.apache.org/doc/trunk/recipes.html#sc_leaderElection (a rough sketch of that recipe follows below).
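
          For reference, a minimal sketch of that recipe (illustrative only, not Samza code): each candidate creates an ephemeral sequential znode under an election path and watches only its immediate predecessor, so a leader change wakes one client instead of the whole herd. The paths and class names here are assumptions.

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LeaderCandidate implements Watcher {
  private final ZooKeeper zk;
  private final String electionRoot; // e.g. "/samza/standalone/my-job/election"
  private String myNode;             // full path of our ephemeral sequential znode

  public LeaderCandidate(ZooKeeper zk, String electionRoot) {
    this.zk = zk;
    this.electionRoot = electionRoot;
  }

  public void enter() throws Exception {
    myNode = zk.create(electionRoot + "/n_", new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    checkLeadership();
  }

  private void checkLeadership() throws Exception {
    List<String> children = zk.getChildren(electionRoot, false);
    Collections.sort(children);
    String mySeq = myNode.substring(myNode.lastIndexOf('/') + 1);
    int myIndex = children.indexOf(mySeq);
    if (myIndex == 0) {
      becomeLeader();                 // lowest sequence number wins
    } else {
      // Watch only the znode immediately ahead of us; when it disappears,
      // re-check rather than assuming we are the new leader.
      String predecessor = children.get(myIndex - 1);
      if (zk.exists(electionRoot + "/" + predecessor, this) == null) {
        checkLeadership();            // predecessor vanished between the two calls
      }
    }
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == Event.EventType.NodeDeleted) {
      try {
        checkLeadership();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  private void becomeLeader() {
    // Run JobCoordinator duties: compute container assignments, write them to ZK.
  }
}
{code}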

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          One question I had with the DB solution is: how do you notify the containers if there is a change in the assignments? This is one of the fundamental differences between a simple metadata store and a ZK service: a normal storage service does not give clients real-time notification of state changes in the metadata store, which is needed for distributed coordination.

          closeuris Yan Fang added a comment -

          The JobCoordinator UI will bounce from machine to machine as the coordinator moves around.

          Another approach is to provide a persistent standalone UI server. It talks to ZooKeeper (assuming we use it as the distributed coordination service) to get the leader. It also provides other benefits, such as managing different Samza jobs in one place, reviewing historical records, etc.

          metacret Jae Hyeon Bae added a comment -

          1. When the distributed coordination service is down, jobs should still be running. When the coordination service comes back, everything should be reinstated just as it was before the outage.

          2. We have more than a few million messages per second on average. Depending on Samza throughput (not just the Samza framework itself, but overall stream processing job throughput) and the AWS EC2 instance type we use for Samza containers, we might need more than two hundred machines. If we run eight containers on each machine, that will be less than two thousand containers. We're not planning on autoscaling right now, so machines will not come and go frequently.

          metacret Jae Hyeon Bae added a comment -

          For example, modeled on ElasticSearch's master node:

          A new worker node registers itself in the DB and waits for job assignments.
          The master polls the DB on a regular basis to refresh its worker node list.
          The master heartbeats with each worker node to check whether it is alive.
          To assign a job to a worker node, the master sends the request to the worker node and writes the assignment information to the DB.
          When a worker dies, the master notices after the heartbeat times out a few times in a row. It then re-attempts the jobs that were running on the dead machine and deletes the worker node's information.

          This looks similar to YARN (a rough sketch of this master loop follows below).
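
          To make that concrete, here is a rough sketch of the polling master loop under the assumptions above. WorkerStore, Worker, and Assignment are hypothetical placeholders for whatever DB-backed store would be used; none of this is existing Samza code.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PollingMaster implements Runnable {
  private static final int MAX_MISSED_HEARTBEATS = 3;
  private final WorkerStore store;                       // backed by a DB (e.g. RDS)
  private final Map<String, Integer> missed = new ConcurrentHashMap<>();

  public PollingMaster(WorkerStore store) { this.store = store; }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      List<Worker> workers = store.listRegisteredWorkers(); // poll on a regular basis
      for (Worker w : workers) {
        if (w.heartbeat()) {
          missed.put(w.id(), 0);
        } else if (missed.merge(w.id(), 1, Integer::sum) >= MAX_MISSED_HEARTBEATS) {
          // Worker is considered dead: reassign its containers and drop its record.
          List<Assignment> orphaned = store.assignmentsFor(w.id());
          store.reassign(orphaned, workers);
          store.deleteWorker(w.id());
          missed.remove(w.id());
        }
      }
      try { Thread.sleep(5_000); } catch (InterruptedException e) { return; }
    }
  }
}

// Hypothetical store and worker abstractions, for illustration only.
interface WorkerStore {
  List<Worker> listRegisteredWorkers();
  List<Assignment> assignmentsFor(String workerId);
  void reassign(List<Assignment> orphaned, List<Worker> liveWorkers);
  void deleteWorker(String workerId);
}
interface Worker { String id(); boolean heartbeat(); }
interface Assignment {}
{code}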

          criccomini Chris Riccomini added a comment -

          AFAIK, kafka broker and producer wouldn't be impacted with anything on ZK outage and just additional coordination won't be possible. I hope Samza standalone will work like this.

          Yes, I agree. I think this has to be a goal. Will update docs to make explicit.

          Another approach is to provide a persistent standalone UI server. It talks to Zookeeper (assume we use this as the distributed coordination service) to get the leader. It also provides other benefits, such as manage different Samza jobs at one place, review historical records, etc.

          This is a great idea. I'll update the design doc with it. I think we'll do this.

          When distribution coordination is down, jobs should be still running. When coordination service comes back, everything should be reinstated just like before coordination was down.

          Idea: what if we have the containers pause, rather than die, when a ZK connection is lost. That way, processing will stop, and there will be no split brain, but the jobs will not all kill themselves when ZK is lost. When the containers connect back to ZK, they will relinquish their old partitions, and check for new assignments. To me, this seems like the best of both worlds: the jobs don't die (they just pause) when ZK is lost, and you minimize split-brain risk (because of the pause). We could probably add some timeout to allow containers to continue processing without ZK for a minute or two, in order to minimize unnecessary rebalancing when outages are short (e.g. deployment, GC, etc).
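
          To illustrate the idea, a minimal sketch (hypothetical, not the actual container API): on a ZK disconnect we schedule the pause after a configurable grace period, and cancel it if the connection returns in time. The knob name in the comment and the SamzaContainer hooks are assumptions.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PauseOnZkLoss {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final long graceMs;             // e.g. from a hypothetical "standalone.zk.pause.timeout.ms" knob
  private final Runnable pauseContainer;  // stop consuming/producing, keep local state
  private final Runnable resumeContainer; // relinquish old partitions, re-read assignments
  private ScheduledFuture<?> pendingPause;

  public PauseOnZkLoss(long graceMs, Runnable pauseContainer, Runnable resumeContainer) {
    this.graceMs = graceMs;
    this.pauseContainer = pauseContainer;
    this.resumeContainer = resumeContainer;
  }

  public synchronized void onDisconnected() {
    if (pendingPause == null) {
      // Keep processing during short blips (deploys, GC); pause only if the
      // outage outlives the grace period.
      pendingPause = scheduler.schedule(pauseContainer, graceMs, TimeUnit.MILLISECONDS);
    }
  }

  public synchronized void onReconnected() {
    if (pendingPause != null) {
      pendingPause.cancel(false);
      pendingPause = null;
    }
    resumeContainer.run();
  }
}
{code}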

          criccomini Chris Riccomini added a comment -

          On the "pause when ZK is lost" idea: obviously this means there will be downtime when the ZK grid disappears. This isn't quite what Jae Hyeon Bae was asking for He wanted, "kafka broker and producer wouldn't be impacted with anything on ZK outage and just additional coordination won't be possible. I hope Samza standalone will work like this." I think we'll have to provide a knob to set how long a container can remain running before it pauses. This will let those who care about duplicates set a relatively short pause timeout, and those who care more about uptime set a relatively long (or infinite) pause timeout.

          metacret Jae Hyeon Bae added a comment -

          Pause should be OK, if it makes things simpler and more robust. I believe we have enough operational practice to prevent long outages of the ZK quorum.

          sriramsub Sriram Subramanian added a comment -

          Thanks Chris for the writeup. I am assuming your latest design doc has all the proposed changes.

          1. Failover -
          I can see three approaches for this.

          a. Standalone job starts container as a sub process -
          Pros

          • Existing containers will not be restarted to fail over the containers that were part of the failed standalone job
          • The degree of parallelism is unaffected by the definition of job.container.count. This means that even under failure, we maintain the same container count
          • The state of the existing containers is unaffected, and restoration of state needs to happen only for the failed containers. This avoids any code to check whether state is locally available in order to skip restoration.

          Cons

          • Adds another level of hierarchy, but actually it may not be that bad. You can map the standalone jobs to node managers in the YARN world. The containers are simply monitored by the Standalone job, similar to YARN.

          b. Move tasks to other containers on failure
          Pros

          • On a Standalone job failure, ownership of tasks gets transferred by a rebalance. I am not sure if this makes things any simpler.

          Cons

          • The tasks can get distributed amongst all the containers, and in the worst case this will restart all the containers in the job
          • Would need more code to make containers immutable, and I kind of dislike that. It seems easier to reason about when assignments are immutable
          • We would need special code to avoid restoration of state from Kafka for stores that are already local
          • The job.container.count config no longer holds. The number of containers does not remain constant and can actually decrease over time if we have multiple failures.

          c. Have one standalone job per container

          Here is another proposal

          We do not restrict ourselves to having one Standalone job per machine. Instead, we can have n standalone jobs per machine. This would be determined by the job coordinator. A single standalone job can run only one container. This maintains a 1:1 mapping between process and container.

          Instead of starting the Standalone job on each machine, the user simply starts a Standalone job on one of the machines. The job tries to become the job coordinator on startup and, if it succeeds, proceeds to the assignment process. The job coordinator then simply gets the list of machines and the number of containers and assigns the containers to machines (a simple round-robin strategy should be good enough; see the sketch below). It then updates ZK with this assignment and simply starts one Standalone job for each container. The container is completely left as is and does not require any change. The Standalone jobs listen to ZK for job coordinator changes and for electing a new coordinator (this is no different from what you suggest). On a failover election, the coordinator simply gets the assignment from ZK, ensures that the assignment still holds, and if not, does a reassignment (this is required in all the approaches).
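
          For clarity, a minimal sketch of the round-robin assignment step. The class and method names are illustrative, not part of Samza; the coordinator would record the returned map in ZK.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class RoundRobinAssigner {
  private RoundRobinAssigner() {}

  /** Returns machine -> container ids, e.g. {host-a=[0, 2], host-b=[1, 3]}. */
  public static Map<String, List<Integer>> assign(List<String> machines, int containerCount) {
    Map<String, List<Integer>> assignment = new HashMap<>();
    for (String machine : machines) {
      assignment.put(machine, new ArrayList<>());
    }
    for (int container = 0; container < containerCount; container++) {
      // Deal containers out like cards, one machine at a time.
      String machine = machines.get(container % machines.size());
      assignment.get(machine).add(container);
    }
    return assignment;
  }
}
{code}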

          Pros

          • Keeps the container immutable and maintains 1:1 process to container mapping
          • Starting a job is a lot easier. Simply start one stand alone job on any machine.
          • On failure, state needs to be restored only for the failed containers.
          • There is no scenario where all containers have to be restarted when one of them change

          Cons

          • This may cause some JVM overhead but that should be minimal

          2. Standalone coordinator

          I really prefer having the coordinator not be standalone, since we would need to implement an HA story around it. Having a redirection on each of the standalone jobs to the coordinator should solve the problem of the jumping UI and should be less complex than coming up with an HA story. Also, with one container per Standalone job process, leaking containers on a coordinator failure should not be an issue.

          criccomini Chris Riccomini added a comment -

          Adds another level of hierarchy but actually it may not be that bad. You can map the stand alone jobs to node managers in the yarn world.

          Yea, I tend to agree. If you call them NodeManagers, it's identical to YARN.

          Here is another proposal

          I think you used "standalone job" in some places where you meant to say something else. If I understand you correctly, what you're proposing is to have a little daemon on each node that can start SamzaContainers as separate processes, right? Unlike the "Custom deployment daemon" proposed solution, this daemon would only be a member of a single job (i.e. it can only execute containers for one job).

          In this proposal, how are you monitoring liveness of the subprocesses that are running the SamzaContainers? This was always my worry with using subprocesses vs. threads. The positive aspects of subprocesses are 1) increased isolation, and 2) task.opts works properly.

          I really prefer having the coordinator not be standalone

          Agreed. The proposed solution (not yet doc'd) will have an embedded JC.

          sriramsub Sriram Subramanian added a comment -

          I actually think we don't need a daemon here.

          This is similar to the ZK proposal. These standalone jobs/processes could simply start a container as a separate thread or a subprocess. Each standalone job/process, or whatever we want to call it, will contain at most one container. The monitoring flexibility would determine which approach we go with (subprocess vs. threads). The main difference here is that we simply wrap a process around a container and maintain the 1:1 mapping. We can spawn any number of processes independent of the number of machines.

          jkreps Jay Kreps added a comment - - edited

          This isn't really an argument for any particular design, but one clarifying thing for me is having a clear picture of how this would be used. Here is what I was imagining.

          Phase 1: You are interested in trying Samza. At this point almost by definition you have one job or maybe two jobs to run. Having to set up and understand YARN is a huge pain. Just starting the processes is easier. The definition of "good" in this phase is simple to get going on your desktop with few moving parts.

          Phase 2: You move this one or two jobs to production and all is well. The definition of "good" in this phase is easy to translate from your desktop to a reasonably operational production setting.

          Phase 3: Over time you decide you want more and more of these. Let's say by the time you have five jobs, balancing them over machines starts to become painful. At this point you bite the bullet and adopt YARN/MESOS or run with more direct EC2 integration or even good old-fashioned deployment frameworks. For people who have already adopted one of these frameworks likely they would skip (2) entirely. I really think these "cluster manager frameworks" are the future and we should really not reinvent them. However adopting one is a pretty big commitment and the last thing you want to do is adopt multiple of them, one for new thing. So the goal of the standalone work is two-fold:
          a. To make initial adoption easier.
          b. To make it easier to plug into a cluster manager framework (because now the only thing the cluster manager does is attempt to keep N instances alive). The end result for most orgs will still be large scale deployment in such a framework.
          The definition of good in this phase is that it is possible to run at organization wide scale on a shared pool of capacity.

          It's worth discussing whether we agree on this "life cycle" and these use cases, since if we disagree it is hard to evaluate any proposal.

          For me, an outcome of this is I think the goal should be to make the standalone deployment model as simple as possible and just run one single threaded process--no spawning of processes or anything like that. Why? Two reasons:
          a. If we start spawning processes then embedding this inside YARN/MESOS/etc will become super weird--you will have a cluster manager within a cluster manager.
          b. It complicates the usage in phase (1) and (2) which was the primary goal. The more complicated model might be better in (3), but for (3) our answer is "Son, you need to get yourself a real cluster manager".

          I do think ideally this process would manage the lifecycle of tasks dynamically, so a container could add and drop tasks without restarting all tasks, but just restarting the full container instance would be an intermediate step toward that. I also think ideally we would retain this model whether running inside cluster managers or not; this way your mental model is the same in all contexts, and the only difference is whether your processes get restarted or not.

          criccomini Chris Riccomini added a comment -

          We can spawn n number of process independent of the number of machines.

          I'm confused. How does this fit with job.container.count? If I start 5 processes, and I have job.container.count=5, and I maintain a 1:1 process:container mapping, are 5 containers just never going to run?

          Jay Kreps, I think the phases you've outlined sound reasonable. Thinking through what these phases would look like with multiple SamzaContainers per process.

          Phase 1: this is pretty much going to work. You'll run your job with one process, and job.container.count=1 will work as expected. One thread will be started in your process. Things will just work. You won't even know about the threading or container count. As soon as you start a second process for your job, you'll have to understand what job.container.count means, and you'll have to change it (on all processes, since every process has its own config file right now).

          Phase 2: Here, you'll need to understand job.container.count. You'll need to set it to something based on your max process count (or some value greater than this).

          Phase 3: Go use a cluster manager. Everything continues to work the same way, except now you have to understand job.container.count.

          Phases 1 and 2 sound pretty bad, from a usability perspective. I anticipate that we'd have to answer a lot of questions on the mailing list about this.

          Thinking through the design of partition-shifting, I think the main sticking point here is whether Samza should have the ability to add/remove containers. Under the 1:1 process:container proposal, Samza doesn't have the ability to add or remove containers. This number is determined by how many containers are started by the user. This number changes over time, as new processes (or machines) start and stop.

          In all of the other proposals, Samza can ask to start a container. This very much models how things work with YARN or Mesos. The reason that I think this is an important distinction is because it has a big effect on HA. If Samza can't start containers, then it has to shift partitions when a failure occurs if we want to be HA. Samza, from the beginning, was explicitly designed not to have to shift partitions. We relied on YARN for this.

          Changing Samza to a model where it moves partitions on running containers is a very big paradigm shift. To do this right would mean re-writing large chunks of SamzaContainer, as well as samza-yarn. Honestly, if we want to go this route, then using Helix seems like a reasonable fit, since it's exactly what Helix does.

          The use case you've outlined is one in which partition shifting is central to Samza's design, as it would be used by all deployments, whether on YARN, Mesos, or standalone. If we go this route, I feel like we should re-write the container to do partition shifting properly. The other proposals are all essentially ways to avoid having to do this.

          Your point about making Samza more pluggable for other cluster managers (3b) is relevant. Our strategy for this was to invert the AM/JobCoordinator control, such that JobCoordinator becomes the thing that you run (not run-am.sh), and it has a ClusterManager interface, which it can use to tell the cluster manager what to do (start/stop containers). We can then plug in YARN, Mesos, or our own standalone-zk-based one. One could argue that this isn't the right approach, and it's going to be hard to write an interface that interoperates properly with more than one cluster manager (requesting hosts, defining resource requirements, etc). We've already done that with the JobRunner interface, though, and it's worked well enough for both YARN and Mesos.
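
          As a sketch of what that inversion could look like (the interface and method names here are assumptions, not the Samza API), the JobCoordinator would drive a pluggable interface like this, with YARN, Mesos, and standalone-ZK implementations behind it:

{code:java}
// Hypothetical pluggable cluster-manager interface driven by the JobCoordinator.
public interface ClusterManager {
  /** Ask the underlying manager (YARN, Mesos, standalone-ZK) to run a container. */
  void requestContainer(int containerId, Resources resources);

  /** Ask the underlying manager to stop a running container. */
  void releaseContainer(int containerId);
}

/** Illustrative resource spec used when requesting a container. */
final class Resources {
  final int memoryMb;
  final int cpuCores;

  Resources(int memoryMb, int cpuCores) {
    this.memoryMb = memoryMb;
    this.cpuCores = cpuCores;
  }
}
{code}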

          The internal struggle I'm having is whether all of the partition-shifting work is worth it, just to avoid job.container.count. If we believe that the world is going to be run in cluster-managed containers in the future, an argument could be made that we should optimize for that case. We can still support standalone jobs right now, using one of the proposals above. I agree that it is confusing to have job.container.count=10, and run just a single Java process. I also agree that using N threads in this model is confusing, and that task.opts won't work. It's janky.

          I'm not trying to be particularly dogmatic about the design, I'm just concerned about practicalities. I want to do the "right" thing for the main use case. I think there's some tension here between what's "right" for standalone, and what's "right" for cluster-managed. Thus far, what we've built is "right" for cluster-managed. If we support standalone as a first-class citizen, and introduce partition-shifting as a first-class feature, it complicates the operations for YARN and Mesos jobs (they now require ZK-based dependencies, have to have two layers of liveness monitoring (ZK + NM heartbeat), have two tiers of coordination (AM + JobCoordinator), etc). It also means a lot more code has to change.

          sriramsub Sriram Subramanian added a comment -

          I completely agree with that vision. The basic reason we want this feature is to have a small number of jobs in production at companies that do not want to use YARN/MESOS. Once they scale to more jobs, they should definitely be using YARN/MESOS.

          On those lines, I can think of the following design goals

          1. The design should be easily pluggable into a cluster manager. We already have this design in place, which we use with YARN.
          2. If 1 is true, it would be useful for the containers to be unaffected by this change.
          3. Make minimal changes to what we already have to support this use case.

          criccomini Chris Riccomini added a comment -

          Also, 1:1 process:container mapping need not require partition shifting, which I think was your point at the end. Fully restarting a container would allow this to work. The price we pay here is really the restart time for state. We could probably re-use state that existed before the restart, though. To your point:

          I do think ideally this process would manage the lifecycle of task dynamically so a container could add and drop tasks without restarting all tasks, but just restarting the full container instance would be an intermediate to that. I also think ideally we would retain this model whether running inside cluster managers or not, this way your mental model is the same in all contexts, the only difference is whether your processes get restarted or not.

          I think that bouncing containers is the wrong long-term approach if we want partition shifting as a first-class feature. Based on the above quote, it sounds like you feel it should be. We could start with simple container restarts, though, then add local state reuse, then add full-blown partition shifting later. I just fear that we'll do (1) and (2), and never get to (3). If that's the case, then we've got a crummy implementation of partition shifting, while treating it as a full-fledged "thing" that everyone uses in YARN, Mesos, and standalone.

          sriramsub Sriram Subramanian added a comment -

          Chris: I think the disconnect is about the naming here. With what I am suggesting, the standalone process is not a standalone job. job.container.count is a property of the coordinator (one per job). The coordinator simply starts multiple containers, each wrapped in its own process, which contains the logic for coordinator election and liveness checks.

          jkreps Jay Kreps added a comment -

          Hey Chris, yes exactly.

          I think what I was proposing was
          1. Move to a partition shifting model. Effectively we have a partition shifting model today but the way we do that is by stopping and restarting the job.
          2. Initial implementation of this could destroy and recreate the container object to avoid too much code change. But this would be done transparently inside the process, without actually killing the process. If state was reused, I actually don't think there would be any great downside to this implementation, as the new container could potentially be recreated quite quickly (see the sketch after this list).
          3. An improved future implementation could potentially move to individual task destruction and creation within the container code, but as I said, this is just an implementation detail of how the task switching is accomplished.
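
          A minimal sketch of point 2, assuming a simplified container API (none of these types are the real Samza classes): on an assignment change, the process shuts down its container instance and recreates it in place, without the JVM restarting.

{code:java}
import java.util.Set;

public class ContainerRunner {
  private RunnableContainer current;   // stand-in for a SamzaContainer instance
  private Thread containerThread;

  public synchronized void onAssignmentChanged(Set<String> newTasks) throws InterruptedException {
    if (current != null) {
      current.shutdown();              // flush state, commit offsets
      containerThread.join();
    }
    // Recreate in-process; local state can be reused so restore is fast.
    current = RunnableContainer.create(newTasks);
    containerThread = new Thread(current::run, "samza-container");
    containerThread.start();
  }
}

// Hypothetical, simplified container abstraction for illustration only.
interface RunnableContainer {
  static RunnableContainer create(Set<String> tasks) {
    throw new UnsupportedOperationException("sketch only");
  }
  void run();
  void shutdown();
}
{code}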

          criccomini Chris Riccomini added a comment -

          Attaching a design doc with a straw man proposed solution. The proposal thinks through the 1:1 process:container ZooKeeper-managed solution. This implementation seems a bit complicated. There just seem to be a lot of mismatches between the way Samza was designed and how we're trying to shoe-horn it into a ZooKeeper-managed model.

          Have a look. Feedback welcome.

          criccomini Chris Riccomini added a comment -

          Hey all, I read over my strawman today, and actually felt much better about it. My main gripes were:

          1. It required a change to container (re-usable state)
          2. It breaks some configs (job.container.count, task.opts)
          3. It inverts the logic of JobCoordinator.

          For (1), I realized that we're going to want this feature no matter what. There are two modes where this is useful. One is container fail-over/auto-scale. The other is just when you bounce/redeploy your job on a machine. In the latter case, re-usable state is useful whether we have multiple containers in a single process or not. It's also useful in YARN/Mesos, where you can re-request machines for a container.

          For (2), job.container.count is a new config introduced in 0.9.0. It was yarn.container.count before, so we could just move back to that model. For task.opts, it actually makes sense to have this at the cluster-manager level. We already have problems with this for ThreadJob, where we have to WARN that task.opts doesn't work. Making it yarn.task.opts makes a lot more sense. In fact, it should be yarn.container.opts.

          For (3), I actually think changing the way that we think about JobCoordinator might make sense. When we invert the YARN AM/JobCoordinator logic, I was thinking the JC would tell YARN, "and now start container X." Obviously this doesn't work in standalone mode because you have no mechanism to start more containers (unless a human adds one). You have to respond to container availability: "hey, there are N containers available, what partitions should they process?" This model actually fits fairly well with Mesos' offer system (here are the containers, pick what you want). So, I think I'd just been looking at things from a YARN-based perspective, when actually both standalone AND Mesos would benefit from an offer-based approach (sketched below).
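
          Roughly, the offer-based shape could look like this (a hypothetical interface, not the Samza API): the coordinator is told which containers are available and answers with the work each one should run.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface OfferBasedCoordinator {
  /**
   * Called when containers become available (a standalone process joining the
   * group, or a Mesos resource offer). Returns, per offered container id, the
   * set of task/partition names it should process.
   */
  Map<String, Set<String>> onContainersAvailable(List<String> offeredContainerIds);

  /** Called when a container disappears, so its work can be re-offered later. */
  void onContainerLost(String containerId);
}
{code}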

          So I'm actually pretty happy with the single-container-per-process proposal in the design doc.

          The only remaining question is whether this belongs in samza-core, or in samza-standalone. There's an argument to be made that if we have samza-standalone, you should just use that. If you want to use YARN, you can use samza-standalone with Slider. If you want to use samza-standalone with Mesos, you can just use Marathon.

          Two reasons that I'm not crazy about having these changes in samza-core are:

          1. It adds a ZK-based dependency to Mesos/YARN Samza jobs when you don't actually need it.
          2. You forgo some cluster-management-specific features that might be useful.

          I'm going to move forward with samza-standalone as an isolated module. There's nothing preventing users from using it with Mesos or YARN via Marathon/Slider. In the short term, we'll probably continue maintaining YARN-specific and Mesos-specific implementations that don't use ZooKeeper. In the future, if we find the benefit isn't worth it, we can delete the cluster-specific implementations.

          criccomini Chris Riccomini added a comment -

          I was thinking more about fully adopting the ZK model, and specific YARN/Mesos support (my last comment). This would allow us to delete a bunch of code, which is pretty nice. One thing that I wonder is how it will affect other parts of Samza. I think query languages (SAMZA-390) could still work without a cluster manager, provided that all queries always run as a single job (something that is do-able). Running a query via the CLI would just start a container (or more than one) locally. One area that's a bit more unclear is how developers could reprocess data easily. I think most reprocessing would work pretty well in standalone mode--you'd just start a different instance of your job (job.id). One that's a little more confusing is when you run a Samza job like a batch job. For example, if you keep your current job running, but spin up a second job to reprocess from offset 0 to head, then shutdown (while the current job keeps running). In this mode, without a cluster manager, it's a little strange. You basically have to start a standalone job, wait until it catches up, and then shut it down. Not a show stopper, but worth considering.

          criccomini Chris Riccomini added a comment - edited

          Note: Yi Pan (Data Infrastructure) had a good pointer to [this|http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection] leader election trick to reduce the number of watchers on a single znode. Adding here so I don't lose track of it.
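
          For reference, a minimal sketch of that recipe against the plain ZooKeeper client (the election path and the leadership callback are illustrative, not Samza code): each process creates an ephemeral sequential znode and watches only the znode immediately preceding its own, so a failure wakes a single watcher instead of every participant.

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LeaderElectionSketch {
  private static final String ELECTION_PATH = "/samza/standalone/my-job/election";

  public static void elect(final ZooKeeper zk) throws KeeperException, InterruptedException {
    // Each participant creates an ephemeral, sequential znode.
    String created = zk.create(ELECTION_PATH + "/member_", new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    String myNode = created.substring(created.lastIndexOf('/') + 1);

    // Sort all participants by sequence number.
    List<String> members = zk.getChildren(ELECTION_PATH, false);
    Collections.sort(members);
    int myIndex = members.indexOf(myNode);

    if (myIndex == 0) {
      // Lowest sequence number wins the election.
      // becomeLeader();  // illustrative callback, not defined here
    } else {
      // Otherwise watch only the immediate predecessor; when it disappears,
      // re-run the election instead of having every process watch one znode.
      String predecessor = members.get(myIndex - 1);
      zk.exists(ELECTION_PATH + "/" + predecessor, event -> {
        try {
          elect(zk);
        } catch (Exception e) {
          // sketch only: real code would handle/retry here
        }
      });
    }
  }
}
{code}

          If pulling in Apache Curator is an option, its LeaderLatch recipe implements this same pattern out of the box.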

          criccomini Chris Riccomini added a comment -

          I have opened subtasks for this ticket. Please speak up if you think I've missed something.

          benjaminblack Benjamin Black added a comment - edited

          I know I'm late to this game, so apologies. The 1:1 approach with the existing ZK hierarchy, bouncing containers to move tasks, and the assumption that having a large number of production jobs means running YARN/Mesos don't jibe with what I was hoping to get out of standalone. What I was hoping to see is something more like Ordasity (https://github.com/boundary/ordasity), which supports 1:N in a more flexible hierarchy, does online rebalancing of tasks, and does not assume a heavyweight cluster manager. The existing proposal seems to treat standalone as a toy/dev thing rather than a first-class production thing.

          criccomini Chris Riccomini added a comment -

          Thanks for the feedback, Ben. Our intent is not to treat standalone as a toy/dev thing.

          I think there are several things in play here:

          1. Requires cluster manager for large number of jobs?
          2. 1:N container:task mapping?
          3. Online rebalancing?

          For (1), if you never want to run with a cluster manager (understandable), there's nothing preventing you from using standalone. If you are convinced that this feature is not a toy, then it should work fine for 100s or 1000s of jobs, provided you have a good deployment mechanism for them.

          We already support a 1:N mapping as a feature. I think what you're saying is you'd rather have multiple containers per Java process, each with its own set of tasks. This would be a 1 JVM : N containers : M tasks style, right? I think all people really care about is that there's a 1:N JVM:task mapping. Whether the tasks live in one container or in many containers within a single JVM shouldn't matter all that much.
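
          As a rough illustration of that shape (purely a sketch, not Samza's actual assignment code), grouping M task names into N in-process container groups could be as simple as:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class GroupingSketch {
  // Round-robin M tasks into N container groups that would all run in one JVM.
  // Illustrative only; the real assignment is owned by the JobCoordinator.
  public static List<List<String>> group(List<String> taskNames, int containerCount) {
    List<List<String>> containers = new ArrayList<>();
    for (int i = 0; i < containerCount; i++) {
      containers.add(new ArrayList<String>());
    }
    for (int i = 0; i < taskNames.size(); i++) {
      containers.get(i % containerCount).add(taskNames.get(i));
    }
    return containers;
  }
}
{code}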

          The other thing people care about w.r.t. (2) is that, if there's just a single container per JVM, rebalances happen fast. To speed this up, we're working on state re-use (SAMZA-557), so restarting a container (while keeping the same JVM process) to add or remove tasks should be very cheap (a few seconds).
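
          A hedged sketch of that "restart the container, keep the JVM" idea (the assignment-changed hook and the container runner are made up for illustration; SAMZA-557 tracks the real state re-use work):

{code:java}
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RebalanceSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private Future<?> runningContainer;

  // Hypothetical hook, invoked when this process is handed a new task assignment.
  public synchronized void onAssignmentChanged(final Set<String> newTaskNames) {
    if (runningContainer != null) {
      runningContainer.cancel(true); // interrupt and stop the old container
    }
    runningContainer = executor.submit(() -> runContainer(newTaskNames));
  }

  private void runContainer(Set<String> taskNames) {
    // Placeholder: build and run a container for these tasks. Because the JVM
    // (and its local state directories) survives, state re-use (SAMZA-557)
    // avoids restoring stores from the changelog on every rebalance.
  }
}
{code}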

          I agree that this design is unconventional. From scratch, something like Ordasity or Helix would make more sense. But we're trying to balance what we've already got with the direction that we're trying to go in. Eventually, I think we'll probably just use standalone, and cluster managers will just use Slider, Marathon, etc to execute the standalone jobs as long-running processes.

          amalhotra Ankit Malhotra added a comment -

          Hi - Have not seen much of an update on this ticket. Is this something that is actively being worked on? Not needing YARN/Mesos would be a huge win.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Ankit Malhotra, yes, we recognize the need from the community. The current effort was shifted to SAMZA-1063. Please take a look at the refreshed design and comment.

          P.S. some of the sub-tasks here may also be superseded by newer tasks under SAMZA-1063. We will update here. Thanks!


            People

            • Assignee:
              criccomini Chris Riccomini
            • Reporter:
              criccomini Chris Riccomini
            • Votes:
              3
            • Watchers:
              16
