From 10813eb60dda828a8f93a862b17662807abf3430 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 5 Apr 2015 17:12:43 -0700 Subject: [PATCH] KAFKA-2091. Expose a Partitioner interface in the new producer. --- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../kafka/clients/producer/MockProducer.java | 4 +- .../apache/kafka/clients/producer/Partitioner.java | 39 ++++++++++ .../kafka/clients/producer/ProducerConfig.java | 6 ++ .../producer/internals/DefaultPartitioner.java | 90 ++++++++++++++++++++++ .../clients/producer/internals/Partitioner.java | 89 --------------------- .../producer/internals/PartitionerTest.java | 7 +- 7 files changed, 142 insertions(+), 96 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ab26342..01b3929 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; @@ -205,7 +204,7 @@ public class KafkaProducer implements Producer { MetricsReporter.class); reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); - this.partitioner = new Partitioner(); + this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 6913090..693b356 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; -import org.apache.kafka.clients.producer.internals.Partitioner; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.clients.producer.internals.ProduceRequestResult; import org.apache.kafka.common.*; @@ -40,7 +40,7 @@ import org.apache.kafka.common.*; public class MockProducer implements Producer { private final Cluster cluster; - private final Partitioner partitioner = new Partitioner(); + private final Partitioner partitioner = new DefaultPartitioner(); private final List> sent; private final Deque completions; private boolean autoComplete; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java new file mode 100644 index 0000000..3128d6d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.Cluster; + + +/** + * A plugin interface to allow customer paritioner class + * @see DefaultPartitioner + */ +public interface Partitioner { + + + /** + * Compute the partition for the given record. + * + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param partition The partition to use (or null if none) + * @param cluster The current cluster metadata + */ + public int partition(String topic, byte[] key, Integer partition, Cluster cluster); + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index fa9daae..7951398 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -109,6 +109,11 @@ public class ProducerConfig extends AbstractConfig { + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, " + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load."; + /** partitioner.class */ + public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; + private static final String PARTITIONER_CLASS_DOC = "Partitioner class is an implementation of Partitioner interface. " + + "It provides partitioning implementation based on given key and partition. This setting defaults to org.apache.kafka.producer.DefaultPartitioner."; + /** client.id */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; @@ -184,6 +189,7 @@ public class ProducerConfig extends AbstractConfig { .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) + .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java new file mode 100644 index 0000000..e8cca10 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.clients.producer.Partitioner; + +/** + * The default partitioning strategy: + *
    + *
  • If a partition is specified in the record, use it + *
  • If no partition is specified but a key is present choose a partition based on a hash of the key + *
  • If no partition or key is present choose a partition in a round-robin fashion + */ +public class DefaultPartitioner implements Partitioner { + + private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); + + /** + * A cheap way to deterministically convert a number to a positive value. When the input is + * positive, the original value is returned. When the input number is negative, the returned + * positive value is the original value bit AND against 0x7fffffff which is not its absolutely + * value. + * + * Note: changing this method in the future will possibly cause partition selection not to be + * compatible with the existing messages already placed on a partition. + * + * @param number a given number + * @return a positive number. + */ + private static int toPositive(int number) { + return number & 0x7fffffff; + } + + /** + * Compute the partition for the given record. + * + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param partition The partition to use (or null if none) + * @param cluster The current cluster metadata + */ + public int partition(String topic, byte[] key, Integer partition, Cluster cluster) { + List partitions = cluster.partitionsForTopic(topic); + int numPartitions = partitions.size(); + if (partition != null) { + // they have given us a partition, use it + if (partition < 0 || partition >= numPartitions) + throw new IllegalArgumentException("Invalid partition given with record: " + partition + + " is not in the range [0..." + + numPartitions + + "]."); + return partition; + } else if (key == null) { + int nextValue = counter.getAndIncrement(); + List availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) { + int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); + return availablePartitions.get(part).partition(); + } else { + // no partitions are available, give a non-available partition + return DefaultPartitioner.toPositive(nextValue) % numPartitions; + } + } else { + // hash the key to choose a partition + return DefaultPartitioner.toPositive(Utils.murmur2(key)) % numPartitions; + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java deleted file mode 100644 index 93e7991..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.producer.internals; - -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.utils.Utils; - -/** - * The default partitioning strategy: - *
      - *
    • If a partition is specified in the record, use it - *
    • If no partition is specified but a key is present choose a partition based on a hash of the key - *
    • If no partition or key is present choose a partition in a round-robin fashion - */ -public class Partitioner { - - private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); - - /** - * A cheap way to deterministically convert a number to a positive value. When the input is - * positive, the original value is returned. When the input number is negative, the returned - * positive value is the original value bit AND against 0x7fffffff which is not its absolutely - * value. - * - * Note: changing this method in the future will possibly cause partition selection not to be - * compatible with the existing messages already placed on a partition. - * - * @param number a given number - * @return a positive number. - */ - private static int toPositive(int number) { - return number & 0x7fffffff; - } - - /** - * Compute the partition for the given record. - * - * @param topic The topic name - * @param key The key to partition on (or null if no key) - * @param partition The partition to use (or null if none) - * @param cluster The current cluster metadata - */ - public int partition(String topic, byte[] key, Integer partition, Cluster cluster) { - List partitions = cluster.partitionsForTopic(topic); - int numPartitions = partitions.size(); - if (partition != null) { - // they have given us a partition, use it - if (partition < 0 || partition >= numPartitions) - throw new IllegalArgumentException("Invalid partition given with record: " + partition - + " is not in the range [0..." - + numPartitions - + "]."); - return partition; - } else if (key == null) { - int nextValue = counter.getAndIncrement(); - List availablePartitions = cluster.availablePartitionsForTopic(topic); - if (availablePartitions.size() > 0) { - int part = Partitioner.toPositive(nextValue) % availablePartitions.size(); - return availablePartitions.get(part).partition(); - } else { - // no partitions are available, give a non-available partition - return Partitioner.toPositive(nextValue) % numPartitions; - } - } else { - // hash the key to choose a partition - return Partitioner.toPositive(Utils.murmur2(key)) % numPartitions; - } - } - -} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java index 5dadd0e..7c7542f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue; import java.util.List; +import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -26,7 +27,7 @@ import org.junit.Test; public class PartitionerTest { private byte[] key = "key".getBytes(); - private Partitioner partitioner = new Partitioner(); + private Partitioner partitioner = new DefaultPartitioner(); private Node node0 = new Node(0, "localhost", 99); private Node node1 = new Node(1, "localhost", 100); private Node node2 = new Node(2, "localhost", 101); -- 1.9.5 (Apple Git-50.3)