Flink / FLINK-35384

Expose TaskIOMetricGroup to custom Partitioner via init Context


Details

    • Improvement
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.9.4
    • None
    • Runtime / Metrics
    • None

    Description

      I am trying to implement a custom range partitioner in the Flink Iceberg sink, and I want to publish some counter metrics for certain scenarios, similar to the network metrics exposed in `TaskIOMetricGroup`.

      We can implement the range partitioner using the public `partitionCustom` method on `DataStream`:

          public <K> DataStream<T> partitionCustom(
                  Partitioner<K> partitioner, KeySelector<T, K> keySelector)
      

      We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that `CustomPartitionerWrapper` extends. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.

      The `Partitioner` interface is a functional interface today. We can add a new default `setup` method without breaking backward compatibility, because a default method does not change the single abstract method that lambdas implement.

      @Public
      @FunctionalInterface
      public interface Partitioner<K> extends java.io.Serializable, Function {
          default void setup(TaskIOMetricGroup metrics) {}
          int partition(K key, int numPartitions);
      }
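
      The compatibility claim can be illustrated with a minimal, dependency-free sketch. `StubMetricGroup` and `SketchPartitioner` are hypothetical stand-ins for `TaskIOMetricGroup` and `Partitioner` (the real Flink classes have different APIs), and `RangePartitioner` with its `catchAllKeys` counter is only an example of the kind of metric a range partitioner might publish.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Flink's TaskIOMetricGroup; NOT the real API.
class StubMetricGroup {
    final Map<String, AtomicLong> counters = new HashMap<>();

    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
}

// Stand-in mirroring the proposed shape of Partitioner<K>:
// one abstract method plus a no-op default setup hook.
@FunctionalInterface
interface SketchPartitioner<K> extends Serializable {
    default void setup(StubMetricGroup metrics) {}

    int partition(K key, int numPartitions);
}

// Example range partitioner that opts in to the setup hook.
class RangePartitioner implements SketchPartitioner<Integer> {
    // Sorted inclusive upper bounds for all partitions except the last one.
    private final int[] upperBounds;
    // Assigned in setup(); stays null if setup() is never called.
    private transient AtomicLong catchAllKeys;

    RangePartitioner(int[] upperBounds) {
        this.upperBounds = upperBounds.clone();
    }

    @Override
    public void setup(StubMetricGroup metrics) {
        catchAllKeys = metrics.counter("catchAllKeys");
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        for (int i = 0; i < upperBounds.length && i < numPartitions - 1; i++) {
            if (key <= upperBounds[i]) {
                return i;
            }
        }
        // The key exceeds every bound: count it and route it to the last partition.
        if (catchAllKeys != null) {
            catchAllKeys.incrementAndGet();
        }
        return numPartitions - 1;
    }
}

class DefaultSetupDemo {
    public static void main(String[] args) {
        StubMetricGroup metrics = new StubMetricGroup();

        RangePartitioner range = new RangePartitioner(new int[] {10, 20});
        range.setup(metrics);
        System.out.println(range.partition(15, 3));
        System.out.println(range.partition(99, 3));
        System.out.println(metrics.counter("catchAllKeys").get());

        // An existing lambda-style partitioner still compiles and inherits the no-op setup.
        SketchPartitioner<Integer> modulo = (key, n) -> Math.floorMod(key, n);
        modulo.setup(metrics);
        System.out.println(modulo.partition(-1, 4));
    }
}
```

      Existing lambda-based partitioners keep working because the interface still has exactly one abstract method; only implementations that want metrics need to override `setup`.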
      

      I know that changing a public interface requires a FLIP process. I will do that if the community agrees with this feature request.

      Personally, I think `numPartitions` should be passed in the `setup` method too. But that would be a breaking change that is not worth the benefit right now.

      @Public
      @FunctionalInterface
      public interface Partitioner<K> extends java.io.Serializable, Function {
          default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
          int partition(K key);
      }
      

      That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify anyway to pass in the metrics group.

      @Internal
      public abstract class StreamPartitioner<T>
              implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
          @Override
          public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
              this.numberOfChannels = numberOfChannels;
          }
      }
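
      How the metrics group could travel from `StreamPartitioner#setup()` down to the user's `Partitioner` can be sketched as follows. This is only an illustration under stated assumptions: `FakeTaskMetrics`, `UserPartitioner`, `StreamPartitionerSketch`, `WrapperSketch`, and `LengthPartitioner` are hypothetical stand-ins, not the real Flink classes, and serialization concerns are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for TaskIOMetricGroup; it only records registered metric names.
class FakeTaskMetrics {
    final List<String> registered = new ArrayList<>();

    void register(String name) {
        registered.add(name);
    }
}

// Stand-in for the public Partitioner interface with the proposed default hook.
@FunctionalInterface
interface UserPartitioner<K> {
    default void setup(FakeTaskMetrics metrics) {}

    int partition(K key, int numPartitions);
}

// Simplified stand-in for StreamPartitioner with the proposed two-argument setup.
abstract class StreamPartitionerSketch<T> {
    protected int numberOfChannels;

    public void setup(int numberOfChannels, FakeTaskMetrics metrics) {
        this.numberOfChannels = numberOfChannels;
    }

    public abstract int selectChannel(T record);
}

// Stand-in for CustomPartitionerWrapper: forwards the metrics to the wrapped partitioner.
class WrapperSketch<K, T> extends StreamPartitionerSketch<T> {
    private final UserPartitioner<K> partitioner;
    private final Function<T, K> keySelector;

    WrapperSketch(UserPartitioner<K> partitioner, Function<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public void setup(int numberOfChannels, FakeTaskMetrics metrics) {
        super.setup(numberOfChannels, metrics);
        // The only new step: hand the metrics group to user code.
        partitioner.setup(metrics);
    }

    @Override
    public int selectChannel(T record) {
        return partitioner.partition(keySelector.apply(record), numberOfChannels);
    }
}

// Example user partitioner that registers a metric during setup.
class LengthPartitioner implements UserPartitioner<String> {
    @Override
    public void setup(FakeTaskMetrics metrics) {
        metrics.register("exampleCounter");
    }

    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }
}

class WrapperForwardingDemo {
    public static void main(String[] args) {
        FakeTaskMetrics metrics = new FakeTaskMetrics();
        WrapperSketch<String, String> wrapper =
                new WrapperSketch<>(new LengthPartitioner(), s -> s);
        wrapper.setup(4, metrics);
        System.out.println(metrics.registered);
        System.out.println(wrapper.selectChannel("abcde"));
    }
}
```

      In this sketch the wrapper is the only place that needs to know about both layers, so user code never has to touch the internal `StreamPartitioner` type.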
      

          People

            Assignee: Unassigned
            Reporter: Steven Zhen Wu (stevenz3wu)