Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: container
    • Labels:
      None

      Description

      A couple months back – on the mailing list – I mentioned a couple offset management issues I'd been having. (I'm happy to elaborate on this, but in short: I associate some extra state / ordering information with the input offsets, and there's a nontrivial performance cost keeping Samza's checkpoints and my task's state in sync.)

      It occurs to me now that there's a simple workaround for this: disable Samza's checkpointing entirely, and let `StreamTask.init` choose the starting offsets. The task can just keep its checkpoints in an ordinary StorageEngine – and by managing all the state from a single place, it's easy to keep everything in sync.

      The basic implementation actually seems fairly straightforward – the consumers are not started until after the tasks are initialized, so all we'd need to do is allow the `init` method to override the starting offsets. I've attached a small patch that exposes this through the TaskContext interface, just to illustrate the idea – if this seems like an interesting feature for Samza, I'm happy to add more tests / documentation / etc.
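
A minimal sketch of the idea, under stated assumptions: every name below (`SketchTaskContext`, `SketchContainer`, `SketchTask`, `setStartingOffset`) is a hypothetical stand-in and not taken from the attached patch or from Samza's actual interfaces. Partitions are modeled as plain strings, and an in-memory map stands in for the StorageEngine. It only illustrates the ordering: `init` runs first and may override the starting offsets, then the consumers start.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the context handed to init(); not Samza's real interface. */
interface SketchTaskContext {
    void setStartingOffset(String partition, String offset);
}

/** Hypothetical container that initializes the task before starting its consumers. */
final class SketchContainer implements SketchTaskContext {
    private final Map<String, String> startingOffsets = new HashMap<>();
    private boolean consumersStarted = false;

    SketchContainer(Map<String, String> defaultOffsets) {
        startingOffsets.putAll(defaultOffsets);
    }

    @Override
    public void setStartingOffset(String partition, String offset) {
        // Overrides only make sense before the consumers begin reading.
        if (consumersStarted) {
            throw new IllegalStateException("consumers already started");
        }
        startingOffsets.put(partition, offset);
    }

    /** Runs init() first, then marks consumers as started and returns the offsets they would use. */
    Map<String, String> run(SketchTask task) {
        task.init(this);
        consumersStarted = true;
        return new HashMap<>(startingOffsets);
    }
}

/** Task that keeps its own checkpoints next to the rest of its state. */
final class SketchTask {
    private final Map<String, String> store; // stands in for a StorageEngine

    SketchTask(Map<String, String> store) {
        this.store = store;
    }

    void init(SketchTaskContext context) {
        // Replay every checkpoint the task saved for itself.
        for (Map.Entry<String, String> entry : store.entrySet()) {
            context.setStartingOffset(entry.getKey(), entry.getValue());
        }
    }
}
```

Partitions for which the task has no saved checkpoint keep whatever default offset the container was given.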

          Activity

          criccomini Chris Riccomini added a comment -

          +1 Merged and committed. Thanks!

          bkirwi Ben Kirwin added a comment - - edited

          Great! I uploaded a new patch with some testing and documentation.

          In the long run, I'm relatively agnostic about where this functionality should go. The advantage of TaskContext is that it's init-specific, and this feature currently only makes sense at task start – but if either of those things changes, other options might become a lot more appealing.

          criccomini Chris Riccomini added a comment -

          Any thoughts on the API for this?

          I wonder whether this should be in TaskContext, SamzaContainerContext, or TaskCoordinator. If we eventually want to allow a task to change offsets from anywhere (not just during init()), it seems like TaskCoordinator is the better place for it.

          Another approach would be to expose OffsetManager through SamzaContainerContext (and expose SamzaContainerContext through TaskContext).

          In the short term, it seems fairly easy to support this use-case as your patch has it. As long as we make the Javadocs clear that the API is not stable, and might change, I think it's fine to move forward with what we've got. A few docs/tests, and I think we should be good.

          bkirwi Ben Kirwin added a comment -

          So I finally had some time to prototype against this API – it turns out that this works just fine, and I've been able to eliminate a bunch of extra coordination overhead.

          I'd like to try and push this forward. Any thoughts on the API for this? Should I start on some docs / tests?

          criccomini Chris Riccomini added a comment -

          I agree. I also think that startProducers, startTask, startConsumers (and stopping in reverse order) correctly models the flow of data. Data comes in from consumers, through the task, and into the producer. Starting them in reverse order feels intuitively correct.

          bkirwi Ben Kirwin added a comment -

          Yeah, that is an unfortunate interaction.

          I like your first suggestion – it's a fairly minimal tweak that keeps the existing behaviour.

          The second suggestion is interesting; it starts to look a lot like SAMZA-255's 'rewindable streams'. That would be a very interesting feature, though Martin brings up some important caveats in that ticket.

          criccomini Chris Riccomini added a comment -

          This ticket relates to SAMZA-567. If we re-order the init lifecycle, as defined in SAMZA-567, this patch gets a bit more complicated. Currently, this patch simply sets the OffsetManager's offset. This works because the consumers are started after the init() method is called. If we start the consumers before the init() method is called, this won't work anymore.

          We could make this continue to work by doing:

          startProducers
          startTask
          startConsumers
          

          Alternatively, we could do what's proposed in SAMZA-567:

          startProducers
          startConsumers
          startTask
          

          And change the TaskContext method to be setOffset(SSP, offset). When this is called, we'd have to completely tear down SystemConsumers and restart it with the new offsets set. Because of this, it might make a bit more sense to have the API be setOffsets(Map<SSP, String>), so that multiple offsets can be set at once.
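
To illustrate why a batch setter might be preferable in that second ordering, here is a rough sketch. `RestartingConsumers` and both of its setters are hypothetical, partitions are modeled as plain strings, and the teardown/restart is reduced to a counter. It is not how SystemConsumers works; it only models the cost being discussed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of consumers that must be restarted whenever an offset changes after startup. */
final class RestartingConsumers {
    private final Map<String, String> offsets = new HashMap<>();
    private int restarts = 0;

    /** Single-offset variant: every call pays for one teardown and restart. */
    void setOffset(String partition, String offset) {
        offsets.put(partition, offset);
        restart();
    }

    /** Batch variant: apply all offsets, then tear down and restart once. */
    void setOffsets(Map<String, String> newOffsets) {
        offsets.putAll(newOffsets);
        restart();
    }

    private void restart() {
        // A real implementation would stop and re-create the consumers here.
        restarts++;
    }

    int restartCount() {
        return restarts;
    }

    String offsetFor(String partition) {
        return offsets.get(partition);
    }
}
```

With three partitions, three `setOffset` calls cost three restarts in this model, while a single `setOffsets` call with a three-entry map costs one.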


            People

            • Assignee:
              bkirwi Ben Kirwin
              Reporter:
              bkirwi Ben Kirwin
            • Votes:
              0
              Watchers:
              2
