FLINK-26033

In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

Details

    Description

      In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.

      // Flink Kafka connector: how 'sink.partitioner' is resolved
      public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
              ReadableConfig tableOptions, ClassLoader classLoader) {
          return tableOptions
                  .getOptional(SINK_PARTITIONER)
                  .flatMap(
                          (String partitioner) -> {
                              switch (partitioner) {
                                  case SINK_PARTITIONER_VALUE_FIXED:
                                      return Optional.of(new FlinkFixedPartitioner<>());
                                  case SINK_PARTITIONER_VALUE_DEFAULT:
                                  case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                      return Optional.empty();
                                      // Default fallback to full class name of the partitioner.
                                  default:
                                      return Optional.of(
                                              initializePartitioner(partitioner, classLoader));
                              }
                          });
      } 
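      One possible way to make 'round-robin' distinct from 'default' is to leave the Flink-side partitioner empty but point the Kafka producer at Kafka's own RoundRobinPartitioner through the standard "partitioner.class" producer property. The sketch below only illustrates that idea: the class RoundRobinOptionSketch and the method withSinkPartitioner are hypothetical names, not part of Flink, and where such a hook would live in the connector is an assumption.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink: routes the 'round-robin' option
// to Kafka's RoundRobinPartitioner via the producer configuration.
final class RoundRobinOptionSketch {
    // Standard Kafka producer property (ProducerConfig.PARTITIONER_CLASS_CONFIG).
    static final String PARTITIONER_CLASS = "partitioner.class";
    static final String KAFKA_ROUND_ROBIN =
            "org.apache.kafka.clients.producer.RoundRobinPartitioner";

    /** Returns a copy of the producer properties adjusted for the option value. */
    static Properties withSinkPartitioner(String sinkPartitioner, Properties producerProps) {
        Properties result = new Properties();
        result.putAll(producerProps);
        if ("round-robin".equals(sinkPartitioner)) {
            // Only 'round-robin' overrides the producer's partitioner;
            // 'default' keeps whatever Kafka would use on its own.
            result.setProperty(PARTITIONER_CLASS, KAFKA_ROUND_ROBIN);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = withSinkPartitioner("round-robin", new Properties());
        System.out.println(props.getProperty(PARTITIONER_CLASS));
    }
}
```

      With this shape, 'default' and 'round-robin' can still share the Optional.empty() branch on the Flink side, because the difference is carried by the producer properties instead.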

      Both options fall back to Kafka's DefaultPartitioner, which handles two cases:
      1. No key: the record goes to a "sticky" partition, chosen at random and reused until the current batch is complete
      2. Key present: the partition is the murmur2 hash of the key modulo the number of partitions

      // org.apache.kafka.clients.producer.internals.DefaultPartitioner
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          if (keyBytes == null) {
              // No key: use the sticky partition (reused per batch), not round-robin
              return stickyPartitionCache.partition(topic, cluster);
          } 
          List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
          int numPartitions = partitions.size();
          // hash the keyBytes to choose a partition
          return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      } 
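      To see why the keyed branch can never behave like round-robin, here is a simplified model of it. It is not Kafka's code: Arrays.hashCode stands in for Utils.murmur2 (an assumption made so the example needs no Kafka dependency), and KeyedPartitionModel is a hypothetical name. Only the toPositive masking and the modulo step mirror the snippet above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified model of the keyed branch of DefaultPartitioner.
final class KeyedPartitionModel {
    // Same idea as Kafka's Utils.toPositive: clear the sign bit.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Stand-in hash (Arrays.hashCode) instead of murmur2; the structure
    // "hash, make non-negative, take modulo" is what matters here.
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // The same key always yields the same partition.
        System.out.println(partitionForKey(key, 4) == partitionForKey(key, 4));
    }
}
```

      Because the result depends only on the key bytes and the partition count, records sharing a key always land on one partition, regardless of which option name was configured.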

      Therefore, KafkaConnector has no real round-robin strategy. However, we can borrow from Kafka's RoundRobinPartitioner:

      // org.apache.kafka.clients.producer.RoundRobinPartitioner
      public class RoundRobinPartitioner implements Partitioner {
          private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
      
          public void configure(Map<String, ?> configs) {}
      
          /**
           * Compute the partition for the given record.
           *
           * @param topic The topic name
           * @param key The key to partition on (or null if no key)
           * @param keyBytes serialized key to partition on (or null if no key)
           * @param value The value to partition on or null
           * @param valueBytes serialized value to partition on or null
           * @param cluster The current cluster metadata
           */
          @Override
          public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
              int numPartitions = partitions.size();
              int nextValue = nextValue(topic);
              List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
              if (!availablePartitions.isEmpty()) {
                  int part = Utils.toPositive(nextValue) % availablePartitions.size();
                  return availablePartitions.get(part).partition();
              } else {
                  // no partitions are available, give a non-available partition
                  return Utils.toPositive(nextValue) % numPartitions;
              }
          }
      
          private int nextValue(String topic) {
              AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
                  return new AtomicInteger(0);
              });
              return counter.getAndIncrement();
          }
      
          public void close() {}
      
      } 
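      The same counter-based idea could also be expressed on the Flink side. The sketch below uses the method shape of FlinkKafkaPartitioner#partition(record, key, value, targetTopic, partitions), but it deliberately does not extend that class so that it compiles without Flink on the classpath; a real version would extend FlinkKafkaPartitioner<T>. The class name is hypothetical, and unlike Kafka's version it keeps a single counter per instance instead of one per topic.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in with the same method shape as FlinkKafkaPartitioner#partition.
// Assumption: one instance per sink subtask, so the counter is per subtask.
final class RoundRobinFlinkPartitionerSketch<T> {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        // Mask the sign bit so the index stays non-negative after overflow.
        int next = counter.getAndIncrement() & 0x7fffffff;
        // Key and value are ignored on purpose: every record advances the cycle.
        return partitions[next % partitions.length];
    }

    public static void main(String[] args) {
        RoundRobinFlinkPartitionerSketch<String> p = new RoundRobinFlinkPartitionerSketch<>();
        int[] partitions = {0, 1, 2};
        for (int i = 0; i < 4; i++) {
            System.out.println(p.partition("row-" + i, null, null, "demo-topic", partitions));
        }
    }
}
```

      Ignoring the key is the main design choice here: it gives an even spread even for keyed records, at the cost of losing per-key ordering within a partition.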

          People

            tinny shizhengchao