From 8604d94b29384a88f5e8cbf6f7f17b045f43e31a Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Mon, 11 Aug 2014 10:48:36 -0700
Subject: [PATCH] add sticky partition support in the new producer

---
 Review notes (this free-form section is ignored when the patch is applied):
 - The sticky choice is dropped early when the cached partition has no leader
   (or is out of range), so sticky mode never pins records to a partition that
   plain round-robin would have skipped.
 - Generic type arguments were restored on the added Partitioner lines.
 - The post-image blob ids on the "index" lines were not regenerated for the
   revised ProducerConfig, Partitioner and PartitionerTest contents.

 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../kafka/clients/producer/MockProducer.java       |  3 +-
 .../kafka/clients/producer/ProducerConfig.java     |  7 +++
 .../clients/producer/internals/Partitioner.java    | 68 +++++++++++++++++++---
 .../kafka/clients/producer/PartitionerTest.java    | 39 +++++++++----
 5 files changed, 100 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f58b850..1758ec6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -106,7 +106,7 @@ public class KafkaProducer implements Producer {
                                                                       MetricsReporter.class);
         reporters.add(new JmxReporter(jmxPrefix));
         this.metrics = new Metrics(metricConfig, reporters, time);
-        this.partitioner = new Partitioner();
+        this.partitioner = new Partitioner(config.getLong(ProducerConfig.PARTITION_STICKY_MS_CONFIG), this.time);
         long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
         this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index c0f1d57..3d235c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.SystemTime;
 
 
 /**
@@ -43,7 +44,7 @@ import org.apache.kafka.common.TopicPartition;
 public class MockProducer implements Producer {
 
     private final Cluster cluster;
-    private final Partitioner partitioner = new Partitioner();
+    private final Partitioner partitioner = new Partitioner(0, new SystemTime());
     private final List sent;
     private final Deque completions;
     private boolean autoComplete;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..f1904de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -111,6 +111,12 @@ public class ProducerConfig extends AbstractConfig {
                                                 + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, "
                                                 + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.";
 
+    /** partition.sticky.ms */
+    public static final String PARTITION_STICKY_MS_CONFIG = "partition.sticky.ms";
+    private static final String PARTITION_STICKY_MS_DOC = "For messages that specify neither a partition nor a key, the producer selects a partition in a round-robin fashion "
+                                                          + "but sticks to it for the configured amount of time. A larger value can improve the compression ratio and reduce the number of socket connections to the brokers, "
+                                                          + "but may distribute messages less evenly among the partitions. The default of 0 disables stickiness.";
+
     /** client.id */
     public static final String CLIENT_ID_CONFIG = "client.id";
     private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The "
@@ -180,6 +186,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(PARTITION_STICKY_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, PARTITION_STICKY_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 40e8234..b768c34 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -18,11 +18,13 @@ package org.apache.kafka.clients.producer.internals;
 
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 
@@ -36,6 +38,33 @@ import org.apache.kafka.common.utils.Utils;
 public class Partitioner {
 
     private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+    /** Last partition chosen for each topic and when it was chosen; consulted only when partitionStickyMs > 0. */
+    private final ConcurrentHashMap<String, PartitionAndTime> lastPartitionMap = new ConcurrentHashMap<String, PartitionAndTime>();
+    private final long partitionStickyMs;
+    private final Time time;
+
+    /** Immutable pair of a chosen partition and the timestamp (in ms) at which it was chosen. */
+    private static class PartitionAndTime {
+        public final int partition;
+        public final long lastPartitionTime;
+
+        public PartitionAndTime(int partition, long lastPartitionTime) {
+            this.partition = partition;
+            this.lastPartitionTime = lastPartitionTime;
+        }
+    }
+
+    /**
+     * Create a partitioner.
+     *
+     * @param partitionStickyMs how long (in ms) records without a partition or key keep going to the same partition
+     *                          of a topic; a value less than or equal to 0 selects plain round-robin
+     * @param time clock used to measure the sticky window
+     */
+    public Partitioner(long partitionStickyMs, Time time) {
+        this.partitionStickyMs = partitionStickyMs;
+        this.time = time;
+    }
 
     /**
      * Compute the partition for the given record.
@@ -55,18 +84,43 @@ public class Partitioner {
                                                    + "].");
             return record.partition();
         } else if (record.key() == null) {
-            // choose the next available node in a round-robin fashion
-            for (int i = 0; i < numPartitions; i++) {
-                int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
-                if (partitions.get(partition).leader() != null)
-                    return partition;
+            if (partitionStickyMs <= 0) {
+                // choose the next available node in a round-robin fashion
+                return nextPartition(numPartitions, partitions);
+            } else {
+                // select a partition in a round-robin fashion, but stick to it for some time.
+                PartitionAndTime lastPartitionAndTime = lastPartitionMap.get(record.topic());
+                long now = time.milliseconds();
+                // re-pick when nothing is cached yet, the sticky window has elapsed, or the cached partition
+                // is no longer usable (index out of range, or it currently has no leader)
+                if (lastPartitionAndTime == null
+                    || (now - lastPartitionAndTime.lastPartitionTime) >= partitionStickyMs
+                    || lastPartitionAndTime.partition >= numPartitions
+                    || partitions.get(lastPartitionAndTime.partition).leader() == null) {
+                    lastPartitionAndTime = new PartitionAndTime(nextPartition(numPartitions, partitions), now);
+                    lastPartitionMap.put(record.topic(), lastPartitionAndTime);
+                }
+                return lastPartitionAndTime.partition;
             }
-            // no partitions are available, give a non-available partition
-            return Utils.abs(counter.getAndIncrement()) % numPartitions;
         } else {
             // hash the key to choose a partition
             return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
         }
     }
 
+    /**
+     * Choose the next available node in a round-robin fashion.
+     * @param numPartitions total number of partitions
+     * @param partitions list of partitions
+     * @return the selected partition; a leaderless one only if no partition has a leader
+     */
+    private int nextPartition(int numPartitions, List<PartitionInfo> partitions) {
+        for (int i = 0; i < numPartitions; i++) {
+            int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
+            if (partitions.get(partition).leader() != null)
+                return partition;
+        }
+        // no partitions are available, give a non-available partition
+        return Utils.abs(counter.getAndIncrement()) % numPartitions;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index f06e28c..bd68dd9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -16,26 +16,28 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
 public class PartitionerTest {
 
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
-    private Partitioner partitioner = new Partitioner();
+    private Partitioner partitioner = new Partitioner(0, new SystemTime());
     private Node node0 = new Node(0, "localhost", 99);
     private Node node1 = new Node(1, "localhost", 100);
     private Node node2 = new Node(2, "localhost", 101);
@@ -78,4 +80,21 @@ public class PartitionerTest {
             assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
         }
     }
+
+    @Test
+    public void testStickyPartitioner() {
+        long stickyTimeMs = 1000;
+        Time mockTime = new MockTime();
+        Partitioner stickyPartitioner = new Partitioner(stickyTimeMs, mockTime);
+        int lastPartition = stickyPartitioner.partition(new ProducerRecord("test", value), cluster);
+
+        mockTime.sleep(500);
+        assertEquals("Partition shouldn't change with sticky time.",
+                     lastPartition,
+                     stickyPartitioner.partition(new ProducerRecord("test", value), cluster));
+
+        mockTime.sleep(500);
+        assertFalse("Partition should change after sticky time.",
+                    lastPartition == stickyPartitioner.partition(new ProducerRecord("test", value), cluster));
+    }
 }
-- 
1.8.3.4 (Apple Git-47)