Samza
SAMZA-2

Fine-grain control over stream consumption

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: container
    • Labels:

      Description

      Currently, Samza exposes configuration of the form "streams.%s.consumer.max.bytes.per.sec" for throttling the number of bytes the task will read from a stream. This is a feature request for programmatic, fine-grained control over stream consumption. The use case is a Samza task that consumes multiple streams, where some streams come from live systems with stricter SLA requirements and must always be prioritized over streams that come from batch systems. The configuration above is not an ideal way to express this type of stream prioritization, because configuring the "batch" streams with a low consumption rate decreases the overall throughput of the system when there is no data in the "live" streams. Furthermore, we'll want to throttle each "batch" stream based on external signals that can change over time. Because of the dynamic nature of these signals, we would like a programmatic interface that can dynamically change the prioritization as the signals change.
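      For reference, the existing throttle is configured per stream, along these lines (stream name and rate are illustrative):

      streams.my-batch-stream.consumer.max.bytes.per.sec=1000000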

      Review Board:

      https://reviews.apache.org/r/13725/

      Attachments

      1. DESIGN-SAMZA-2-0.md
        22 kB
        Chris Riccomini
      2. DESIGN-SAMZA-2-0.pdf
        177 kB
        Chris Riccomini
      3. SAMZA-2.5.patch
        120 kB
        Chris Riccomini
      4. SAMZA-2.4.patch
        97 kB
        Chris Riccomini
      5. SAMZA-2.3.patch
        92 kB
        Chris Riccomini
      6. SAMZA-2.2.patch
        99 kB
        Chris Riccomini
      7. SAMZA-2.1.patch
        28 kB
        Chris Riccomini
      8. SAMZA-2.0.patch
        23 kB
        Chris Riccomini


          Activity

          Chris Riccomini added a comment -

          Migrating wiki-based SEP to .md/.pdf-based design doc as part of SAMZA-404.

          Chris Riccomini added a comment -

          Got +1 from Sriram Subramanian on RB.

          Merged and committed.

          Pushed docs to site.

          Chris Riccomini added a comment -

          Attaching updated patch.

          1. Fix comments, per Sriram Subramanian's RB.
          2. Use TopicMetadataCache, per Sriram Subramanian's RB.
          3. Rename WrappedChooser to DefaultChooser, per Sriram Subramanian's RB.
          4. Add batching, priority, and bootstrap docs to streams.md.
          5. Add metrics and logging to everything.

          I believe this patch is feature/test complete. Please +1 or let me know if there's more cleanup to be done.

          Chris Riccomini added a comment -

          Attaching an updated patch. First pass at adding KafkaSystemAdmin.getLastOffsets.

          Chris Riccomini added a comment -

          Attaching a new patch.

          Per latest comments on RB:

          1. Removed the task.chooser.wrapper.class config.
          2. Renamed DefaultChooser to WrappedChooser.
          3. Force SamzaContainer to always use WrappedChooser.
          4. WrappedChooser now uses SystemAdmin map wired in from SamzaContainer, rather than instantiating its own SystemAdmins.
          5. Removed PriorityChooser.
          6. Changed class composition from bootstrap/priority/batch to bootstrap/batch/priority.

          Regarding (3), Sriram Subramanian gives good arguments on the RB on why to do this.

          Regarding (5), PriorityChooser was initially created as a helper class to make it easy for people to implement their own choosers. It was expected that folks would be implementing their own choosers frequently, since we provided little functionality out of the box. Given that we're now providing so much rich functionality with WrappedChooser, there's not as much need for us to provide this class. I'm removing it.

          Regarding (6), there's some discussion on the RB about this.

          Chris Riccomini added a comment -

          I want to elaborate a bit on start/stop/register (1). The problem that I'm trying to address is that BootstrappingChooser gets a mapping from SystemStreamPartition to "offset of last message in the SSP" when it starts up. If a StreamTask is already caught up for a given SSP, then it will receive no messages for this SSP (until more are written). In cases where no more messages are written to a caught-up bootstrap stream, the BootstrappingChooser will continue waiting for a message from the bootstrap stream's partitions, and block all choose calls until it receives one. This could lead to potentially hours or days with no messages processed. For example, if you have SSP1 (for a bootstrap stream) with a last offset of 123, and the checkpointed offset for SSP1 is also 123, when BootstrappingChooser starts up, it will wait for messages from SSP1, and block MessageChooser.choose until it gets an update from SSP1. If no update ever comes, then the StreamTask is blocked, even though SSP1 is actually caught up.

          Adding a register method lets us check if the last checkpointed offset == the offset of the last message in the SSP. If it does, we treat the stream as "caught up", and remove it from the bootstrap list.
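           To make this concrete, here's a minimal sketch of the caught-up check (class and member names are illustrative, not the actual BootstrappingChooser code):

           import org.apache.samza.system.SystemStreamPartition

           class CaughtUpTracker(lastOffsets: Map[SystemStreamPartition, String]) {
             // SSPs we are still waiting on before unblocking choose().
             private var pending = lastOffsets.keySet

             def register(ssp: SystemStreamPartition, checkpointedOffset: String) {
               // A stream whose checkpointed offset already equals its last offset is
               // caught up, so it should never block the chooser.
               if (lastOffsets.get(ssp).exists(_ == checkpointedOffset))
                 pending -= ssp
             }

             def isCaughtUp = pending.isEmpty
           }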

          Two alternative implementations that come to mind:

          1. Have DefaultChooserFactory instantiate a CheckpointManager, and use the CheckpointManager to get the offset for each SSP. This is what we're doing with the SystemAdmins in DefaultChooserFactory.buildLatestOffsets.
          2. Do the bootstrapping in the SystemConsumers class, which already has a register(ssp, offset) method.

          I was against the first approach (1) because I didn't want to query the checkpoint manager repeatedly for the same information (TaskInstance makes the same call), but given that it's only checkpoints, and not metadata getting fetched from Kafka, it's pretty low cost from a performance perspective, and only happens once at startup time. I'm now leaning towards this approach over start/stop/register.

          (2) is what's in the design proposal, but it's more invasive. Sriram wanted an alternative solution (4.2 in his comments), and I agree.

          Chris Riccomini added a comment -

          Updated patch with feedback from JIRA and design proposal.

          Items to watch out for:

          1. I added MessageChooser.start, MessageChooser.stop, and MessageChooser.register back in. The BootstrappingChooser needs the last offset for each bootstrap stream in order to mark streams as "caught up" in cases where the last checkpointed offset == the offset of the last message in the stream. This data is not available at wire-up time – it's only available after the TaskInstance restores its offsets, and registers with SystemConsumers. I'm all ears if you guys have a better/cleaner idea.
          2. The behavior of the DefaultChooser is such that a bootstrap stream will not re-process the entire stream every time the StreamTask starts up. It will only process from the last checkpointed offset forward. I think this is actually desirable, since the use case for a bootstrap stream is usually to bootstrap the stream into a key/value store. The key/value store gets restored, itself, on startup, so there's no point in re-processing the whole bootstrap stream; you can simply pick up where you left off.
           3. DefaultChooserFactory is the default factory. To break ties between same-priority envelopes, and to batch when there are no priority streams, the DefaultChooser actually takes a MessageChooser itself. This is kind of weird, because we end up with two configs: task.chooser.class and task.chooser.wrapped.class. The first one is the actual pluggable config that the SamzaContainer uses via SystemConsumers. The second one is what DefaultChooser uses to break priority ties, or in cases where it's just going to batch.
          4. It's unclear to me if there's a better 'idiomatic Scala' approach to the class composition of the choosers. Mixins, traits, etc. Open to advice.
          5. Batching is disabled by default.
           6. This code is quite complicated. I tried to make it as simple as possible, but we're just trying to do a lot. There's probably an argument to be made for having three simple choosers, instead of one complex one that does bootstrapping, batching, and prioritizing.

          Remaining TODOs:

          1. Add logging.
           2. Write KafkaSystemAdmin.getLastOffsets.
          3. Update documentation.
          4. Add a bootstrapping task to the integration test framework.

          All tests pass.

          Chris Riccomini added a comment -

          Sriram Subramanian

           1. Agreed. Will remove (per RB comments).

          2. I have added this segment to the wiki:

          Since the MessageChooser only receives one message at a time per SSP, it can be used to order messages between different SSPs, but it can't be used to re-order messages within a single SSP (a buffered sort). This must be done within a StreamTask.
          

          I have also updated the Javadocs for MessageChooser to show this. (not yet pushed to a new patch, though)

          3. We could do random. I think I'll probably do whatever is easiest to implement, whether that's random, round robin, or whatever. Just as long as it's not a strategy that can lead to starvation.

           4.1 Agreed. I do, however, want to make the config as simple as possible. The default, when no configs are set, would be that all streams get an equal priority of 0, and no stream is bootstrapped.

          To just bootstrap a stream:

          task.chooser.bootstrap.kafka.currencyCodes=true
          

          This would set currencyCodes' priority to Int.MAX and make the SystemConsumers class force messages for the currencyCodes stream.

          To override a priority you could then do:

          task.chooser.bootstrap.kafka.currencyCodes=true
           task.chooser.priorities.kafka.currencyCodes=2
          

          To bootstrap and prioritize simultaneously:

          task.chooser.bootstrap.kafka.currencyCodes=true
           task.chooser.priorities.kafka.currencyCodes=2
          task.chooser.priorities.kafka.liveStream=1
          task.chooser.priorities.kafka.batchComputedStream=0
          

           This would bootstrap currencyCodes, and then favor messages from liveStream over messages from batchComputedStream.

          Bootstrapping multiple streams in a specific order could then be done using:

          task.chooser.bootstrap.kafka.currencyCodes=true
          task.chooser.bootstrap.kafka.currencyConversions=true
          task.chooser.priorities.kafka.currencyCodes=2
          task.chooser.priorities.kafka.currencyConversions=1
          task.chooser.priorities.kafka.transactions=0
          

          This would bootstrap currencyCodes first, then currencyConversions, and then read messages from all streams as usual.

          4.2 I think there are a lot of ways to implement the bootstrapping feature. Here are a few:

          1. Give the StreamTask some way to tell the container which stream it wants next.
           2. Make the container register only the bootstrap streams with the SystemConsumers class first, then register the rest of the streams when the bootstrap is complete.
          3. The wiki proposal.
           4. StreamTask buffers all non-bootstrap-stream messages that it can't process yet because the bootstrap streams haven't been fully bootstrapped.
          5. Declare that all bootstrap streams must be fed directly into a StorageEngine.

           All of these except (4) require code changes in the SamzaContainer/SystemConsumers code, and (4) is non-workable because it could force a StreamTask to buffer an unbounded amount of data. Even (4) would require a slight code change so that the StreamTask can know when a stream has been fully bootstrapped (envelope.getOffset == lastOffsetInStreamPartition).

          (5) is kind of interesting because the StorageEngine.restore phase is handled at a different point in the lifecycle, before the StreamTask handles any messages. One knock on this approach is that it forces a bootstrap stream to be materialized to a StorageEngine without any transformations. For example, if you had a profile stream, where messages were ~5kb each (due to plain text in profile description), but all you wanted was the user's name, the StorageEngine would still be restored with the full profile messages in the store, which is wasteful. It also wouldn't allow for any aggregation or filtering of a bootstrap stream's messages.

          5. Totally agree. I think we'll just punt on the whole time-alignment thing now. Developers can either implement a MessageChooser that does approximate time alignment, or a StreamTask, which does a time-aligned buffered sort (or they can do both). If we decide to add stronger time support in the future, I can't see any backwards incompatible changes that would need to be made, so just making this a feature that developers have to implement seems safest and most general for now.

          Sriram Subramanian added a comment -

          Summarizing the discussions in the RB / JIRA / Wiki, these are my thoughts -

           1. Message Chooser interface - I can see how start/stop and register may be useful, but I still have not been able to think of a use case for them. It would help to have a concrete example where they are needed; otherwise we can drop them for now. We do not use them for any of our pickers.

           2. We have agreed on making the chooser pluggable, which is reflected in the RB as well. What is important, though, is to define the role of the chooser. The chooser can potentially do multiple things, and we need to be very clear in defining how and why we restrict its behavior. For example, if a chooser can get multiple events for the same stream partition, it could do some kind of ordering within a stream partition. If this is not allowed, the chooser can only decide on scheduling between stream partitions and leave the rest to the task. I explain this in more detail in the time alignment case below, but we need to make these assumptions/behaviors explicit for implementors of the message chooser. Similarly, we should define the contract between the container and the chooser (e.g., the assumption that the update method is invoked in a round-robin fashion across the stream partitions).

           3. W.r.t. prioritizing between streams with the same priority, would picking at random work? Are we not using a heap to do the prioritization? Should it not already pick a stream at random if the streams have equal priority?

           4. Two concerns I have here:
           1. Performance - To ensure that all stream partitions have received an event, we would need to constantly poke them. For this purpose, we would need to wait for at least one event from each of the stream partitions every time before choose is called. The downside of doing this is that not all topics are going to be bootstrapped. There could be cases where a system hosting a specific stream partition is slow, or is down and waiting to do a leader transition. In these cases it would just add to the bootstrap time. This may or may not be a big deal, but an alternative would be to do the above logic only for the topic that needs to be bootstrapped. We could take the topic to be bootstrapped as a config.
           2. Bootstrap specific code - I am still not sure about this, but it might be worth thinking about whether we can do this bootstrap without introducing bootstrap logic into the container/consumer code. I have not been able to think of a solution that does not add a lot of complexity to the code.

          5. Time alignment
           This is interesting, and I think we should for now achieve this by using a time alignment operator that runs as a task. If two streams need to be time aligned, the framework user can always insert the alignment operator between processing steps. There is no way we could do a good job of aligning streams based on time without a timestamp. None of the approaches (offset, messages behind, percentage behind) will help in any way. Take the simple case of two streams A and B, where stream A gets 1 event for every 1000 events in stream B. None of the heuristics would work here in aligning them by time. Also, the streams are infinite in length, and there is no way to predict how much you need to buffer before the streams get aligned. This requires quite a bit of memory. For this purpose, I suggest we make this a task responsibility and add timestamps to the message. The time-align task can then use the timestamps in the stream to produce timestamp-ordered / time-aligned outputs. It could use the state management to do the buffering. Again, any kind of time alignment is best effort (windowing based), but the task could do a much better job than the chooser. Also remember that the chooser does not allow changing the ordering within a stream, so it would not be possible there.
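           A minimal sketch of such a time-align task, assuming the application supplies extractTimestamp, the output stream name, and a flush-on-window policy (none of these are Samza APIs, and a real implementation would buffer in a state store rather than in memory):

           import scala.collection.mutable
           import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
           import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator, WindowableTask}

           abstract class TimeAlignTask extends StreamTask with WindowableTask {
             // Application-specific: pull a timestamp out of the message body.
             def extractTimestamp(message: AnyRef): Long

             // Messages buffered by timestamp; TreeMap keeps keys sorted.
             private val buffer = mutable.TreeMap[Long, List[AnyRef]]()
             private val output = new SystemStream("kafka", "time-aligned-output")

             def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
               val ts = extractTimestamp(envelope.getMessage)
               buffer(ts) = envelope.getMessage :: buffer.getOrElse(ts, Nil)
             }

             def window(collector: MessageCollector, coordinator: TaskCoordinator) {
               // Best effort: flush everything buffered so far in timestamp order.
               for ((_, messages) <- buffer; message <- messages.reverse)
                 collector.send(new OutgoingMessageEnvelope(output, message))
               buffer.clear()
             }
           }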

          Chris Riccomini added a comment -

          I've thrown up a proposal on how to implement this:

          https://wiki.apache.org/samza/Pluggable%20MessageChooser

          It combines some of the ideas from my initial patches with some feedback from the RB: namely that the default chooser should ideally handle 1) prioritizing streams and 2) aligning streams of equal priority in some way.

          Chris Riccomini added a comment -

           Updated RB again. Added start, stop, and register to MessageChooser. This will let choosers set up and tear down anything that they need. It also gives choosers a chance to initialize state (e.g. maps/queues/etc) based on the SystemStreamPartitions they're going to be getting envelopes for.
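           For context, the expanded interface looks roughly like this (a sketch only; the authoritative signatures are on the RB):

           import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}

           trait MessageChooser {
             // Lifecycle hooks, so choosers can set up and tear down resources.
             def start(): Unit
             def stop(): Unit

             // Called once per SSP before start(), so the chooser can pre-build
             // per-partition state (maps, queues, etc.).
             def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String): Unit

             // The container hands the chooser at most one outstanding envelope per SSP.
             def update(envelope: IncomingMessageEnvelope): Unit

             // Return the envelope to process next, or null if there's no preference yet.
             def choose(): IncomingMessageEnvelope
           }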

           There's some discussion on the RB about whether or not to make the choosers support multiple updates from the same SystemStreamPartition, even though that would be broken behavior from the SamzaContainer. I'm opting to ignore this for now, since multiple updates to a chooser are bad behavior on the part of the container. I can't really think of another place where we'd use a chooser, so I'm OK with tying its behavior to the container's.

          Chris Riccomini added a comment -

          Updated with Jay's feedback from RB:

          1. Added docs for MessageChooser.
          2. Switched RoundRobinChooser back to a queue-based implementation.

          Also, a caveat: the current implementation won't be able to support the "bootstrap from offset 0 all the way to the head of the stream" use case because IncomingMessageEnvelope only has the offset of the message, and not how far the message is from the head of the stream.

          Chris Riccomini added a comment -

          Attaching patch.

           1. Added PriorityChooser, an abstract class that uses a PriorityQueue to sort envelopes. Subclasses must implement the prioritize method (a sketch follows below).
          2. Added MessageChooserFactory.
          3. Added "task.chooser.class" config in TaskConfig, which defines a MessageChooserFactory.
          4. Renamed DefaultChooser to RoundRobinChooser.
           5. Switched RoundRobinChooser to extend PriorityChooser.
           6. Added StreamChooser, which prioritizes envelopes according to their system/stream.
          7. Added tests for StreamChooser, RoundRobinChooser, and PriorityChooser.

          All tests are passing.
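           For reference, a minimal sketch of the PriorityChooser shape described in (1), assuming higher prioritize() values should be chosen first (the actual patch is on the RB):

           import org.apache.samza.system.IncomingMessageEnvelope

           abstract class PriorityChooser {
             private case class Prioritized(priority: Double, envelope: IncomingMessageEnvelope)

             // java.util.PriorityQueue is min-first, so reverse the ordering to put
             // the highest-priority envelope at the head.
             private val q = new java.util.PriorityQueue[Prioritized](
               11, Ordering.by[Prioritized, Double](_.priority).reverse)

             // Subclasses assign each envelope a priority.
             def prioritize(envelope: IncomingMessageEnvelope): Double

             def update(envelope: IncomingMessageEnvelope): Unit =
               q.add(Prioritized(prioritize(envelope), envelope))

             def choose(): IncomingMessageEnvelope =
               Option(q.poll()).map(_.envelope).orNull
           }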

          Items for discussion:

          1. I put PriorityQueue in samza-api, so developers could implement priority-based choosers in Java. I put RoundRobinChooser/StreamChooser in samza-core.
          2. I added a MessageChooserFactory. You could just have MessageChooser.init(config), but this isn't the pattern we've been following for anything else (except CheckpointManager).
           3. StreamChooser de-prioritizes any envelope to -1 if its stream is not in the priority list. You could argue an exception should be thrown. You could also argue for falling back to round robin for de-prioritized envelopes, rather than -1, since the ordering of equal-priority items is undefined in PriorityQueue.
          4. I defined higher priority as a larger number. This seems more intuitive to me, but it's the reverse of PriorityQueue's behavior: "The head of this queue is the least element with respect to the specified ordering."

          Review Board: https://reviews.apache.org/r/13725/

          Chris Riccomini added a comment -

          Planning to implement:

          1. Make MessageChooser pluggable via configuration.
           2. A PriorityChooser with a heap that defaults to some silly priority, like envelope offset string comparison.
           3. A StreamChooser, which will extend PriorityChooser and override the prioritize method to use a hard-coded priority for each input stream.

          Jay Kreps added a comment -

          Yeah I don't have this figured out. I was just throwing out a strawman proposal.

           The pro I see for the current update/pick Picker api is that it is more generic. The use case I think it can handle that the priority api can't would be one where the priority effectively changes from call to call, so the heap assumption is not valid. I think it is helpful, though, to list all the specific implementations we can think of to understand this.

           The pro I see for the priority api is that it is simple to think about--i.e. timestamp or lag would directly translate to a priority. The importance of this depends on the frequency of custom pickers. If essentially every user needs to implement one in order to yank out the timestamp field they are using, we want it to be simple.

          You could, of course, support both layers, though that is usually the worst option.

          Chris Riccomini added a comment -

           So, I think there's some confusion between Jay's and Sriram's discussion. Currently, there is this interface:

           public interface IncomingMessageEnvelopePicker {
             void update(IncomingMessageEnvelope envelopes);
             IncomingMessageEnvelope pick();
           }

          We have an implementation called DefaultPicker:

           class DefaultPicker extends IncomingMessageEnvelopePicker {
             var q = new ArrayDeque[IncomingMessageEnvelope]()
             def update(envelope: IncomingMessageEnvelope) = q.add(envelope)
             def pick = q.poll
           }

          The DefaultPicker is hard coded right now.

           I think the confusion is whether we eliminate the picker altogether, or simply change the DefaultPicker to be a picker that uses a MessagePrioritizer and heap to do the picking. Jay Kreps, which one are you proposing?

           I kind of think this is just an implementation detail, and we could switch back and forth under the hood without end users knowing, but I think Sriram wants to keep the picker (but keep it hard-coded) and make the default picker become the PriorityPicker. I'm not opposed to this, since we already have the picker code written, and it's more generic.

          Alan Li Agreed, the batch size should be configurable. The deterministic ordering across streams really comes into play during replay, and is important if you want to guarantee exactly once messaging and deterministic state. If you don't care about this, I think you would just disable this whole feature, and get at least once, which is what we have right now.

          Alan Li added a comment -

           I think the MessagePriority is a sensible interface. Would a custom MessagePriority object be exposed via a new interface in StreamTask? And what are the key & value input parameters to assignPriority()?

           Re prioritizing batches of messages:
           1. I'm assuming there will be a way to control the prioritization batch size? If the prioritization batch size is too large, this may lead to a situation where the "live" stream gets starved, depending on the order in which events arrive.
          2. How important is it to have deterministic ordering across streams? It seems that without this requirement, we can avoid a lot of the bookkeeping.

          Jay Kreps added a comment -

           Chris, yeah, I've withdrawn that implementation proposal. That said, I think we can find an encoding that is efficient for batches but allows arbitrary scheduling.

          Jay Kreps added a comment -

           Sriram, I agree it is a scheduling problem, but can you help us understand the limitations of the priority api as a way to implement a scheduling algorithm, the api you are proposing, and some of the use cases this would enable? I think the priority api captures any use case where priority can be statically assigned at the time the message is seen and doesn't change. The two reasons I kind of liked this restriction are that (1) it corresponds well to the "partial sort" property of a heap, so it is easy to implement efficiently, and (2) it gives a very simple contract to the user. Presumably most schedulers that depend on non-static priority would require resorting all messages on each pick() call, which would not be good for cases where we consume thousands of partitions.

          Perhaps one thing to think about is how often will people use this api? If it is very common to plug in an adapter to get time out of your messages then a simple api is important. If it is a pretty rare, low-level thing then maybe the more powerful api is better.

          Also I think it would be good for us to clarify what we are trying to solve. When we originally discussed this we kind of thought we could address all kinds of message sorting here, but I no longer think that makes sense (it would be reasonable to disagree with this). I think this api should solve the problem of which stream to choose next. I think it doesn't make sense to attempt any kind of message re-organization. Essentially we want the user to pick the next stream. Another way of saying this is the components of the offset position vector would be non-decreasing. Resorting can be done in the task, perhaps with a special storage engine to help.

          Sriram Subramanian added a comment -

           What we want is to really schedule a message or a batch of messages for processing. I would model this as a scheduling problem and make priority-based scheduling just one type. I would not want to reduce round-robin and time-based scheduling to priority-based scheduling, even though we can force it to do something like that. We could run the scheduler multiple times on the input streams, or just provide a batch size while initializing the scheduler, and it could output "batch size" messages on every call to the scheduler. Round-robin and first-in-first-out scheduling are then just different types of schedulers, which are very trivial to implement.

          Chris Riccomini added a comment -

          I think Jay's proposal is probably the best solution given the constraints we're working under.

           Jay Kreps Regarding your implementation section, one thing to consider with lexicographic sorting is what happens if a user wants to add a new input stream in config and restart their job? I think this seems like a reasonable use case, but it would break the sorting, right? Are you operating under the assumption that input streams are static once a job is executed for the first time (i.e. that we'll never add another input stream to the job's config and restart)? Not saying it's wrong, but it's a limiting decision, and not one I'd considered.

          Jay Kreps added a comment -

           I think the round-robin behavior may work for what Alan described to me in terms of QoS; however, they also need to bootstrap their state stores first.

           A good generalization of what we have in 0.7 would be to make the prioritization of messages pluggable, rather than the Chooser. This supports the use cases I know of. That is, we'd have something like:

           interface MessagePriority {
             double assignPriority(topic, partition, key, value)
           }

          Our guarantee would be that we always take the message from the stream whose next message has the highest priority. We will always ensure we have a message from each stream prioritized.

          The current round-robin behavior could be implemented by keeping a counter in the prioritizer and assigning sequential increasing integers to each message so they are handled first-come, first-served.
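           A sketch of that counter idea against the proposed interface (parameter types are assumptions, since the interface above is pseudocode; the counter is negated so that earlier arrivals win under the highest-priority-first guarantee):

           trait MessagePriority {
             def assignPriority(topic: String, partition: Int, key: AnyRef, value: AnyRef): Double
           }

           // First-come, first-served: earlier arrivals get a higher priority.
           class FifoPriority extends MessagePriority {
             private var counter = 0L

             def assignPriority(topic: String, partition: Int, key: AnyRef, value: AnyRef): Double = {
               counter += 1
               // Negate so the earliest message has the highest priority.
               -counter.toDouble
             }
           }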

          The lag-based prioritization could be handled by using the lag as the priority, and the time-based prioritization could use the message time stamp.

           Bootstrapping off particular streams before processing could be implemented by just setting those streams' priority to infinity.

          Implementation

          Internally this would just be implemented as a heap with one message per topic partition that we continually refresh as messages are removed.

           We would reserve the right to batch these up prior to calling process--i.e. we allow ourselves to call prioritize multiple times before feeding the outputs to the task. The reason would be to ensure that we can save out this ordering to make it deterministic in the case of a failure and retry, but also not have too much journaling of decisions. To avoid journaling too many of these decisions, we will try to prioritize a bunch of messages at once, prior to processing (say up to 100), and log out a single message with these 100 prioritizations. We can use varints and encode a topic-partition just by its index in the lexicographically sorted set of all topic-partition inputs for the job, so each message need only be ~1 byte per decision plus a starting offset vector. This will not buy any efficiency for low-volume streams, but presumably for those we don't care.

          Chris Riccomini added a comment -

          One other item of note: our justification for totally removing throttling from 0.7.0 is that usually what folks really want is:

          1) Per-stream prioritization (what you're describing).
          2) Per-process isolation (I want to know that my process gets at least X % of the CPU, disk, network, etc).

          It's not actually the case that people want to voluntarily go slower than they can (which is what the throttler in 0.6.0 was doing). They want to go as fast as possible, but prioritize the inputs, as described in the summary.

          For #1, we've added the MessageChooser, as described above.

          For #2, we are going to be relying on CGroups.

           Chris Riccomini added a comment - edited

           In 0.7.0, the version we're working on (and will be available once we get our Apache git repo), we've removed all "throttling", and we've introduced the concept of a MessageChooser instead. This is an object that is given up to one message at a time from each input stream/partition pair (when they're available). The chooser's job is to return the incoming message envelope that should be processed next. For example, Samza would say, "I have IncomingMessageEnvelopes from stream/partition X and stream/partition Y; which should I process next?"

          Our current 0.7.0 MessageChooser implementation just round robins between streams. This has the effect of throttling the "batch" streams you're talking about, when messages are available from the "live" streams. When messages are not available from the live stream, the chooser will just choose the batch messages every time, thereby causing the task to process messages from the batch stream as fast as possible.

          One caveat here is that the MessageChooser is currently not pluggable. We've hard coded this behavior into a "DefaultChooser" for now. The motivation for keeping it from being pluggable is that we're not entirely certain of the interface yet, and we're also not sure how well it will integrate with our fault tolerance and state management semantics (what happens if a pluggable MessageChooser is maintaining state, and there's a failure?).

          This leads to the question: is the round robin approach the best approach for a generic message chooser (given that it's not pluggable at this time)? I think the answer is that it's not. A better approach would simply be a priority list that says, "Always process stream X, then stream Y, then stream Z." This would actually be better for your use case, since you could ALWAYS process "live" messages first, not just round-robin between live and batch streams.

          Another alternative would be, "always process the stream that is farthest behind," for some definition of "farthest behind". Some definitions include:

           1) Farthest behind in wall-clock time (now() - timestamp in message); a sketch follows after this list.
          2) Farthest number of messages behind high watermark (when reading from Kafka).
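           As a sketch, definition (1) translates directly into a priority (extractTimestamp is an application-supplied assumption, not a Samza API):

           import org.apache.samza.system.IncomingMessageEnvelope

           // The farther behind wall-clock time a message is, the higher its priority.
           class WallClockLagPrioritizer(extractTimestamp: IncomingMessageEnvelope => Long) {
             def prioritize(envelope: IncomingMessageEnvelope): Double =
               (System.currentTimeMillis - extractTimestamp(envelope)).toDouble
           }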

           What's your take on this new 0.7.0 feature? I think the existing MessageChooser would accommodate your needs as is, but could be made even better with a priority list.

          Jay had some thoughts on this as well.


            People

            • Assignee:
              Chris Riccomini
              Reporter:
              Chris Riccomini
            • Votes:
              0
              Watchers:
              5
