From 4c02d99fbccca577b95c81d3df0082626a133857 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Mon, 2 Mar 2015 10:30:16 -0800 Subject: [PATCH 1/2] Added random offset to client retry backoff. --- .../java/org/apache/kafka/clients/ClientUtils.java | 20 ++++++++++++++++++++ .../apache/kafka/clients/consumer/KafkaConsumer.java | 4 +++- .../apache/kafka/clients/producer/KafkaProducer.java | 3 ++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index d0da5d7..c2b4906 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.apache.kafka.common.config.ConfigException; @@ -45,4 +46,23 @@ public class ClientUtils { throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); return addresses; } + + /** + * Add a small amount of jitter to retry backoff in Kafka clients to help avoid herd effect. + * + * See KAFKA-902. + */ + public static long randomizeBackoff(long retryBackoffMs) { + int jitter = 10; + + if (retryBackoffMs == 0L) { + // 0 is strict "don't back off" - don't add jitter + return 0L; + } else if (Long.MAX_VALUE - retryBackoffMs < jitter) { + // If backoff is huge, avoid overflow and don't bother adding jitter + return retryBackoffMs; + } else { + return retryBackoffMs + new Random().nextInt(jitter); + } + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 67ceb75..fc4551d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -552,7 +553,8 @@ public class KafkaConsumer implements Consumer { MetricsReporter.class); reporters.add(new JmxReporter(jmxPrefix)); Metrics metrics = new Metrics(metricConfig, reporters, time); - this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.retryBackoffMs = ClientUtils.randomizeBackoff( + config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG)); this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7397e56..3588b70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -228,7 +228,8 @@ public class KafkaProducer implements Producer { reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); - long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); + long retryBackoffMs = ClientUtils.randomizeBackoff( + config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); -- 1.9.3 (Apple Git-50) From f2424e55d02b78c378ae7abd07d432f7708282fb Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Mon, 2 Mar 2015 10:55:54 -0800 Subject: [PATCH 2/2] Add simple unit test for ClientUtils.randomizeBackoff --- .../kafka/clients/consumer/KafkaConsumer.java | 1 - .../org/apache/kafka/clients/ClientUtilsTest.java | 23 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fc4551d..aeb0807 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,7 +24,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index 13ce519..1a1d59d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.config.ConfigException; import org.junit.Test; +import org.junit.Assert; import java.util.Arrays; @@ -39,4 +40,26 @@ public class ClientUtilsTest { private void check(String... url) { ClientUtils.parseAndValidateAddresses(Arrays.asList(url)); } + + @Test + public void testRandomizeBackoff() { + int nTrials = 30; + + // Should do the right thing with large inputs + for (int i = 0; i < nTrials; i++) { + Assert.assertTrue("randomizeBackoff should not cause backoff to roll over.", + ClientUtils.randomizeBackoff(Long.MAX_VALUE) > 0); + } + + // Should map 0 -> 0 + for (int i = 0; i < nTrials; i++) { + Assert.assertEquals("randomizeBackoff(0) -> 0", 0, ClientUtils.randomizeBackoff(0)); + } + + // Should not ever nudge small positive values into negative + for (int i = 0; i < nTrials; i++) { + Assert.assertTrue("Should never return negative value.", + ClientUtils.randomizeBackoff(1L) > 0); + } + } } \ No newline at end of file -- 1.9.3 (Apple Git-50)