Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: None
    • Labels: None

      Description

      Extracted from SAMZA-179.

      At the moment, TaskCoordinator.shutdown() can be called by any task, and immediately shuts down the container as soon as the current message has finished processing. That is appropriate in some cases, but not always. Sometimes, what we actually want is for each task to vote that it's ready to shut down, and for the container to be shut down when all tasks within that container have voted.

      A first implementation of this is on https://reviews.apache.org/r/19384/ but it's mixed up with several other concerns. This ticket is to extract only the shutdown API changes (TaskCoordinator.ShutdownMethod) from that patch, and to address the relevant comments made on that RB.
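
      The voting behaviour described above can be illustrated with a small, self-contained sketch. It is not Samza code and not the contents of the patch: the two `ShutdownMethod` value names are borrowed from the discussion on this ticket (SHUTDOWN_NOW, WAIT_FOR_ALL_TASKS), while `ShutdownTally`, its methods, and the task names are hypothetical and exist only to show the intended semantics.

      ```java
      import java.util.Arrays;
      import java.util.HashSet;
      import java.util.Set;

      class ShutdownVoteSketch {
          // Value names are borrowed from the discussion on this ticket;
          // the committed API may differ.
          enum ShutdownMethod { SHUTDOWN_NOW, WAIT_FOR_ALL_TASKS }

          // Hypothetical container-side tally of shutdown requests.
          static final class ShutdownTally {
              private final Set<String> allTasks;
              private final Set<String> votedTasks = new HashSet<>();
              private boolean shutdownNow = false;

              ShutdownTally(Set<String> allTasks) {
                  this.allTasks = new HashSet<>(allTasks);
              }

              // Record a request made by one task.
              void request(String taskName, ShutdownMethod method) {
                  if (method == ShutdownMethod.SHUTDOWN_NOW) {
                      shutdownNow = true;
                  } else {
                      votedTasks.add(taskName);
                  }
              }

              // True once any task asked for an immediate shutdown, or once
              // every task in the container has voted.
              boolean shouldShutDown() {
                  return shutdownNow
                      || (!allTasks.isEmpty() && votedTasks.containsAll(allTasks));
              }
          }

          public static void main(String[] args) {
              ShutdownTally tally = new ShutdownTally(
                  new HashSet<>(Arrays.asList("task-0", "task-1")));
              tally.request("task-0", ShutdownMethod.WAIT_FOR_ALL_TASKS);
              System.out.println(tally.shouldShutDown()); // only one of two tasks has voted
              tally.request("task-1", ShutdownMethod.WAIT_FOR_ALL_TASKS);
              System.out.println(tally.shouldShutDown()); // both tasks have voted
          }
      }
      ```

      In this sketch a single SHUTDOWN_NOW request short-circuits the vote, which mirrors the existing behaviour of TaskCoordinator.shutdown() as described above.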

      1. SAMZA-253.1.patch
        18 kB
        Martin Kleppmann
      2. SAMZA-253.3.patch
        36 kB
        Martin Kleppmann
      3. SAMZA-253.final.patch
        36 kB
        Martin Kleppmann

        Issue Links

          Activity

          martinkl Martin Kleppmann added a comment -

          Attached patch in which just the shutdown API is extracted from SAMZA-179. New RB: https://reviews.apache.org/r/20802/

          I'm also adding some comments to SAMZA-179's RB: https://reviews.apache.org/r/19384/

          criccomini Chris Riccomini added a comment -

          1. The TaskCoordinators/TaskInstance setup still bugs me a bit. The reason it bugs me is that we've made the coordinator mutable, and live beyond a single process loop, but have kept the APIs the same.

          What do you think about just giving the TaskInstanceCoordinator as a parameter in TaskInstance's constructor, and eliminating the (..., coordinator: TaskCoordinators) parameter from all methods in SamzaContainer/TaskInstance? The SamzaContainer can still use the TaskCoordinators to check commit/shutdown, and reset.

          The reason I like this approach a bit more is that I think it fits better with how the coordinator works now. Before, we had to pass the coordinator into every method because its lifecycle only extended to one process() invocation. Now that this is no longer the case, it seems better to just give it to the TaskInstance once.

          2. One other thing to think through is that having the coordinator live beyond a single process() call means that a StreamTask can hold on to it and potentially do weird things like manipulate it from another thread. In general, we haven't allowed this, which is why the ReadableCoordinator only lived for one process-loop. The only place where we deviate from this (that I recall) is with the MetricsRegistry in the TaskContext. In a sense, this is the same problem that I describe in (1), above. We are continuing to treat the coordinator as a single-process-loop object, when it's not anymore.

          What do you think about keeping ReadableCoordinator the way it is (GC'd at end of process()-loop), and having a separate shutdown object that the SamzaCoordinator updates? (I realize this is a different approach than what I was saying in (1) above, but bear with me.) This approach would look at the coordinator's shutdown invocation (SHUTDOWN_NOW or WAIT_FOR_ALL_TASKS), and update the shutdown object accordingly. The SamzaContainer would then check the shutdown object to see if it should shut down. I kind of like this approach because it keeps the ReadableCoordinator more isolated, which I worry about when giving objects to StreamTasks.

          3. Also, I'm a little worried about performance here. We're adding three new loops per process loop. These loops have proven to be slow (especially the Scala loops). I think this is OK, though, since we're going to have to tackle the loop-performance issue holistically in the SamzaContainer anyway.

          Taking the approach I outline in (2) would eliminate some of these loops because you could simply discard the process()-local coordinator, rather than iterating over all of them to reset them.
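
          A rough sketch of the arrangement proposed in (2), under stated assumptions: a fresh coordinator is created for each process() call and discarded afterwards, and only the container touches the long-lived shutdown object. None of the names below (`CallCoordinator`, `ShutdownState`, `processOne`, the `Task` interface) come from Samza or from the patch; they are hypothetical stand-ins, and only the SHUTDOWN_NOW / WAIT_FOR_ALL_TASKS names are taken from the comment above.

          ```java
          import java.util.HashSet;
          import java.util.Set;

          class PerCallCoordinatorSketch {
              enum ShutdownRequest { NONE, SHUTDOWN_NOW, WAIT_FOR_ALL_TASKS }

              // Short-lived: created for one process() call and dropped afterwards,
              // so a task that keeps a reference cannot affect later iterations.
              static final class CallCoordinator {
                  private ShutdownRequest request = ShutdownRequest.NONE;

                  void shutdown(ShutdownRequest r) { request = r; }

                  ShutdownRequest request() { return request; }
              }

              // Long-lived: owned by the container and never handed to a task.
              static final class ShutdownState {
                  private final int taskCount;
                  private final Set<String> readyTasks = new HashSet<>();
                  private boolean shutdownNow = false;

                  ShutdownState(int taskCount) { this.taskCount = taskCount; }

                  // Fold the result of one call into the container-wide state.
                  void update(String taskName, CallCoordinator coordinator) {
                      switch (coordinator.request()) {
                          case SHUTDOWN_NOW: shutdownNow = true; break;
                          case WAIT_FOR_ALL_TASKS: readyTasks.add(taskName); break;
                          default: break;
                      }
                  }

                  boolean shouldShutDown() {
                      return shutdownNow
                          || (taskCount > 0 && readyTasks.size() == taskCount);
                  }
              }

              // Hypothetical stand-in for a stream task.
              interface Task {
                  void process(String message, CallCoordinator coordinator);
              }

              // One process() invocation: fresh coordinator in, shutdown state updated out.
              static boolean processOne(String taskName, Task task, String message,
                                        ShutdownState state) {
                  CallCoordinator coordinator = new CallCoordinator();
                  task.process(message, coordinator);
                  state.update(taskName, coordinator);
                  return state.shouldShutDown(); // coordinator is unreachable after this
              }

              public static void main(String[] args) {
                  ShutdownState state = new ShutdownState(2);
                  Task voter = (msg, c) -> c.shutdown(ShutdownRequest.WAIT_FOR_ALL_TASKS);
                  System.out.println(processOne("task-0", voter, "m1", state));
                  System.out.println(processOne("task-1", voter, "m2", state));
              }
          }
          ```

          Because the coordinator is read exactly once, right after the call returns, nothing needs to be reset between iterations, which is the loop saving mentioned in (3).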

          jghoman Jakob Homan added a comment -

          Canceling patch post-review.

          martinkl Martin Kleppmann added a comment -

          Ok, makes sense. I've implemented your point 2 in an alternative patch: https://reviews.apache.org/r/21014/

          It was complicated by the fact that a task could request shutdown in a process call, a window call, or both. Since one iteration of the run loop involves calling process on only one TaskInstance, but calling window on either all or none of the task instances, a bit of extra bookkeeping is required to keep track of which task instances have requested shutdown.

          The nicest approach seemed to be to move the time checks (for window and for commit) from the TaskInstance into the SamzaContainer, and to use each coordinator for only one process or window call. That also saves looping over all task instances on every iteration of the run loop. While I was at it, I also changed the commit request semantics as described in SAMZA-23. Not optimal to lump this together in one patch, but it's quite closely related.

          The patch still needs tests, but I'd be interested in your feedback on whether you think this approach is better.
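
          A minimal sketch of the run-loop shape described in this comment, with the window and commit time checks held by the loop rather than by each task instance. It is an illustration only, not the code in the RB: `RunLoopSketch`, `Task`, `CountingTask`, `iterate`, and the injected clock are all hypothetical names, and shutdown bookkeeping is left out to keep the timing logic visible.

          ```java
          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.List;
          import java.util.function.LongSupplier;

          class RunLoopSketch {
              // Hypothetical stand-in for a task instance.
              interface Task {
                  void process(String message);
                  void window();
                  void commit();
              }

              // Counts calls so the loop's behaviour can be observed.
              static final class CountingTask implements Task {
                  int processed, windowed, committed;
                  public void process(String message) { processed++; }
                  public void window() { windowed++; }
                  public void commit() { committed++; }
              }

              private final List<Task> tasks;
              private final LongSupplier clock; // injected so tests can control time
              private final long windowMs;      // a negative value disables windowing
              private final long commitMs;      // a negative value disables timed commits
              private long lastWindowMs;
              private long lastCommitMs;

              RunLoopSketch(List<? extends Task> tasks, LongSupplier clock,
                            long windowMs, long commitMs) {
                  this.tasks = new ArrayList<Task>(tasks);
                  this.clock = clock;
                  this.windowMs = windowMs;
                  this.commitMs = commitMs;
                  this.lastWindowMs = clock.getAsLong();
                  this.lastCommitMs = this.lastWindowMs;
              }

              // One iteration: the message goes to exactly one task, then the loop
              // decides once whether to window or commit all tasks (all or none).
              void iterate(int taskIndex, String message) {
                  tasks.get(taskIndex).process(message);
                  long now = clock.getAsLong();
                  if (windowMs >= 0 && now - lastWindowMs >= windowMs) {
                      for (Task t : tasks) t.window();
                      lastWindowMs = now;
                  }
                  if (commitMs >= 0 && now - lastCommitMs >= commitMs) {
                      for (Task t : tasks) t.commit();
                      lastCommitMs = now;
                  }
              }

              public static void main(String[] args) {
                  long[] now = {0};
                  CountingTask a = new CountingTask();
                  CountingTask b = new CountingTask();
                  RunLoopSketch loop =
                      new RunLoopSketch(Arrays.asList(a, b), () -> now[0], 10, 20);
                  loop.iterate(0, "m1");
                  now[0] = 10;
                  loop.iterate(1, "m2");
                  System.out.println(a.windowed + " " + b.windowed);
              }
          }
          ```

          With the time checks made once per iteration, the loop touches every task only when a window or commit is actually due, rather than asking each task on every iteration.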

          martinkl Martin Kleppmann added a comment -

          This will probably conflict with SAMZA-71/SAMZA-123, since the current code works with a map from Partition to TaskInstance, and that will need to change to using task name (cohort) instead of partition. Jakob Homan, is there something I should be doing now to minimize merge conflicts?

          jghoman Jakob Homan added a comment -

          I'm merging master in each day, so there shouldn't be any problems.

          criccomini Chris Riccomini added a comment -

          Published comments on RB. The second approach is a little rough around the edges, but if it were cleaned up, I think I'd like it better. What do you guys think?

          martinkl Martin Kleppmann added a comment -

          Updated RB: https://reviews.apache.org/r/21014/ and attaching new patch (SAMZA-253.3.patch). This tidies up the previous patch, in particular extracting the run loop out of SamzaContainer into a new class, and adding tests.

          criccomini Chris Riccomini added a comment -

          Latest patch was put up on RB, which I +1'd. Make sure to post here before committing.

          martinkl Martin Kleppmann added a comment -

          Got a "ship it" on the latest iteration of the RB. Thanks for the review, Chris! Attaching the latest patch, rebased onto master. I'm committing this now.

          criccomini Chris Riccomini added a comment -

          Could you also commit to the 0.7.0 branch?

          martinkl Martin Kleppmann added a comment -

          Committed to both the master and the 0.7.0 branches. Resolving.


            People

            • Assignee:
              martinkl Martin Kleppmann
              Reporter:
              martinkl Martin Kleppmann
            • Votes:
              0
              Watchers:
              3
