Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.9.4
-
None
-
None
Description
I am trying to implement a custom range partitioner in the Flink Iceberg sink and want to publish some counter metrics for certain scenarios. This is similar to the network metrics exposed in `TaskIOMetricGroup`.
We can implement the range partitioner using the public interface from `DataStream`.
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector)
We can pass the `TaskIOMetricGroup` to `StreamPartitioner`, which `CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.
The `Partitioner` interface is a functional interface today. We can add a new default `setup` method without breaking backward compatibility.
    @Public
    @FunctionalInterface
    public interface Partitioner<K> extends java.io.Serializable, Function {
        default void setup(TaskIOMetricGroup metrics) {}

        int partition(K key, int numPartitions);
    }
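To make the proposal concrete, here is a minimal sketch of how a range partitioner might use such a `setup` hook to register a counter. It does not compile against Flink: `SketchCounter`, `SketchMetricGroup`, and `SketchPartitioner` are hypothetical stand-ins for the real metric and partitioner types, and the counter name `numOverflowKeys` is invented for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Stand-in for a Flink counter; NOT the real org.apache.flink.metrics.Counter.
final class SketchCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Stand-in for the metric group passed to setup(); NOT the real TaskIOMetricGroup.
final class SketchMetricGroup {
    private final Map<String, SketchCounter> counters = new HashMap<>();
    SketchCounter counter(String name) {
        return counters.computeIfAbsent(name, n -> new SketchCounter());
    }
}

// Mirrors the proposed shape: a no-op default setup() plus one abstract method,
// so the interface remains a valid functional interface.
@FunctionalInterface
interface SketchPartitioner<K> extends Serializable {
    default void setup(SketchMetricGroup metrics) {}
    int partition(K key, int numPartitions);
}

// Hypothetical range partitioner: sorted upper bounds split the key space.
final class RangePartitionerSketch implements SketchPartitioner<Integer> {
    private final int[] upperBounds;
    private transient SketchCounter overflowKeys;

    RangePartitionerSketch(int... upperBounds) {
        this.upperBounds = upperBounds.clone();
    }

    @Override
    public void setup(SketchMetricGroup metrics) {
        // Register the counter once, before any record is partitioned.
        overflowKeys = metrics.counter("numOverflowKeys");
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        int pos = Arrays.binarySearch(upperBounds, key);
        // A negative result encodes the insertion point as -(point) - 1.
        int range = pos >= 0 ? pos : -(pos + 1);
        if (range >= upperBounds.length && overflowKeys != null) {
            overflowKeys.inc(); // key lies beyond the last known bound
        }
        return Math.min(range, numPartitions - 1);
    }
}
```

Because `setup` is a default method, existing lambda implementations such as `(key, n) -> Math.floorMod(key.hashCode(), n)` would keep compiling against the sketched interface, which is the backward-compatibility argument made above.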
I know that changing a public interface requires a FLIP process. I will do that if the community agrees with this feature request.
Personally, I think `numPartitions` should be passed in the `setup` method too. But that is a breaking change that is NOT worth the benefit right now.
    @Public
    @FunctionalInterface
    public interface Partitioner<K> extends java.io.Serializable, Function {
        default void setup(int numPartitions, TaskIOMetricGroup metrics) {}

        int partition(K key);
    }
That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify to pass the metrics group.
    @Internal
    public abstract class StreamPartitioner<T>
            implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

        @Override
        public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
            this.numberOfChannels = numberOfChannels;
        }
    }
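The forwarding path described in this ticket could look roughly like the sketch below: the stream partitioner receives the metrics in `setup`, and the wrapper passes them on to the user-supplied partitioner. All `Demo*` types and `CountingPartitioner` are hypothetical stand-ins rather than Flink classes, and the key selector that the real `CustomPartitionerWrapper` holds is omitted to keep the example short.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for TaskIOMetricGroup; here just a single shared counter.
final class DemoMetrics {
    final AtomicLong recordsRouted = new AtomicLong();
}

// Stand-in for the public Partitioner interface with the proposed default hook.
@FunctionalInterface
interface DemoPartitioner<K> extends Serializable {
    default void setup(DemoMetrics metrics) {}
    int partition(K key, int numPartitions);
}

// Stand-in for StreamPartitioner with the widened setup signature.
abstract class DemoStreamPartitioner<T> implements Serializable {
    protected int numberOfChannels;

    public void setup(int numberOfChannels, DemoMetrics metrics) {
        this.numberOfChannels = numberOfChannels;
    }

    public abstract int selectChannel(T record);
}

// Stand-in for CustomPartitionerWrapper: forwards the metrics to the user partitioner.
final class DemoCustomPartitionerWrapper<T> extends DemoStreamPartitioner<T> {
    private final DemoPartitioner<T> partitioner;

    DemoCustomPartitionerWrapper(DemoPartitioner<T> partitioner) {
        this.partitioner = partitioner;
    }

    @Override
    public void setup(int numberOfChannels, DemoMetrics metrics) {
        super.setup(numberOfChannels, metrics);
        partitioner.setup(metrics); // the new hook proposed in this ticket
    }

    @Override
    public int selectChannel(T record) {
        return partitioner.partition(record, numberOfChannels);
    }
}

// Hypothetical user partitioner that counts every record it routes.
final class CountingPartitioner implements DemoPartitioner<Integer> {
    private transient DemoMetrics metrics;

    @Override
    public void setup(DemoMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        metrics.recordsRouted.incrementAndGet();
        return Math.floorMod(key, numPartitions);
    }
}
```

In this arrangement the user code never sees the runtime classes directly; it only receives whatever metric handle the wrapper chooses to forward during `setup`.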