Thanks for reviewing, Navina.
I think use cases #3 and #4 are very similar. I have seen many instances of #4 come up (regarding bootstrap streams), and those would work well with a global state implementation.
Yes, agreed. Since use cases 1 and 2 still remain, I think this feature is still worth implementing. There is a workaround on the mailing list, but I'd prefer to provide this out of the box. It's a "major" rather than a "critical" feature.
a. Do all broadcast streams have only 1 partition?
No, that was just an example. We can have any number of broadcast SSPs, as long as users configure them.
b. How does this affect the consumer's MessageChooser priority? Does it give the broadcast stream higher priority by default? In general, my question is how each task will proceed at the same rate. We could have hot partitions, and those tasks may not react to the broadcast stream at the same time as the other tasks.
In the current consumer implementation, there is no way to guarantee that "each task proceeds at the same rate" if more than one task is in the same container. We only have one consumer per system per container, and the MessageChooser is also per container, so two tasks cannot receive the same message at the same time.
I do not differentiate the broadcast stream from normal streams at the consumer level. So if users want to give the broadcast stream higher priority, they can set the priority config systems.kafka.streams.broadcastStream.samza.priority=2.
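To illustrate what a higher priority value buys the broadcast stream, here is a minimal sketch of priority-based selection among buffered messages in one container. The class, the simplified Envelope record, and the default priority of 0 for unconfigured streams are hypothetical simplifications for illustration, not Samza's actual MessageChooser implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PriorityChooserSketch {
    // Hypothetical, simplified envelope: just the stream name and a payload.
    record Envelope(String stream, String payload) {}

    private final Map<String, Integer> priorities;
    private final List<Envelope> pending = new ArrayList<>();

    PriorityChooserSketch(Map<String, Integer> priorities) {
        this.priorities = priorities;
    }

    // Buffer one incoming message for later selection.
    void update(Envelope e) {
        pending.add(e);
    }

    // Returns the buffered envelope whose stream has the highest priority;
    // among equal priorities, the earliest-buffered envelope wins.
    // Returns null when nothing is buffered.
    Envelope choose() {
        Envelope best = null;
        int bestPriority = Integer.MIN_VALUE;
        for (Envelope e : pending) {
            int p = priorities.getOrDefault(e.stream(), 0); // assumed default: 0
            if (best == null || p > bestPriority) {
                best = e;
                bestPriority = p;
            }
        }
        if (best != null) {
            pending.remove(best);
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> prio = new HashMap<>();
        prio.put("broadcastStream", 2); // mirrors ...broadcastStream.samza.priority=2
        PriorityChooserSketch chooser = new PriorityChooserSketch(prio);
        chooser.update(new Envelope("stream1", "a"));
        chooser.update(new Envelope("broadcastStream", "b"));
        chooser.update(new Envelope("stream2", "c"));
        System.out.println(chooser.choose().stream()); // broadcastStream
        System.out.println(chooser.choose().stream()); // stream1
    }
}
```

Note that this ordering applies only within a single container; it does not make tasks in different containers react to the broadcast stream at the same time.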
c. Is the broadcast stream also intended to make config changes at the task level? Isn't that functionality of the JC (JobCoordinator)?
I was following the same config convention as task.inputs, so I'm not exactly sure at which level it should go. In my opinion, this config is actually read in the JobCoordinator class.
3. "However, this is the feature we will need for the broadcast stream, because all the tasks will have the broadcast stream. When two or more tasks are assigned to the same container, each task's copy of the broadcast stream can be at a different offset, so the consumer needs to consume the same stream more than once, with different offsets."
> Can you explain this better?
Again, assume there is only one system in the job.
Currently, the consumer has two important methods: register(SSP, offset) and poll(), which returns a Map<SSP, List<IncomingMessageEnvelope>>.
So if we have two tasks:
task 1 has stream1-partition0, stream2-partition0, broad-stream-partition0
task 2 has stream1-partition1, stream2-partition1, broad-stream-partition0
If those two tasks are in the same container, the consumer will register all of those SSPs (there is only one consumer). Since the consumer only returns a Map, when it returns <broad-stream-partition0, list>, it cannot tell whether that entry is for task 1 or task 2. What actually happens is that it returns a Map with only five keys: stream1-partition0, stream2-partition0, stream1-partition1, stream2-partition1, and broad-stream-partition0. So broad-stream-partition0 will be processed only once, by either task 1 or task 2. Therefore, my suggestion is that when the consumer returns its result, it should also include the task name, e.g. task 1 -> Map, task 2 -> Map. This requires changing the Consumer API as well as the Chooser API. Is that a little clearer?
As I also mentioned in the design doc, this change to the Consumer and Chooser APIs will also help with multi-partition subscription: when we assign one partition to more than one task and those tasks are in the same container, we run into the same problem.
Of course, another way is to have as many consumers as there are tasks, but that no longer seems like a single-threaded solution...
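The collision described above can be reproduced with plain maps. Below is a minimal sketch, assuming a hypothetical simplified SSP record and string payloads in place of real envelopes; pollBySsp mimics today's single Map keyed by SSP, and pollByTask mimics the proposed task name -> Map shape. Neither method is the real SystemConsumer API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BroadcastPollSketch {
    // Hypothetical simplified SSP (system omitted: only one system in the job).
    record SSP(String stream, int partition) {}

    // Task assignments from the example above.
    static Map<String, List<SSP>> assignments() {
        Map<String, List<SSP>> a = new LinkedHashMap<>();
        a.put("task1", List.of(new SSP("stream1", 0), new SSP("stream2", 0), new SSP("broad-stream", 0)));
        a.put("task2", List.of(new SSP("stream1", 1), new SSP("stream2", 1), new SSP("broad-stream", 0)));
        return a;
    }

    // Current shape: one Map keyed only by SSP, so the shared SSP collapses.
    static Map<SSP, List<String>> pollBySsp(Map<String, List<SSP>> assignments) {
        Map<SSP, List<String>> result = new LinkedHashMap<>();
        for (List<SSP> ssps : assignments.values()) {
            for (SSP ssp : ssps) {
                // The second put for broad-stream partition 0 overwrites the first.
                result.put(ssp, List.of("msg@" + ssp));
            }
        }
        return result;
    }

    // Proposed shape: results grouped by task name first, then by SSP.
    static Map<String, Map<SSP, List<String>>> pollByTask(Map<String, List<SSP>> assignments) {
        Map<String, Map<SSP, List<String>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<SSP>> e : assignments.entrySet()) {
            Map<SSP, List<String>> perTask = new LinkedHashMap<>();
            for (SSP ssp : e.getValue()) {
                perTask.put(ssp, List.of("msg@" + ssp));
            }
            result.put(e.getKey(), perTask);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<SSP>> a = assignments();
        System.out.println(pollBySsp(a).size()); // 5: broad-stream partition 0 appears once
        Map<String, Map<SSP, List<String>>> byTask = pollByTask(a);
        System.out.println(byTask.get("task1").containsKey(new SSP("broad-stream", 0))); // true
        System.out.println(byTask.get("task2").containsKey(new SSP("broad-stream", 0))); // true
    }
}
```

With the task-keyed shape, each task's entry for the broadcast SSP could also carry messages from its own offset, which is what lets two tasks in one container read the same partition independently.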
Why is the partition number needed here? Are you suggesting that the tasks can consume from only one partition of the broadcast stream?
I think if users only want a few partitions, the config should include the partition numbers.
If I have a broadcast topic with 32 partitions and I want all tasks to consume from all of them, then specifying the config will be tedious.
This is quite interesting. Should we encourage broadcast topics to have so many partitions? That introduces more complexity into the system: how do we prioritize those 32 partitions within one task? In the config, we can simply put something like broadcastTopic#all to allow convenient configuration.
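A minimal sketch of how a broadcastTopic#all entry could be expanded, assuming a hypothetical comma-separated format of system.stream#partition entries and a caller-supplied map of partition counts standing in for stream metadata. Neither the format nor the parse method is a confirmed Samza config or API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BroadcastConfigSketch {
    // Hypothetical simplified SSP.
    record SSP(String system, String stream, int partition) {}

    // Parses a hypothetical value such as "kafka.broadcastTopic#all,kafka.configTopic#0".
    // partitionCounts stands in for metadata that would be fetched from the system.
    static List<SSP> parse(String value, Map<String, Integer> partitionCounts) {
        List<SSP> result = new ArrayList<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            int dot = entry.indexOf('.');
            int hash = entry.lastIndexOf('#');
            if (dot < 0 || hash < 0 || dot > hash) {
                throw new IllegalArgumentException("Expected system.stream#partition: " + entry);
            }
            String system = entry.substring(0, dot);
            String stream = entry.substring(dot + 1, hash);
            String partitionSpec = entry.substring(hash + 1);
            if (partitionSpec.equals("all")) {
                // Expand to every partition of the stream.
                Integer count = partitionCounts.get(stream);
                if (count == null) {
                    throw new IllegalArgumentException("Unknown stream: " + stream);
                }
                for (int p = 0; p < count; p++) {
                    result.add(new SSP(system, stream, p));
                }
            } else {
                // A single explicit partition number.
                result.add(new SSP(system, stream, Integer.parseInt(partitionSpec)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<SSP> ssps = parse("kafka.broadcastTopic#all, kafka.configTopic#0",
                Map.of("broadcastTopic", 32, "configTopic", 1));
        System.out.println(ssps.size()); // 33
    }
}
```

This keeps the explicit per-partition form for users who only want a few partitions, while #all avoids listing all 32 partitions by hand.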