Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.0
    • Component/s: container
    • Labels: None

      Description

      There is a lot of discussion in SAMZA-353 about assigning the same SSP to multiple taskNames. This ticket is a subset of that discussion, focusing only on the broadcast stream implementation.

      The goal is to assign one SSP to all the taskNames.

      1. BroadcastStreamDesign_1.md
        13 kB
        Yan Fang
      2. BroadcastStreamDesign_1.pdf
        162 kB
        Yan Fang
      3. BroadcastStreamDesign.md
        9 kB
        Yan Fang
      4. BroadcastStreamDesign.pdf
        141 kB
        Yan Fang
      5. SAMZA-676.2.patch
        92 kB
        Yan Fang
      6. SAMZA-676.3.patch
        98 kB
        Yan Fang
      7. SAMZA-676.4.patch
        99 kB
        Yan Fang
      8. SAMZA-676.5.patch
        105 kB
        Yan Fang

          Activity

          closeuris Yan Fang added a comment -

          Attached the design document. Looking for feedback, especially because it may require changes to the SystemConsumer API. Thank you.

          navina Navina Ramesh added a comment -

          Hi Yan,
          I went through your design document and have a few questions/comments.

          1. I think use cases #3 and #4 are very similar. I have seen many instances of #4 coming up (regarding the bootstrap stream) which will work well with a global state implementation. For now, the broadcast stream is a convenience feature; I think we have been suggesting workarounds on the mailing lists. Thanks for picking it up!
          2.

          TaskName:         Partition 0   Partition 1   Partition 2
          Stream A          Partition 0   Partition 1   Partition 2
          Stream B          Partition 0   Partition 1   Partition 2
          Stream C          Partition 0   Partition 1
          Broadcast Stream  Partition 0   Partition 0   Partition 0

          a. Do all broadcast streams have only 1 partition?

          b. How does this affect the consumer's MessageChooser priority? Does it give the broadcast stream higher priority by default? In general, my question is: how will each task proceed at the same rate? We could have hot partitions, and those tasks may not react to the broadcast stream at the same time as other tasks.

          c. Is the broadcast stream also intended to make config changes at a task level? Isn't that functionality that belongs in the JobCoordinator (JC)?

          3. Quoting the design doc: "However, this is the feature we will need for the broadcast stream, because all the tasks will have the broadcast stream. When two or more tasks are assigned to the same container and their broadcast streams have different offsets, the consumer needs to consume the same stream more than once, with different offsets."
          Can you explain this better?

          4.

          task.global.input=kafka.foo#1,kafka.doo#0

          Why is the partition number needed here? Are you suggesting that the tasks can consume from only one partition of the broadcast stream?
          If I have a broadcast topic with 32 partitions and I want all tasks to consume from all of them, then specifying the config will be tedious.

          closeuris Yan Fang added a comment -

          Thanks for reviewing, Navina.

          I think use cases #3 and #4 are very similar. I have seen many instances of #4 coming up (regarding the bootstrap stream) which will work well with a global state implementation.

          Yes, agreed. Since use cases 1 and 2 still remain, I think it's still worth implementing this feature. Though we have a workaround on the mailing list, I prefer to provide this out of the box. It's a "major", not "critical", feature.

          a. Do all broadcast streams have only 1 partition?

          No. It's just an example. We can have any number of broadcast SSPs as long as users configure them.

          b. How does this affect the consumer's MessageChooser priority? Does it give the broadcast stream higher priority by default? In general, my question is: how will each task proceed at the same rate? We could have hot partitions, and those tasks may not react to the broadcast stream at the same time as other tasks.

          In the current consumer implementation, there is no way to guarantee that "each task proceeds at the same rate" if more than one task is in the same container, because we only have one consumer per system per container, and the MessageChooser is per container. So it's not possible for two tasks to receive the same messages at the same time.

          I do not differentiate the broadcast stream from normal streams at the consumer level. So if users want to give the broadcast stream higher priority, they can set the priority config, e.g. systems.kafka.streams.broadcastStream.samza.priority=2.

          c. Is the broadcast stream also intended to make config changes at a task level? Isn't that functionality that belongs in the JobCoordinator (JC)?

          I was following the same config convention as task.input, so I'm not exactly sure at which level it should go. In my opinion, reading this config actually happens in the JobCoordinator class.

          3. Quoting the design doc: "However, this is the feature we will need for the broadcast stream, because all the tasks will have the broadcast stream. When two or more tasks are assigned to the same container and their broadcast streams have different offsets, the consumer needs to consume the same stream more than once, with different offsets."
          Can you explain this better?

          Again, assume there is only one system in the job.

          Currently, the consumer has two important methods: register(SSP, offset), and poll(), which returns a Map<SSP, List>.

          So if we have two tasks:
          task 1 has stream1-partition0, stream2-partition0, broad-stream-partition0
          task 2 has stream1-partition1, stream2-partition1, broad-stream-partition0

          If those two tasks are in the same container, the consumer will register all of those SSPs (there is only one consumer). Since the consumer only returns a Map, when it returns a <broad-stream-partition0, list> entry, it cannot tell whether it's for task 1 or task 2. What will really happen is that it only returns a Map with five keys: stream1-partition0, stream2-partition0, stream1-partition1, stream2-partition1, broad-stream-partition0. So broad-stream-partition0 will only be processed once, either in task 1 or in task 2. Therefore, my suggestion is that when the consumer returns the result, it should also return the taskName information, such as task 1 -> Map, task 2 -> Map. This requires us to change the Consumer API, as well as the Chooser API. Is it a little clearer?
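
          A minimal Java sketch of this proposal, assuming a hypothetical TaskAwareSystemConsumer interface (this is not the real Samza SystemConsumer API; the names and signatures below are illustrative only):

          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          import org.apache.samza.system.IncomingMessageEnvelope;
          import org.apache.samza.system.SystemStreamPartition;

          // Hypothetical: a task-aware variant of the consumer contract.
          public interface TaskAwareSystemConsumer {
            // Register the SSP on behalf of a specific task, at that task's offset,
            // so the same SSP can be registered twice with two different offsets.
            void register(String taskName, SystemStreamPartition ssp, String offset);

            // Results are keyed by taskName first, so broad-stream-partition0 can
            // appear once under "task 1" and once under "task 2", each holding the
            // messages fetched from that task's own offset.
            Map<String, Map<SystemStreamPartition, List<IncomingMessageEnvelope>>> poll(
                Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException;
          }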

          As I also mentioned in the design doc, this change to the Consumer API and Chooser API will also help with multiple-partition subscription, because when we assign one partition to more than one task, if those tasks are in the same container, we run into the same problem.

          Of course, another way is to have as many consumers as there are tasks, but that does not seem like a single-threaded solution...

          Why is the partition number needed here? Are you suggesting that the tasks can consume from only one partition of the broadcast stream?

          I think if users only want a few partitions, they should specify the partition numbers.

          If I have a broadcast topic with 32 partitions and I want all tasks to consume from all of them, then specifying the config will be tedious.

          This is quite interesting. Should we encourage the broadcast topic to have so many partitions? It introduces more complexity into the system - how do we prioritize those 32 partitions in one task? In the config, we can simply allow something like broadcastTopic#all for convenient configuration.

          twbecker Tommy Becker added a comment -

          I had started to implement this same feature a while back, but in my solution I did not make any changes to either the Consumer or the Chooser. Instead I simply made the RunLoop aware that a single SSP could map to multiple task instances. When an envelope for an SSP comes in, it loops over all the tasks that SSP is assigned to and passes the envelope to each of them. Again, I never really tested my implementation so it's very possible my approach was flawed, but I'd like to understand how

          closeuris Yan Fang added a comment -

          When an envelope for an SSP comes in, it loops over all the tasks that SSP is assigned to and passes the envelope to each of them.

          Aha, yeah, this is a good idea! It works under the assumption that all the tasks in the same container consume the broadcast stream at the same speed and always read the same SSP messages. This assumption is not always true: if two tasks initially run in different containers and so have different offsets for the broadcast stream (because, say, one container fails and is then brought back up), they are consuming the same stream at different speeds. When they are brought into one container, feeding them the same messages does not seem ideal.

          Instead I simply made the RunLoop aware that a single SSP could map to multiple task instances.

          Curious how you make the RunLoop aware of which SSPs could map to multiple task instances?

          twbecker Tommy Becker added a comment -

          This assumption is not always true: if two tasks initially run in different containers and so have different offsets for the broadcast stream (because, say, one container fails and is then brought back up), they are consuming the same stream at different speeds.

          Yes, this is the core thing I missed; the solution assumes the number of containers is static.

          curious how you let the RunLoop aware which SSP could map to multiple task instances?

          I had a config convention similar to what you propose, where you could identify which streams are to be broadcast streams. Then I modified the GroupByPartition grouper to exclude the broadcast SSPs from its initial grouping and add them to each task's SSP set. Finally, I changed RunLoop to build up a Map[SSP, Set[TaskInstance]] from the tasks it is given.
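
          A rough Java sketch of that bookkeeping (Samza's actual RunLoop is Scala; TaskInstance below is a stand-in interface defined only to keep the sketch self-contained):

          import java.util.Collection;
          import java.util.HashMap;
          import java.util.HashSet;
          import java.util.Map;
          import java.util.Set;

          import org.apache.samza.system.IncomingMessageEnvelope;
          import org.apache.samza.system.SystemStreamPartition;

          // Stand-in for Samza's task wrapper, not the real class.
          interface TaskInstance {
            Set<SystemStreamPartition> systemStreamPartitions();
            void process(IncomingMessageEnvelope envelope);
          }

          class BroadcastAwareRunLoop {
            private final Map<SystemStreamPartition, Set<TaskInstance>> sspToTasks = new HashMap<>();

            BroadcastAwareRunLoop(Collection<TaskInstance> tasks) {
              // Invert the task -> SSPs mapping so one SSP can fan out to many tasks.
              for (TaskInstance task : tasks) {
                for (SystemStreamPartition ssp : task.systemStreamPartitions()) {
                  sspToTasks.computeIfAbsent(ssp, k -> new HashSet<>()).add(task);
                }
              }
            }

            void onEnvelope(IncomingMessageEnvelope envelope) {
              // Deliver the envelope to every task instance its SSP is assigned to.
              for (TaskInstance task : sspToTasks.get(envelope.getSystemStreamPartition())) {
                task.process(envelope);
              }
            }
          }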

          closeuris Yan Fang added a comment -

          Thank you, Tommy.

          GroupByPartition grouper to exclude the broadcast SSPs from its initial grouping and add them to each task's SSP set

          How does the GroupByPartition grouper know which SSPs it should exclude? Learning from your approach helps me polish the design.

          naveenatceg Naveen Somasundaram added a comment - edited

          Hey Yan Fang, thanks for the design! After reading Jakob Homan's proposal, I was thinking of a third approach, which will probably also tie in closely with the design of SAMZA-353.
          How about we have a way to mark a stream as a broadcast stream at the grouper level?

          grouper.broadcastTopics=kafka.PageViewEvent#0, kafka.adviews#[1-4]
          

          This way we don't break the API as such; rather, we make the grouper more configurable (we can extend the current default grouper into a new broadcast grouper or something similar, and set the broadcast grouper as the default). The grouper fetches this config and, with it, assigns the same SSP to all the tasks: the first one with partition 0 to all of them, and the second one with partitions 1 through 4 to all of them.
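
          A hedged sketch of how a grouper might expand this proposed config value into concrete SSPs; only the grouper.broadcastTopics syntax comes from the comment above, and the parser class is an illustrative assumption:

          import java.util.HashSet;
          import java.util.Set;

          import org.apache.samza.Partition;
          import org.apache.samza.system.SystemStreamPartition;

          class BroadcastConfigParser {
            // Expand e.g. "kafka.PageViewEvent#0, kafka.adviews#[1-4]" into SSPs.
            static Set<SystemStreamPartition> parse(String value) {
              Set<SystemStreamPartition> ssps = new HashSet<>();
              for (String entry : value.split("\\s*,\\s*")) {
                String[] streamAndPart = entry.split("#");             // "kafka.adviews" / "[1-4]"
                String[] sysStream = streamAndPart[0].split("\\.", 2); // "kafka" / "adviews"
                String part = streamAndPart[1];
                int lo, hi;
                if (part.startsWith("[")) {                            // range form: [1-4]
                  String[] bounds = part.substring(1, part.length() - 1).split("-");
                  lo = Integer.parseInt(bounds[0]);
                  hi = Integer.parseInt(bounds[1]);
                } else {                                               // single form: #0
                  lo = hi = Integer.parseInt(part);
                }
                for (int p = lo; p <= hi; p++) {
                  ssps.add(new SystemStreamPartition(sysStream[0], sysStream[1], new Partition(p)));
                }
              }
              return ssps;
            }
          }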

          Of course, this will work only when we address SAMZA-353 and make sure we allow duplicates without erroring out (I am not sure where that happens right now; this might have been fixed with the new TaskModels, because the new map is TaskInstance -> SSPs, so we might already be allowing duplicates, but that needs to be investigated further at the code level).

          What are your thoughts? Let me mull it over more (especially checkpointing).

          navina Navina Ramesh added a comment -

          Yan Fang: To add to what Naveen suggested:
          Another alternative would be to make the StreamConfig accessible to the Grouper implementations. This will break the API, but it can make the grouper interface more powerful. I need to think this through a bit; just thought of noting it here.

          navina Navina Ramesh added a comment -

          So broad-stream-partition0 will only be processed once, either in task 1 or task 2. Therefore, my suggestion is that, when the consumer returns the result, it should also return the taskName information, such as task 1 -> Map, task 2 -> Map

          If a container has more than 1 task and you fetch a message from a broadcast stream partition, will you be invoking each of the tasks in order? Just need a clarification.

          Is it a little clearer?

          Yes. Thank you for explaining. I think there is a typo in the document as well.

          change to the Consumer API and Chooser API will also help with multiple-partition subscription

          I don't think you have mentioned how your design affects the Chooser API. Can you please explain or point me to the section if I missed it?

          Should we encourage the broadcast topic to have so many partitions?

          I think we should keep things simple. We can either enforce only 1 partition for any broadcast stream, or have all partitions consumed by all tasks. I think the design in this JIRA should easily enable the SAMZA-353 feature. If you think about both use cases, I think you can come up with an efficient and simple configuration.

          I have another question. In a system which consumes from a broadcast stream, how will we calibrate the throughput of a job (messages processed per second)? The same message is handled more than once in different tasks.

          twbecker Tommy Becker added a comment -

          GroupByPartition now takes the Config as a constructor param and gets the broadcast streams from that. That change was easy since the GroupByPartitionFactory already has the config.
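
          A sketch of that wiring; SystemStreamPartitionGrouperFactory and its method are Samza's existing interface, while the Config-taking GroupByPartition constructor is the change being described:

          import org.apache.samza.config.Config;
          import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
          import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;

          public class GroupByPartitionFactory implements SystemStreamPartitionGrouperFactory {
            @Override
            public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
              // The factory already receives the job Config, so the grouper can read
              // the broadcast-stream settings without any public API change.
              return new GroupByPartition(config); // GroupByPartition lives in the same package
            }
          }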

          twbecker Tommy Becker added a comment -

          As noted above, this is what I did. The SystemStreamPartitionGrouperFactory already has the config, so what API does it break?

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi, Yan Fang, sorry to chime in late here. Here are my additional points:

          1. Instead of changing the SystemConsumer API, why not change the implementation of SystemConsumers s.t. we allow an M:1 mapping between SystemConsumer instances and Task instances? That should satisfy the multi-subscriber use case as well. The SystemConsumers.register() method has to be changed, but that is an internal API.
          2. I do see the requirement to change / break the current MessageChooser API in order to accommodate multiple subscribers per SSP in a clean way. The MessageChooser.choose API only returns the IncomingMessageEnvelope, which makes it difficult to identify which task a message should go to w/o knowing which consumer the message is coming from. The only option w/o breaking the MessageChooser API seems to require a lot of bookkeeping in SystemConsumers to remember the latest offset each SystemConsumer is at and decorate/deliver the incoming message w/ the SystemConsumer that expects that exact offset. Yet, there could be multiple deliveries if all consumers are at the exact same offset and moving at exactly the same speed. So, I would propose adding a MulticastMessageChooser API and making it configurable s.t. it is required when a broadcast topic is configured.
          3. I like Naveen Somasundaram's idea of making the grouper configurable better, because it would allow the implementation of SystemStreamPartitionGrouper to determine which SSPs are global w/o changing the SystemStreamPartitionGrouper API, and the logic can still live within the group() function, instead of modifying the JobCoordinator to add the global topics later.

          Just my few cents.

          navina Navina Ramesh added a comment -

          Thanks for pointing it out, Tommy Becker! I was looking at the SystemStreamPartitionGrouper interface and not the SystemStreamPartitionGrouperFactory interface.
          You are right. There is a way to provide broadcast stream grouping without breaking the API.

          nickpan47 Yi Pan (Data Infrastructure) added a comment - edited

          Yet, there could be multiple deliveries if all consumers are at the exact same offset and moving at exactly the same speed.

          On second thought, the above issue may be solvable by just picking one of the consumers at the same offset, decorating the incoming message w/ the consumer info, and advancing the chosen consumer's offset. However, it would still be much cleaner if MessageChooser.choose returned the incoming message together w/ the consumer info.

          closeuris Yan Fang added a comment - edited

          GroupByPartition now takes the Config as a constructor param and gets the broadcast streams from that. That change was easy since the GroupByPartitionFactory already has the config.

          Thank you Tommy Becker for pointing this out. I think Navina and I both overlooked this one.

          This way we don't break the API as such; rather, we make the grouper more configurable (we can extend the current default grouper into a new broadcast grouper or something similar, and set the broadcast grouper as the default). The grouper fetches this config and, with it, assigns the same SSP to all the tasks: the first one with partition 0 to all of them, and the second one with partitions 1 through 4 to all of them.

          Naveen Somasundaram, agreed on this idea (I guess +1 from Yi Pan (Data Infrastructure) too). As mentioned in the above comment, since the GrouperFactory already has the Config parameter, I don't think we need to change the Grouper API at all. Then we do not need to change the JobCoordinator either. The Grouper class takes care of assigning the broadcast stream to all the tasks.

          If a container has more than 1 task and you fetch a message from a broadcast stream partition, will you be invoking each of the tasks in order? Just need a clarification.

          I have another question. In a system which consumes from a broadcast stream, how will we calibrate the throughput of a job (messages processed per second)? The same message is handled more than once in different tasks.

          Those two questions are related, and both need further discussion, because there are two ideas now (something similar is mentioned in SAMZA-353's design doc):

          1) "Alter the SamzaContainer to consume from multiple positions within a single SystemStreamPartition."(from SAMZA-353 design doc). An alternative is that, we treat the boardcast stream in different tasks as different SSPs. In Kafka, (from my experiment), one consumer can fetch the same topic from different offsets and treat them as two different FetchRequest. Just need to verify with Guozhang Wang.

          2) "Impose an ordering on offsets, and always start consuming from the lowest offset that's required for all TaskNames within a single container for a given SystemStreamPartition. The container can then filter any input messages that a given
          StreamTask has already processed in cases where another task in the container might be farther behind." (from SAMZA-353 design doc).

          Method 1 is more intuitive and doable. But as I mentioned, it may need a change to the Consumer API (the poll() method) to let the consumer return messages from the same SSP at different offsets.

          The good thing about method 2 is that it does not require changing existing APIs. It has a few things to consider:
          1) It assumes that all offsets are ordered. I think this is not a big concern: for a system whose offsets are not ordered, we just do not support the broadcast stream, and this does not influence other implementations.

          2) Another concern: assume the broadcast stream is at offset 100 in task 1 and offset 50 in task 2. When we process, we start from offset 50 and have task 1 ignore msgs from offsets 50-100. What is the priority of the streams? Do we process the broadcast stream from 50-100 first, or in RoundRobin order with all other partitions? Do we prioritize task 2 and process it until the broadcast stream reaches offset 100, after which we can do the one-broadcast-msg-for-two-tasks logic?
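
          A tiny sketch of the filtering in method 2, reusing the hypothetical TaskInstance stand-in from the RunLoop sketch earlier in this thread and assuming offsets parse as longs (a real implementation would go through the SystemAdmin's offset comparison):

          import java.util.Map;

          import org.apache.samza.system.IncomingMessageEnvelope;

          class BroadcastOffsetFilter {
            // For one broadcast SSP, nextOffsetPerTask holds each task's next expected
            // offset; the container consumes from the lowest of these and skips
            // messages a task has already processed.
            static void deliver(IncomingMessageEnvelope envelope,
                                Map<TaskInstance, Long> nextOffsetPerTask) {
              long msgOffset = Long.parseLong(envelope.getOffset());
              for (Map.Entry<TaskInstance, Long> entry : nextOffsetPerTask.entrySet()) {
                if (msgOffset >= entry.getValue()) { // not yet processed by this task
                  entry.getKey().process(envelope);
                  entry.setValue(msgOffset + 1);     // advance this task's position
                }
              }
            }
          }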

          Instead of changing the SystemConsumer API, why not change the implementation of SystemConsumers s.t. we allow an M:1 mapping between SystemConsumer instances and Task instances? That should satisfy the multi-subscriber use case as well. The SystemConsumers.register() method has to be changed, but that is an internal API.

          Yi Pan (Data Infrastructure), do you mean SystemConsumer : Task instances = M : 1 (currently SystemConsumer : Task instances = 1 : M)? Or do you mean we allow the consumer to register the same SSP from different tasks as different SSPs? If the latter, yes, this is doable; we do not need to change the register() API. However, the problem is in the poll() method, which only returns the SSP -> List map and does not carry the information about which task an SSP belongs to.

          The MessageChooser.choose API only returns the IncomingMessageEnvelope, which makes it difficult to identify which task a message should go to w/o knowing which consumer the message is coming from.

          Navina Ramesh, this is what I mean by changing the Chooser API. I did not put it in the design doc because I did not realize we needed to do this as well. Will update it later.

          The only option w/o breaking the MessageChooser API seems to require a lot of bookkeeping in SystemConsumers to remember the latest offset each SystemConsumer is at and decorate/deliver the incoming message w/ the SystemConsumer that expects that exact offset. Yet, there could be multiple deliveries if all consumers are at the exact same offset and moving at exactly the same speed.

          I think this is related to your comment (1) - you are suggesting multiple consumers - and this needs further discussion. But yes, you are right: the SystemConsumer needs to bookkeep information about the incoming messages, such as the latest offset/taskName, and currently I do not see any place that has this information.

          So, I would propose adding a MulticastMessageChooser API and making it configurable s.t. it is required when a broadcast topic is configured.

          What kind of new methods will this need?

          guozhang Guozhang Wang added a comment -

          Yan Fang Yeah, with the current simple consumer you can send fetch requests with arbitrary starting offsets; in the new consumer, which combines both the high-level and simple consumer logic, we can do sth. like:

          consumer.seek(offset1)
          messages1 = consumer.poll()
          consumer.seek(offset2)
          messages2 = consumer.poll()
          
          navina Navina Ramesh added a comment -

          I did not put it in the design doc because I did not realize we needed to do this as well. Will update it later.

          Ok. Thanks, Yan Fang!

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Yi Pan (Data Infrastructure), do you mean SystemConsumer : Task instances = M : 1 (currently SystemConsumer : Task instances = 1 : M)? Or do you mean we allow the consumer to register the same SSP from different tasks as different SSPs? If the latter, yes, this is doable; we do not need to change the register() API. However, the problem is in the poll() method, which only returns the SSP -> List map and does not carry the information about which task an SSP belongs to.

          Yes, you are right. Sorry, I was confused between a SystemConsumer and a single subscription to an SSP. What I meant to say is "allow the consumer to register the same SSP from different tasks". That does require breaking/changing the SystemConsumer API.

          What kind of new methods will this need?

          I was just thinking of enriching the return information to be a pair:

          // The return value is a pair of (msg, subName) from the MulticastMessageChooser; subName can be 1:1 with taskName if we only support 1 subscription per task, or n:1 if we consider supporting multiple subscriptions per task
          Pair<IncomingMessageEnvelope, String> choose();
          
          closeuris Yan Fang added a comment -

          Is it in the 0.9.0 Consumer redesign?

          closeuris Yan Fang added a comment -

          What I meant to say is "allow the consumer to register the same SSP from different tasks". That does require breaking/changing the SystemConsumer API.

          This does not break the register part, but it breaks the poll() part, as I mentioned, because we can only return a Map with an SSP -> List<msg> mapping. I am not sure how we can return a Map where one SSP has two different lists of msgs - the broadcast stream has two offsets, so the msgs from it are different.

          I was just thinking of enriching the return information to be a pair

          Hmm, this sounds promising as well. If users enable the global stream, they are required to use the MulticastMessageChooser. Otherwise, they can use the existing MessageChooser. This does not break the current implementation.

          guozhang Guozhang Wang added a comment -

          Yup.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          This does not break the register part, but it breaks the poll() part, as I mentioned, because we can only return a Map with an SSP -> List<msg> mapping. I am not sure how we can return a Map where one SSP has two different lists of msgs - the broadcast stream has two offsets, so the msgs from it are different.

          Yes, that is the problem. How about we add MulticastSystemConsumer as an extended SystemConsumer interface as well, and instantiate this SystemConsumer class if the user has configured a broadcast topic? This may not be too bad, since SystemConsumer is an interface to the underlying messaging systems, and systems that do not support this can choose not to implement the additional interface.
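
          A sketch of what that opt-in could look like when the container wires up consumers; the MulticastSystemConsumer name comes from the comment above, while the config key, the empty marker body, and the instanceof check are illustrative assumptions:

          import org.apache.samza.SamzaException;
          import org.apache.samza.config.Config;
          import org.apache.samza.system.SystemConsumer;

          // Hypothetical marker for systems that can return per-task poll results.
          interface MulticastSystemConsumer extends SystemConsumer { /* extended poll() here */ }

          class BroadcastWiring {
            // Only jobs that configure a broadcast topic need the extended interface;
            // other systems keep implementing plain SystemConsumer as before.
            static SystemConsumer checkBroadcastSupport(String systemName,
                                                        SystemConsumer consumer,
                                                        Config config) {
              boolean broadcastConfigured = !config.get("task.broadcast.inputs", "").isEmpty();
              if (broadcastConfigured && !(consumer instanceof MulticastSystemConsumer)) {
                throw new SamzaException("System " + systemName + " does not support broadcast streams");
              }
              return consumer;
            }
          }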

          closeuris Yan Fang added a comment -

          Updated the design doc based on Yi Pan (Data Infrastructure), Tommy Becker, Navina Ramesh, Guozhang Wang and Naveen Somasundaram's feedback. The changes are all in the "Implementation" section: "SystemStreamPartition Grouping", "Consumer", "Message Chooser" and "Configuration".

          I know some of this discussion may already have happened in SAMZA-353, but since most of you did not attend that discussion, I still think it's valuable to rethink it and come up with a more concrete design doc for the real implementation.

          Thank you.

          closeuris Yan Fang added a comment -

          First attempt RB: https://reviews.apache.org/r/34974/

          1. Basically uses the approach Tommy Becker mentioned: the same SSP is assigned to multiple taskInstances. At the same time, modified the taskInstance to allow it to skip already-processed messages. This is useful because when containers are relocated, different taskInstances may have different offsets for the broadcast stream. The oldest offset is used when the SSP has multiple offsets.

          2. Also rewrote the Grouper classes in Java and added global streams when grouping.

          3. Touched SAMZA-569 – added an OffsetComparator for SystemAdmin.

          4. Fixed SAMZA-699 – changed to LinkedHashSet to preserve the order.

          5. Modified the OffsetManager to preserve the taskName-SSP-offset relation.

          Looking for feedback! Thank you.

          closeuris Yan Fang added a comment -

          Rebased and attached the file. Thanks. Looking for feedback.

          navina Navina Ramesh added a comment -

          Thanks, Yan Fang! I will look into your patch today.

          bkirwi Ben Kirwin added a comment -

          Wow, very glad to see progress on this! I haven't looked at the patch, but the design sounds great.

          It sounds like, by allowing multiple partitions, this isn't just limited to broadcast streams – any assignment the grouper comes up with should be supported. Does that sound right? (Of course, it's possible that the default groupers will only support broadcast streams for now – but a custom grouper could do more unusual things.)

          closeuris Yan Fang added a comment -

          Yes, I think so. If we define customized groupers, there should not be anything in the design that prevents us from doing so. I just have not tested it.

          closeuris Yan Fang added a comment -

          Cool. Thanks, Navina.

          navina Navina Ramesh added a comment - edited

          Yan Fang Left you some comments in the RB. Thanks!

          closeuris Yan Fang added a comment -

          Updated the patch according to Navina Ramesh's comments in the RB. Thanks.

          closeuris Yan Fang added a comment -

          Synced with the latest master.

          navina Navina Ramesh added a comment -

          Thanks, Yan Fang!
          Apart from some javadoc warnings, I am getting this error when I build on my Linux box:
          /export/apps/jdk/JDK-1_8_0_5/bin/java: symbol lookup error: /tmp/librocksdbjni1005757688295885396..so: undefined symbol: clock_gettime

          I don't get this on my Mac, though. Have you seen this before? Maybe something to do with RocksDB compatibility with RHEL 6.1?

          closeuris Yan Fang added a comment -

          Navina Ramesh, yes, I have the same problem on Linux, with Java 7 and Java 8.

          There is a bug filed: https://github.com/facebook/rocksdb/issues/606

          Since the fix will be in RocksDB's 3.11.1 release, which has not been published yet, I am wondering if we want to wait and update RocksDB before our 0.10.0 release. If yes, we should push Naveen or Chris to release it.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          OK. It seems that we really need to get RocksDB 3.11.1 for Samza 0.10.0. Does any of us know the release process for the RocksDB Java package? Naveen Somasundaram, is there any documentation for the release process somewhere?

          Thanks!

          navina Navina Ramesh added a comment - - edited

          Yan Fang I tried the patch on my Mac and it works well! Left some comments on the RB.

          Yi Pan (Data Infrastructure) / Naveen Somasundaram Any updates on the RocksDB release?

          closeuris Yan Fang added a comment -

          Navina Ramesh, thanks for the review. Fixed all issues you mentioned in the RB except the "isBroadcast" flag.

          For the "isBroadcast" flag, I tried to add it in the SystemStreamPartition, but since this is an API change, it is not trivial: all the system code is using the "system.stream.partition" format, which does not contain the "isBroadcast" information, we will have to change all the places which use this format; it is also possible that users are using this format to pass around the SystemStreamPartition, we may break their code if we change to something like "system.stream.partition.isBroadcast". So I decide not to make this change so far. What do you think?

          navina Navina Ramesh added a comment -

          Yan Fang Which code are you referring to that uses the "system.stream.partition" format? If I understand it right, the only change to the API will be the addition of a member field and a getter for it. It may change the hashcode as well, which should be acceptable. Can you please give me an example or line number that you think will affect the user experience?

          Thanks!

          closeuris Yan Fang added a comment - - edited

          Need to change at least the following two parts:

          1. The sspToString and stringToSsp methods, which are used to serialize/deserialize the JSON that passes the JobModel between the AM and the containers. See here.

          2. It also affects the checkpoint. See here, and here. This "looks" OK; I just have not looked at it very carefully.

          Regarding user experience:

          1. Not sure if we want to include "isBroadcast" in the toString method; this may also affect user code that relies on the toString output.

          2. And I hope they are not using the sspToString and stringToSsp methods (they are not in the API, though).

          navina Navina Ramesh added a comment -

          bq. 1. The sspToString and stringToSsp methods, which are used to serialize/deserialize the JSON that passes the JobModel between the AM and the containers. See here.

          I feel changing this method should be fine. If you want to provide backward compatibility, don't serialize the isBroadcast flag when it is set to false. But I agree, it does look inconsistent.

          bq. 2. It also affects the checkpoint. See here, and here.

          These changes should be OK since they are internal to Samza itself.

          bq. And I hope they are not using the sspToString and stringToSsp methods (they are not in the API, though).

          Yeah, that is samza-core. We cannot guarantee compatibility if users decide to use internal APIs directly. I am not concerned about those Util methods.
          But toString is a valid point.

          I am not keen on this flag. I brought it up because I would rather break compatibility when adding a major feature than make the change after users ask for it.

          We can table this for the time being. Feel free to commit it.
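
          To make the compatibility trade-off concrete, a minimal sketch of the "omit the flag when false" serialization suggested above. The helper names are hypothetical, not the actual Util methods, and it ignores edge cases such as stream names containing dots.

          import org.apache.samza.Partition;
          import org.apache.samza.system.SystemStreamPartition;

          final class SspFormatSketch {
            static String sspToString(SystemStreamPartition ssp, boolean isBroadcast) {
              String base = ssp.getSystem() + "." + ssp.getStream() + "."
                  + ssp.getPartition().getPartitionId();
              // Omit the flag when false, so existing readers still parse the string.
              return isBroadcast ? base + ".broadcast" : base;
            }

            static SystemStreamPartition stringToSsp(String s) {
              String[] parts = s.split("\\.");
              // Three parts = legacy format; a fourth part would carry the flag.
              return new SystemStreamPartition(parts[0], parts[1],
                  new Partition(Integer.parseInt(parts[2])));
            }
          }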

          closeuris Yan Fang added a comment -

          Thanks, Navina Ramesh. Committed it (without the isBroadcast flag). Will add the flag if users really want it.

          navina Navina Ramesh added a comment -

          Works for me!


            People

            • Assignee:
              closeuris Yan Fang
            • Reporter:
              closeuris Yan Fang
            • Votes: 0
            • Watchers: 9
