Github user tzulitai commented on the issue:
Nice following the discussions here! Let me wrap up the discussion so far:
The old way -
```java
void open(int partitions, int subtaskIndex, int numSubtasks);
int partition(T record, byte[] key, byte[] value, int numPartitions);
```
The (last) proposed new way -
```java
void open(int subtaskIndex, int numSubtasks);
int partition(T record, byte[] key, byte[] value, String targetTopic, int partitions);
```
and keep an internal cache of partitioner information: `Map<String, PartitionerInfo>`.
The `PartitionerInfo` can actually just be the partition id array; I don't think we need another wrapper class if we only need a single `FlinkKafkaPartitioner` per subtask for all (including dynamically discovered) topics.
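A minimal sketch of how this could fit together (class names, the cache layout, and the fake partition lookup are all assumptions for illustration, not the final API):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class PartitionerSketch {

    // The proposed new interface: open() no longer receives the partition array,
    // and partition() receives the target topic plus its current partition count.
    public abstract static class FlinkKafkaPartitioner<T> implements Serializable {
        public void open(int subtaskIndex, int numSubtasks) {}
        public abstract int partition(T record, byte[] key, byte[] value,
                                      String targetTopic, int partitions);
    }

    // A minimal key-hash implementation that works for any (dynamic) topic,
    // so a single partitioner instance per subtask suffices.
    public static class HashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int partitions) {
            int hash = key == null ? 0 : java.util.Arrays.hashCode(key);
            return Math.abs(hash % partitions);
        }
    }

    // Producer-side cache: topic -> partition id array, filled lazily so
    // dynamically discovered topics need no extra partitioners.
    static final Map<String, int[]> partitionCache = new HashMap<>();

    static int[] partitionsFor(String topic) {
        // The real producer would query Kafka here; we fake 4 partitions.
        return partitionCache.computeIfAbsent(topic, t -> new int[] {0, 1, 2, 3});
    }

    public static void main(String[] args) {
        HashPartitioner<String> partitioner = new HashPartitioner<>();
        partitioner.open(0, 1);
        int[] parts = partitionsFor("events");
        int p = partitioner.partition("rec", "user-42".getBytes(), null,
                                      "events", parts.length);
        System.out.println(p >= 0 && p < parts.length);
    }
}
```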
I like the proposal of the new partitioner, since users then do not need to provide multiple partitioners. The only question is how well this works for the general use case, because implementations of the new `partition` method need to handle different topics (which probably makes sense, because we want to treat topics as dynamic in general anyway). The new way would also allow us to handle upscaled target topics in the future.
For migration, for the dummy wrapper delegation, I think we should just mimic the wrong, old behaviour. That was the behaviour it always had anyway, so we should not try to alter it while the user is still using the old API. The deprecation annotation and Javadoc message are responsible for pushing them to change to the new API.
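A rough sketch of what that dummy wrapper delegation could look like (interface and class names are assumptions for illustration; the old `open()` is deferred until the partition count is known, and the target topic is ignored exactly as the old API ignored it):

```java
import java.io.Serializable;

public class MigrationSketch {

    /** The old, deprecated interface (as discussed above). */
    @Deprecated
    public interface KafkaPartitioner<T> extends Serializable {
        void open(int partitions, int subtaskIndex, int numSubtasks);
        int partition(T record, byte[] key, byte[] value, int numPartitions);
    }

    /** The proposed new interface. */
    public abstract static class FlinkKafkaPartitioner<T> implements Serializable {
        public void open(int subtaskIndex, int numSubtasks) {}
        public abstract int partition(T record, byte[] key, byte[] value,
                                      String targetTopic, int partitions);
    }

    /** Dummy wrapper that delegates to the old partitioner, keeping its old semantics. */
    public static class KafkaPartitionerWrapper<T> extends FlinkKafkaPartitioner<T> {
        private final KafkaPartitioner<T> old;
        private int subtaskIndex;
        private int numSubtasks;
        private boolean opened = false;

        public KafkaPartitionerWrapper(KafkaPartitioner<T> old) {
            this.old = old;
        }

        @Override
        public void open(int subtaskIndex, int numSubtasks) {
            // The new open() has no partition count, so remember the subtask info
            // and defer old.open() until partition() supplies the count.
            this.subtaskIndex = subtaskIndex;
            this.numSubtasks = numSubtasks;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int partitions) {
            if (!opened) {
                old.open(partitions, subtaskIndex, numSubtasks);
                opened = true;
            }
            // targetTopic is deliberately ignored: we mimic the old behaviour.
            return old.partition(record, key, value, partitions);
        }
    }

    public static void main(String[] args) {
        KafkaPartitioner<String> legacy = new KafkaPartitioner<String>() {
            @Override public void open(int p, int i, int n) {}
            @Override public int partition(String r, byte[] k, byte[] v, int n) {
                return 2 % n; // fixed-partition legacy behaviour
            }
        };
        KafkaPartitionerWrapper<String> wrapper = new KafkaPartitionerWrapper<>(legacy);
        wrapper.open(0, 1);
        System.out.println(wrapper.partition("r", null, null, "some-topic", 4));
    }
}
```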