Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.13.3, 1.11.6, 1.12.7, 1.14.3
Fix Version/s: None
Description
In the Kafka connector, setting 'sink.partitioner' to 'round-robin' has no effect, because Flink treats 'default' and 'round-robin' as the same strategy:
```java
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
}
```
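Both options fall back to Kafka's DefaultPartitioner, which does not distribute records round-robin. It chooses the partition in one of two ways:
1. When there is no key, it uses the sticky partition cache: a partition is picked at random and reused until the current batch is complete.
2. When there is a key, it hashes the key and takes the result modulo the number of partitions.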
```java
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                     Cluster cluster) {
    if (keyBytes == null) {
        // Random when there is no key
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
```
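To make the consequence concrete, here is a small stand-alone simulation of the two branches. It is only a sketch, not Kafka's code: the class name `DefaultPartitionerSketch` and the method `onNewBatch` are hypothetical, `Arrays.hashCode` stands in for murmur2, and the sticky logic is simplified to "pick a random partition and keep it until the batch changes". It is not thread-safe.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Simplified, single-threaded stand-in for the two DefaultPartitioner branches (assumption). */
final class DefaultPartitionerSketch {
    private final int numPartitions;
    // -1 means no sticky partition has been chosen yet.
    private int stickyPartition = -1;

    DefaultPartitionerSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partition(byte[] keyBytes) {
        if (keyBytes == null) {
            // No key: reuse the same randomly chosen partition for the whole batch.
            if (stickyPartition < 0) {
                stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
            }
            return stickyPartition;
        }
        // Key present: hash modulo partition count (Arrays.hashCode replaces murmur2 here).
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** Hypothetical hook: called when a batch completes, so a new sticky partition is picked. */
    void onNewBatch() {
        stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }
}
```

In this sketch, every keyless record in a batch lands on one partition and every record with the same key lands on one partition, so neither branch cycles through the partitions record by record.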
Therefore, the Kafka connector has no real round-robin strategy. We could borrow the approach used by Kafka's RoundRobinPartitioner:
```java
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {}
}
```
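One possible shape for the Flink side is sketched below; it is an assumption about how a fix could look, not a proposed patch. The class name `RoundRobinPartitionerSketch` is hypothetical. Its `partition` parameters mirror `FlinkKafkaPartitioner#partition` without the record argument, and the class deliberately does not extend `FlinkKafkaPartitioner` so that it compiles without Flink on the classpath.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin partitioner sketch with one counter per target topic. */
final class RoundRobinPartitionerSketch {
    // Same idea as topicCounterMap in Kafka's RoundRobinPartitioner.
    private final ConcurrentMap<String, AtomicInteger> topicCounters = new ConcurrentHashMap<>();

    /**
     * Returns the next partition id for the topic, cycling through the given partition ids.
     * The key and value are ignored on purpose: round-robin does not depend on record content.
     */
    int partition(byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        int next = topicCounters
                .computeIfAbsent(targetTopic, t -> new AtomicInteger(0))
                .getAndIncrement();
        // Math.floorMod keeps the index non-negative even after the counter overflows.
        return partitions[Math.floorMod(next, partitions.length)];
    }
}
```

With three partitions `{0, 1, 2}`, successive calls for the same topic yield 0, 1, 2, 0, 1, 2, and each topic keeps its own counter. In a real connector the 'round-robin' case of the switch would have to return such a partitioner instead of `Optional.empty()`. Note that each parallel sink subtask would hold its own counter, so the distribution is round-robin per subtask rather than globally.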