From ee9f1b435f2f1b1be804d786c53d386a73e6c615 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Mon, 2 Mar 2015 10:30:16 -0800 Subject: [PATCH 1/3] Added random offset to client retry backoff. --- .../java/org/apache/kafka/clients/ClientUtils.java | 20 ++++++++++++++++++++ .../apache/kafka/clients/consumer/KafkaConsumer.java | 4 +++- .../apache/kafka/clients/producer/KafkaProducer.java | 3 ++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index d0da5d7..c2b4906 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.apache.kafka.common.config.ConfigException; @@ -45,4 +46,23 @@ public class ClientUtils { throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); return addresses; } + + /** + * Add a small amount of jitter to retry backoff in Kafka clients to help avoid herd effect. + * + * See KAFKA-902. + */ + public static long randomizeBackoff(long retryBackoffMs) { + int jitter = 10; + + if (retryBackoffMs == 0L) { + // 0 is strict "don't back off" - don't add jitter + return 0L; + } else if (Long.MAX_VALUE - retryBackoffMs < jitter) { + // If backoff is huge, avoid overflow and don't bother adding jitter + return retryBackoffMs; + } else { + return retryBackoffMs + new Random().nextInt(jitter); + } + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 67ceb75..fc4551d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -552,7 +553,8 @@ public class KafkaConsumer implements Consumer { MetricsReporter.class); reporters.add(new JmxReporter(jmxPrefix)); Metrics metrics = new Metrics(metricConfig, reporters, time); - this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.retryBackoffMs = ClientUtils.randomizeBackoff( + config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG)); this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7397e56..3588b70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -228,7 +228,8 @@ public class KafkaProducer implements Producer { reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); - long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); + long retryBackoffMs = ClientUtils.randomizeBackoff( + config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); -- 1.9.3 (Apple Git-50) From 7413a2e6b676a9f81eb32f1e8ef90afaea01dbc0 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Mon, 2 Mar 2015 10:55:54 -0800 Subject: [PATCH 2/3] Add simple unit test for ClientUtils.randomizeBackoff --- .../kafka/clients/consumer/KafkaConsumer.java | 1 - .../org/apache/kafka/clients/ClientUtilsTest.java | 23 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fc4551d..aeb0807 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,7 +24,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index 13ce519..1a1d59d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.config.ConfigException; import org.junit.Test; +import org.junit.Assert; import java.util.Arrays; @@ -39,4 +40,26 @@ public class ClientUtilsTest { private void check(String... url) { ClientUtils.parseAndValidateAddresses(Arrays.asList(url)); } + + @Test + public void testRandomizeBackoff() { + int nTrials = 30; + + // Should do the right thing with large inputs + for (int i = 0; i < nTrials; i++) { + Assert.assertTrue("randomizeBackoff should not cause backoff to roll over.", + ClientUtils.randomizeBackoff(Long.MAX_VALUE) > 0); + } + + // Should map 0 -> 0 + for (int i = 0; i < nTrials; i++) { + Assert.assertEquals("randomizeBackoff(0) -> 0", 0, ClientUtils.randomizeBackoff(0)); + } + + // Should not ever nudge small positive values into negative + for (int i = 0; i < nTrials; i++) { + Assert.assertTrue("Should never return negative value.", + ClientUtils.randomizeBackoff(1L) > 0); + } + } } \ No newline at end of file -- 1.9.3 (Apple Git-50) From b5d153e5bf155ca7cad0588e054390e6f41089d7 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Tue, 3 Mar 2015 16:50:56 -0800 Subject: [PATCH 3/3] Added config for backoff jitter; added randomization to a few more client parameters; updated randomization function. --- .../java/org/apache/kafka/clients/ClientUtils.java | 27 ++++++++++++++-------- .../apache/kafka/clients/CommonClientConfigs.java | 3 +++ .../kafka/clients/consumer/ConsumerConfig.java | 17 +++++++++++++- .../kafka/clients/consumer/KafkaConsumer.java | 13 +++++++---- .../kafka/clients/producer/KafkaProducer.java | 14 +++++++---- .../kafka/clients/producer/ProducerConfig.java | 11 +++++++-- .../org/apache/kafka/clients/ClientUtilsTest.java | 21 +++++++++++++---- 7 files changed, 80 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index c2b4906..1115e1c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -48,21 +48,28 @@ public class ClientUtils { } /** - * Add a small amount of jitter to retry backoff in Kafka clients to help avoid herd effect. - * + * Used to add a small amount of jitter to various timing parameters such as producer + * retryBackoffMs. Helps mitigate herd effect. + * * See KAFKA-902. */ - public static long randomizeBackoff(long retryBackoffMs) { - int jitter = 10; - - if (retryBackoffMs == 0L) { + public static long addTimingJitter(long timeMs, int jitterMs) { + if (jitterMs == 0 || timeMs == 0L) { // 0 is strict "don't back off" - don't add jitter - return 0L; - } else if (Long.MAX_VALUE - retryBackoffMs < jitter) { + return timeMs; + } else if (Long.MAX_VALUE - timeMs < jitterMs) { // If backoff is huge, avoid overflow and don't bother adding jitter - return retryBackoffMs; + return timeMs; } else { - return retryBackoffMs + new Random().nextInt(jitter); + // Roughly scale upper bound on jitter with timeMs, but make sure there is some jitter + // no matter what. + jitterMs = Math.max(3, Math.min(jitterMs, (int) (0.1 * timeMs))); + Random random = new Random(); + int offset = random.nextInt(jitterMs); + boolean negativeOffset = random.nextBoolean() && timeMs - offset >= 0; + offset *= negativeOffset ? -1 : 1; + + return timeMs + offset; } } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 06fcfe6..f6d0a20 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -46,6 +46,9 @@ public class CommonClientConfigs { public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop."; + public static final String JITTER_MS_CONFIG = "timing.jitter.ms"; + public static final String JITTER_MS_DOC = "A random offset is added to various timing parameters such as client retry backoff to mitigate herd effect. This config places an upper bound on how much any given timing parameter will be offset."; + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics."; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 5fb2100..0e905a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -119,6 +119,11 @@ public class ConsumerConfig extends AbstractConfig { public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; /** + * timing.jitter.ms + */ + public static final String JITTER_MS_CONFIG = CommonClientConfigs.JITTER_MS_CONFIG; + + /** * metrics.sample.window.ms */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; @@ -158,7 +163,11 @@ public class ConsumerConfig extends AbstractConfig { Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) - .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC) + .define(GROUP_ID_CONFIG, + Type.STRING, + "", + Importance.HIGH, + GROUP_ID_DOC) .define(SESSION_TIMEOUT_MS_CONFIG, Type.LONG, 30000, @@ -233,6 +242,12 @@ public class ConsumerConfig extends AbstractConfig { atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) + .define(JITTER_MS_CONFIG, + Type.INT, + 10, + atLeast(0), + Importance.LOW, + CommonClientConfigs.JITTER_MS_CONFIG) .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "latest", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index aeb0807..c6a0119 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -389,6 +389,7 @@ public class KafkaConsumer implements Consumer { private final Metadata metadata; private final Heartbeat heartbeat; private final NetworkClient client; + private final int jitterMs; private final int maxWaitMs; private final int minBytes; private final int fetchSize; @@ -526,6 +527,7 @@ public class KafkaConsumer implements Consumer { else this.rebalanceCallback = callback; this.time = new SystemTime(); + this.jitterMs = config.getInt(ConsumerConfig.JITTER_MS_CONFIG); this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG); this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); @@ -552,16 +554,19 @@ public class KafkaConsumer implements Consumer { MetricsReporter.class); reporters.add(new JmxReporter(jmxPrefix)); Metrics metrics = new Metrics(metricConfig, reporters, time); - this.retryBackoffMs = ClientUtils.randomizeBackoff( - config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG)); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); + this.retryBackoffMs = ClientUtils.addTimingJitter( + config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), this.jitterMs); + this.metadata = new Metadata( + this.retryBackoffMs, + ClientUtils.addTimingJitter(config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), this.jitterMs)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); String metricsGroup = "consumer"; Map metricsTags = new LinkedHashMap(); metricsTags.put("client-id", clientId); - long reconnectBackoffMs = config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG); + long reconnectBackoffMs = ClientUtils.addTimingJitter( + config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), this.jitterMs); int sendBuffer = config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG); int receiveBuffer = config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG); this.client = new NetworkClient(new Selector(metrics, time, metricsGroup, metricsTags), diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3588b70..8dce1d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -134,6 +134,7 @@ public class KafkaProducer implements Producer { private final CompressionType compressionType; private final Sensor errors; private final Time time; + private final int jitterMs; private final Serializer keySerializer; private final Serializer valueSerializer; private final ProducerConfig producerConfig; @@ -216,6 +217,7 @@ public class KafkaProducer implements Producer { log.trace("Starting the Kafka producer"); this.producerConfig = config; this.time = new SystemTime(); + this.jitterMs = config.getInt(ProducerConfig.JITTER_MS_CONFIG); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -228,10 +230,13 @@ public class KafkaProducer implements Producer { reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = new Partitioner(); - long retryBackoffMs = ClientUtils.randomizeBackoff( - config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)); + long retryBackoffMs = ClientUtils.addTimingJitter( + config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.jitterMs); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata( + retryBackoffMs, + ClientUtils.addTimingJitter(config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), + this.jitterMs)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); @@ -252,7 +257,8 @@ public class KafkaProducer implements Producer { this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + ClientUtils.addTimingJitter(config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + this.jitterMs), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); this.sender = new Sender(client, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 122375c..ff341bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -137,6 +137,9 @@ public class ProducerConfig extends AbstractConfig { /** retry.backoff.ms */ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; + + /** timing.jitter.ms */ + public static final String JITTER_MS_CONFIG = CommonClientConfigs.JITTER_MS_CONFIG; /** compression.type */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; @@ -168,7 +171,8 @@ public class ProducerConfig extends AbstractConfig { static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) - .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) + .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, + atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) .define(ACKS_CONFIG, Type.STRING, @@ -191,7 +195,10 @@ public class ProducerConfig extends AbstractConfig { MAX_REQUEST_SIZE_DOC) .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(JITTER_MS_CONFIG, Type.INT, 10, atLeast(0), Importance.LOW, CommonClientConfigs.JITTER_MS_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index 1a1d59d..a0be275 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -44,22 +44,33 @@ public class ClientUtilsTest { @Test public void testRandomizeBackoff() { int nTrials = 30; + int jitterMs = 10; // Should do the right thing with large inputs for (int i = 0; i < nTrials; i++) { - Assert.assertTrue("randomizeBackoff should not cause backoff to roll over.", - ClientUtils.randomizeBackoff(Long.MAX_VALUE) > 0); + Assert.assertTrue("addTimingJitter should not cause backoff to roll over.", + ClientUtils.addTimingJitter(Long.MAX_VALUE, jitterMs) > 0); } // Should map 0 -> 0 for (int i = 0; i < nTrials; i++) { - Assert.assertEquals("randomizeBackoff(0) -> 0", 0, ClientUtils.randomizeBackoff(0)); + Assert.assertEquals("addTimingJitter(0) -> 0", 0, + ClientUtils.addTimingJitter(0, jitterMs)); } // Should not ever nudge small positive values into negative for (int i = 0; i < nTrials; i++) { - Assert.assertTrue("Should never return negative value.", - ClientUtils.randomizeBackoff(1L) > 0); + long jittered = ClientUtils.addTimingJitter(1L, jitterMs); + Assert.assertTrue("Should never return negative value: " + jittered, + jittered >= 0); + } + + // If jitterMs == 0, no jitter should be added + long timeMs = 100L; + for (int i = 0; i < nTrials; i++) { + Assert.assertEquals("If jitterMs == 0, then output should equal input.", + timeMs, + ClientUtils.addTimingJitter(timeMs, 0)); } } } \ No newline at end of file -- 1.9.3 (Apple Git-50)