SAMZA-71, the current checkpoint log's use of the job's initial partition count and grouping of checkpoint values limits our ability to support other partition strategies.
This task will change the checkpoint log to
- Not be tied directly to the partition count of the initial input streams of the job. Using the initial count will work well for a default value and is the best choice for jobs that won't have their input stream partition counts change. However, if new streams are added with more partitions, those excess partitions will be hash partitioned into the existing checkpoint log
- Store the checkpointed offsets directly rather than wrapped in a per-task instance map. This will let us change the task grouping strategy after a job has been created.
On startup, each container will read from all the partitions in the checkpoint log for which it has TPs and build the checkpoints from there.
This will be an incompatible change with existing logs.