Description
We currently auto-create changelog streams. However, when a job has more than one container, Samza can create a changelog stream with too few partitions.
Reason:
Assume we have an input stream with 3 partitions and assign 3 containers to this job. According to the JobCoordinator, we get:
Container(Model) | InputStream Partition | Changelog Partition |
---|---|---|
0 | 0 | 0 |
1 | 1 | 1 |
2 | 2 | 2 |
If Container 0 is brought up first, it calls
val maxChangeLogStreamPartitions = containerModel.getTasks.values.max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId }).getChangelogPartition.getPartitionId + 1
in SamzaContainer.
Container 0 only holds changelog partition 0, so maxChangeLogStreamPartitions is 1 and we auto-create a changelog stream with only 1 partition.
Similarly, if Container 1 is brought up first, we get a stream with only 2 partitions. The stream gets the required 3 partitions only when Container 2 happens to be brought up first.
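
The dependence on startup order can be reproduced with a minimal sketch. `Task`, `Container`, and `ChangelogPartitionCount` below are hypothetical stand-ins for illustration, not Samza classes. `perContainer` mirrors the calculation quoted above, and `jobWide` shows one possible direction (an assumption, not necessarily the fix that should be adopted): take the maximum across all containers in the job.

```scala
// Hypothetical stand-ins for Samza's TaskModel and ContainerModel; only the
// changelog partition id matters for this illustration.
final case class Task(changelogPartitionId: Int)
final case class Container(id: Int, tasks: Seq[Task])

object ChangelogPartitionCount {
  // Mirrors the per-container calculation quoted above: highest changelog
  // partition id among this container's tasks, plus one.
  def perContainer(container: Container): Int =
    container.tasks.map(_.changelogPartitionId).max + 1

  // Assumed alternative: take the maximum across every container in the job,
  // so the result does not depend on which container starts first.
  def jobWide(containers: Seq[Container]): Int =
    containers.flatMap(_.tasks).map(_.changelogPartitionId).max + 1

  // The three-container layout from the table above.
  val containers: Seq[Container] = Seq(
    Container(0, Seq(Task(0))),
    Container(1, Seq(Task(1))),
    Container(2, Seq(Task(2)))
  )
}
```

With this layout, `perContainer` returns 1, 2, and 3 for Containers 0, 1, and 2 respectively, while `jobWide` returns 3 regardless of which container starts first.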