From b35ec6fbb11d140424eece0ab8118879012eec15 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 27 May 2015 15:32:24 -0700 Subject: [PATCH] KAFKA-2091. Expose a Partitioner interface in the new producer. --- .../kafka/clients/producer/KafkaProducer.java | 82 ++++++++++++-------- .../kafka/clients/producer/MockProducer.java | 40 +++++++--- .../apache/kafka/clients/producer/Partitioner.java | 46 +++++++++++ .../kafka/clients/producer/ProducerConfig.java | 14 +++- .../producer/internals/DefaultPartitioner.java | 89 ++++++++++++++++++++++ .../clients/producer/internals/Partitioner.java | 89 ---------------------- .../producer/internals/DefaultPartitionerTest.java | 63 +++++++++++++++ .../producer/internals/PartitionerTest.java | 68 ----------------- 8 files changed, 289 insertions(+), 202 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java delete mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 8e336a3..ded19d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; @@ -73,11 +72,11 @@ import org.slf4j.LoggerFactory; * props.put("buffer.memory", 33554432); * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - * + * * Producer producer = new KafkaProducer(props); * for(int i = 0; i < 100; i++) * producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i))); - * + * * producer.close(); * } *

@@ -92,25 +91,25 @@ import org.slf4j.LoggerFactory; * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. *

* If the request fails, the producer can automatically retry, though since we have specified retries - * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on + * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on * message delivery semantics for details). *

- * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by + * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by * the batch.size config. Making this larger can result in more batching, but requires more memory (since we will * generally have one of these buffers for each active partition). *

- * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you + * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you * want to reduce the number of requests you can set linger.ms to something greater than 0. This will - * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will - * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, - * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting - * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that - * records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load + * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will + * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, + * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting + * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that + * records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more * efficient requests when not under maximal load at the cost of a small amount of latency. *

* The buffer.memory controls the total amount of memory available to the producer for buffering. If records - * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is + * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set block.on.buffer.full=false which * will cause the send call to result in an exception. *

@@ -207,7 +206,7 @@ public class KafkaProducer implements Producer { MetricsReporter.class); reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); - this.partitioner = new Partitioner(); + this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); @@ -285,7 +284,7 @@ public class KafkaProducer implements Producer { } /** - * Asynchronously send a record to a topic. Equivalent to send(record, null). + * Asynchronously send a record to a topic. Equivalent to send(record, null). * See {@link #send(ProducerRecord, Callback)} for details. */ @Override @@ -309,7 +308,7 @@ public class KafkaProducer implements Producer { * or throw any exception that occurred while sending the record. *

* If you want to simulate a simple blocking call you can call the get() method immediately: - * + * *

      * {@code
      * byte[] key = "key".getBytes();
@@ -320,7 +319,7 @@ public class KafkaProducer implements Producer {
      * 

* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. - * + * *

      * {@code
      * ProducerRecord record = new ProducerRecord("the-topic", key, value);
@@ -334,10 +333,10 @@ public class KafkaProducer implements Producer {
      *               });
      * }
      * 
- * + * * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the * following example callback1 is guaranteed to execute before callback2: - * + * *
      * {@code
      * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
@@ -349,15 +348,15 @@ public class KafkaProducer implements Producer {
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
-     *        
+     *
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
      * @throws BufferExhaustedException If block.on.buffer.full=false and the buffer is full.
-     * 
+     *
      */
     @Override
     public Future send(ProducerRecord record, Callback callback) {
@@ -380,7 +379,7 @@ public class KafkaProducer implements Producer {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            int partition = partitioner.partition(record.topic(), serializedKey, record.partition(), metadata.fetch());
+            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -452,12 +451,12 @@ public class KafkaProducer implements Producer {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
-    
+
     /**
-     * Invoking this method makes all buffered records immediately available to send (even if linger.ms is 
+     * Invoking this method makes all buffered records immediately available to send (even if linger.ms is
      * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
-     * of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). 
-     * A request is considered completed when it is successfully acknowledged 
+     * of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true).
+     * A request is considered completed when it is successfully acknowledged
      * according to the acks configuration you have specified or else it results in an error.
      * 

* Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, @@ -475,10 +474,10 @@ public class KafkaProducer implements Producer { * consumer.commit(); * } *

- * + * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur * we need to set retries=<large_number> in our config. - * + * * @throws InterruptException If the thread is interrupted while blocked */ @Override @@ -550,7 +549,7 @@ public class KafkaProducer implements Producer { public void close(long timeout, TimeUnit timeUnit) { close(timeout, timeUnit, false); } - + private void close(long timeout, TimeUnit timeUnit, boolean swallowException) { if (timeout < 0) throw new IllegalArgumentException("The timeout cannot be negative."); @@ -600,6 +599,27 @@ public class KafkaProducer implements Producer { throw new KafkaException("Failed to close kafka producer", firstException.get()); } + /** + * Computes the partition for the given record. + * If the record specifies a partition, that value is range-checked and returned; otherwise + * the configured partitioner is called to compute the partition. + */ + private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { + Integer partition = record.partition(); + if (partition != null) { + List partitions = cluster.partitionsForTopic(record.topic()); + int numPartitions = partitions.size(); + // they have given us a partition: valid values are 0 (inclusive) to numPartitions (exclusive) + if (partition < 0 || partition >= numPartitions) + throw new IllegalArgumentException("Invalid partition given with record: " + partition + + " is not in the range [0..."
+ + numPartitions + + ")."); + return partition; + } + return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); + } + private static class FutureFailure implements Future { private final ExecutionException exception; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 3c34610..e66491c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -27,7 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; -import org.apache.kafka.clients.producer.internals.Partitioner; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.clients.producer.internals.ProduceRequestResult; import org.apache.kafka.common.*; @@ -41,7 +41,7 @@ import org.apache.kafka.common.*; public class MockProducer implements Producer { private final Cluster cluster; - private final Partitioner partitioner = new Partitioner(); + private final Partitioner partitioner = new DefaultPartitioner(); private final List> sent; private final Deque completions; private boolean autoComplete; @@ -49,7 +49,7 @@ public class MockProducer implements Producer { /** * Create a mock producer - * + * * @param cluster The cluster holding metadata for this producer * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after @@ -66,7 +66,7 @@ public class MockProducer implements Producer { /** * Create a new mock producer with invented metadata the given autoComplete setting.
- * + * * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)} */ public MockProducer(boolean autoComplete) { @@ -75,7 +75,7 @@ public class MockProducer implements Producer { /** * Create a new auto completing mock producer - * + * * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)} */ public MockProducer() { @@ -94,14 +94,14 @@ public class MockProducer implements Producer { /** * Adds the record to the list of sent records. - * + * * @see #history() */ @Override public synchronized Future send(ProducerRecord record, Callback callback) { int partition = 0; if (this.cluster.partitionsForTopic(record.topic()) != null) - partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster); + partition = partition(record, this.cluster); ProduceRequestResult result = new ProduceRequestResult(); FutureRecordMetadata future = new FutureRecordMetadata(result, 0); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); @@ -129,7 +129,7 @@ public class MockProducer implements Producer { return offset; } } - + public synchronized void flush() { while (!this.completions.isEmpty()) completeNext(); @@ -168,7 +168,7 @@ public class MockProducer implements Producer { /** * Complete the earliest uncompleted call successfully. - * + * * @return true if there was an uncompleted call to complete */ public synchronized boolean completeNext() { @@ -177,7 +177,7 @@ public class MockProducer implements Producer { /** * Complete the earliest uncompleted call with the given error. - * + * * @return true if there was an uncompleted call to complete */ public synchronized boolean errorNext(RuntimeException e) { @@ -190,6 +190,26 @@ public class MockProducer implements Producer { } } + /** + * computes partition for given record. 
+ */ + private int partition(ProducerRecord record, Cluster cluster) { + Integer partition = record.partition(); + if (partition != null) { + List partitions = cluster.partitionsForTopic(record.topic()); + int numPartitions = partitions.size(); + // they have given us a partition: valid values are 0 (inclusive) to numPartitions (exclusive) + if (partition < 0 || partition >= numPartitions) + throw new IllegalArgumentException("Invalid partition given with record: " + partition + + " is not in the range [0..." + + numPartitions + + ")."); + return partition; + } + return this.partitioner.partition(record.topic(), null, record.key(), null, record.value(), cluster); + } + + private static class Completion { private final long offset; private final RecordMetadata metadata; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java new file mode 100644 index 0000000..383619d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.Cluster; + +/** + * Partitioner Interface + */ + +public interface Partitioner extends Configurable { + + /** + * Compute the partition for the given record. + * + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param keyBytes The serialized key to partition on (or null if no key) + * @param value The value to partition on or null + * @param valueBytes The serialized value to partition on or null + * @param cluster The current cluster metadata + */ + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); + + /** + * This is called when the partitioner is closed. + */ + public void close(); + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 187d000..023bd2e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License.
@@ -51,7 +51,7 @@ public class ProducerConfig extends AbstractConfig { /** metadata.max.age.ms */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; - + /** batch.size */ public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " @@ -169,6 +169,11 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + /** partitioner.class */ + public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; + private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the Partitioner interface."; + + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -217,7 +222,8 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC); } public static 
Map addSerializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java new file mode 100644 index 0000000..f81c496 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; + +/** + * The default partitioning strategy: + *
    + *
  • If a partition is specified in the record, the producer uses it directly and this partitioner is not consulted + *
  • If no partition is specified but a key is present choose a partition based on a hash of the key + *
  • If no partition or key is present choose a partition in a round-robin fashion + */ +public class DefaultPartitioner implements Partitioner { + + private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); + + /** + * A cheap way to deterministically convert a number to a non-negative value. When the input is + * non-negative, the original value is returned. When the input number is negative, the returned + * non-negative value is the original value bitwise ANDed with 0x7fffffff, which is not its absolute + * value. + * + * Note: changing this method in the future will possibly cause partition selection not to be + * compatible with the existing messages already placed on a partition. + * + * @param number a given number + * @return a non-negative number. + */ + private static int toPositive(int number) { + return number & 0x7fffffff; + } + + public void configure(Map configs) {} + + /** + * Compute the partition for the given record. + * + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param keyBytes serialized key to partition on (or null if no key) + * @param value The value to partition on or null + * @param valueBytes serialized value to partition on or null + * @param cluster The current cluster metadata + */ + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + List partitions = cluster.partitionsForTopic(topic); + int numPartitions = partitions.size(); + if (keyBytes == null) { + int nextValue = counter.getAndIncrement(); + List availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) { + int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); + return availablePartitions.get(part).partition(); + } else { + // no partitions are available, give a non-available partition + return DefaultPartitioner.toPositive(nextValue) % numPartitions; + } + } else { + // hash the 
keyBytes to choose a partition + return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + } + + public void close() {} + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java deleted file mode 100644 index 93e7991..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.producer.internals; - -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.utils.Utils; - -/** - * The default partitioning strategy: - *
      - *
    • If a partition is specified in the record, use it - *
    • If no partition is specified but a key is present choose a partition based on a hash of the key - *
    • If no partition or key is present choose a partition in a round-robin fashion - */ -public class Partitioner { - - private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); - - /** - * A cheap way to deterministically convert a number to a positive value. When the input is - * positive, the original value is returned. When the input number is negative, the returned - * positive value is the original value bit AND against 0x7fffffff which is not its absolutely - * value. - * - * Note: changing this method in the future will possibly cause partition selection not to be - * compatible with the existing messages already placed on a partition. - * - * @param number a given number - * @return a positive number. - */ - private static int toPositive(int number) { - return number & 0x7fffffff; - } - - /** - * Compute the partition for the given record. - * - * @param topic The topic name - * @param key The key to partition on (or null if no key) - * @param partition The partition to use (or null if none) - * @param cluster The current cluster metadata - */ - public int partition(String topic, byte[] key, Integer partition, Cluster cluster) { - List partitions = cluster.partitionsForTopic(topic); - int numPartitions = partitions.size(); - if (partition != null) { - // they have given us a partition, use it - if (partition < 0 || partition >= numPartitions) - throw new IllegalArgumentException("Invalid partition given with record: " + partition - + " is not in the range [0..." 
- + numPartitions - + "]."); - return partition; - } else if (key == null) { - int nextValue = counter.getAndIncrement(); - List availablePartitions = cluster.availablePartitionsForTopic(topic); - if (availablePartitions.size() > 0) { - int part = Partitioner.toPositive(nextValue) % availablePartitions.size(); - return availablePartitions.get(part).partition(); - } else { - // no partitions are available, give a non-available partition - return Partitioner.toPositive(nextValue) % numPartitions; - } - } else { - // hash the key to choose a partition - return Partitioner.toPositive(Utils.murmur2(key)) % numPartitions; - } - } - -} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java new file mode 100644 index 0000000..977fa93 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Test; + +public class DefaultPartitionerTest { + private byte[] keyBytes = "key".getBytes(); + private Partitioner partitioner = new DefaultPartitioner(); + private Node node0 = new Node(0, "localhost", 99); + private Node node1 = new Node(1, "localhost", 100); + private Node node2 = new Node(2, "localhost", 101); + private Node[] nodes = new Node[] {node0, node1, node2}; + private String topic = "test"; + // Intentionally make the partition list not in partition order to test the edge cases. + private List partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), + new PartitionInfo(topic, 2, node1, nodes, nodes), + new PartitionInfo(topic, 0, node0, nodes, nodes)); + private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); + + @Test + public void testKeyPartitionIsStable() { + int partition = partitioner.partition("test", null, keyBytes, null, null, cluster); + assertEquals("Same key should yield same partition", partition, partitioner.partition("test", null, keyBytes, null, null, cluster)); + } + + @Test + public void testRoundRobinWithUnavailablePartitions() { + // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, + // and (2) the available partitions are selected in a round robin way. 
+ int countForPart0 = 0; + int countForPart2 = 0; + for (int i = 1; i <= 100; i++) { + int part = partitioner.partition("test", null, null, null, null, cluster); + assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); + if (part == 0) + countForPart0++; + else + countForPart2++; + } + assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java deleted file mode 100644 index 5dadd0e..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.clients.producer.internals; - -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Test; - -public class PartitionerTest { - - private byte[] key = "key".getBytes(); - private Partitioner partitioner = new Partitioner(); - private Node node0 = new Node(0, "localhost", 99); - private Node node1 = new Node(1, "localhost", 100); - private Node node2 = new Node(2, "localhost", 101); - private Node[] nodes = new Node[] {node0, node1, node2}; - private String topic = "test"; - // Intentionally make the partition list not in partition order to test the edge cases. - private List partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), - new PartitionInfo(topic, 2, node1, nodes, nodes), - new PartitionInfo(topic, 0, node0, nodes, nodes)); - private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); - - @Test - public void testUserSuppliedPartitioning() { - assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster)); - } - - @Test - public void testKeyPartitionIsStable() { - int partition = partitioner.partition("test", key, null, cluster); - assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster)); - } - - @Test - public void testRoundRobinWithUnavailablePartitions() { - // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, - // and (2) the available partitions are selected in a round robin way. 
- int countForPart0 = 0; - int countForPart2 = 0; - for (int i = 1; i <= 100; i++) { - int part = partitioner.partition("test", null, null, cluster); - assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); - if (part == 0) - countForPart0++; - else - countForPart2++; - } - assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2); - } -} -- 2.3.2 (Apple Git-55)