diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index bedd2a9..43e8438 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -89,7 +89,7 @@ public class KafkaProducer implements Producer {
     }
 
     private KafkaProducer(ProducerConfig config) {
-        log.trace("Starting the Kafka producer");
+        log.debug("Starting the Kafka producer");
         this.metrics = new Metrics(new MetricConfig(),
                                    Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
                                    new SystemTime());
@@ -102,6 +102,7 @@ public class KafkaProducer implements Producer {
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  new SystemTime());
@@ -122,7 +123,7 @@ public class KafkaProducer implements Producer {
         this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
         this.ioThread.start();
         config.logUnused();
-        log.debug("Kafka producer started");
+        log.info("Kafka producer started");
     }
 
     private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
@@ -269,7 +270,7 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public void close() {
-        log.trace("Closing the Kafka producer.");
+        log.debug("Closing the Kafka producer.");
         this.sender.initiateClose();
         try {
             this.ioThread.join();
@@ -277,7 +278,7 @@ public class KafkaProducer implements Producer {
             throw new KafkaException(e);
         }
         this.metrics.close();
-        log.debug("The Kafka producer has closed.");
+        log.info("The Kafka producer has closed.");
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index d8e35e7..307659c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -136,6 +136,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String MAX_RETRIES_CONFIG = "request.retries";
 
     /**
+     * The amount of time to wait before attempting to resend a produce request to a given topic partition. This avoids
+     * repeated sending-and-failing in a tight loop.
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+
+    /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
     public static final String ENABLE_JMX_CONFIG = "enable.jmx";
 
@@ -160,7 +166,8 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
                                 .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
-                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
+                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 500L, atLeast(0L), "blah blah");
     }
 
     ProducerConfig(Map<?, ?> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 6990274..4425fb6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -51,10 +51,12 @@ public final class RecordAccumulator {
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
-    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
 
+    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+
     /**
      * Create a new record accumulator
      *
@@ -63,16 +65,19 @@ public final class RecordAccumulator {
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param retryBackoffMs An artificial delay time to wait before retrying a produce request upon receiving an error.
+     *        This avoids exhausting all retries in a short period of time.
      * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
      *        memory
      * @param metrics The metrics
      * @param time The time instance to use
      */
-    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, long retryBackoffMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
         this.drainIndex = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
+        this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
         this.time = time;
@@ -155,6 +160,7 @@ public final class RecordAccumulator {
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
+        batch.retried = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -181,6 +187,8 @@ public final class RecordAccumulator {
             synchronized (deque) {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
+                    boolean allowed = batch.retried + retryBackoffMs <= now || batch.retried == batch.created;
+                    if (!allowed) continue;
                     boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
                     boolean expired = now - batch.created >= lingerMs;
                     if (full | expired | exhausted | closed)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index c7fbf3c..bdc875a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -34,6 +34,7 @@ public final class RecordBatch {
     public int recordCount = 0;
     public volatile int attempts = 0;
     public final long created;
+    public long retried;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     private final ProduceRequestResult produceFuture;
@@ -41,6 +42,7 @@ public final class RecordBatch {
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.created = now;
+        this.retried = now;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7942623..23ad0a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -142,7 +142,7 @@ public class Sender implements Runnable {
      * The main run loop for the sender thread
      */
     public void run() {
-        log.trace("Starting Kafka producer I/O thread.");
+        log.debug("Starting Kafka producer I/O thread.");
 
         // main loop, runs until close is called
         while (running) {
@@ -153,7 +153,7 @@ public class Sender implements Runnable {
             }
         }
 
-        log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
@@ -170,7 +170,7 @@ public class Sender implements Runnable {
 
         // close all the connections
         this.selector.close();
-        log.trace("Shutdown of Kafka producer I/O thread has completed.");
thread has completed."); + log.debug("Shutdown of Kafka producer I/O thread has completed."); } /** @@ -216,8 +216,8 @@ public class Sender implements Runnable { // handle responses, connections, and disconnections handleSends(this.selector.completedSends()); - handleResponses(this.selector.completedReceives(), now); - handleDisconnects(this.selector.disconnected(), now); + handleResponses(this.selector.completedReceives(), time.milliseconds()); + handleDisconnects(this.selector.disconnected(), time.milliseconds()); handleConnects(this.selector.connected()); return ready.size(); @@ -348,15 +348,30 @@ public class Sender implements Runnable { nodeStates.disconnected(node); log.debug("Node {} disconnected.", node); for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { - if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) { - if (canRetry(batch, Errors.NETWORK_EXCEPTION)) { - this.accumulator.reenqueue(batch, now); - } else { - batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); - this.accumulator.deallocate(batch); + ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); + switch (requestKey) { + case PRODUCE: + for (RecordBatch batch : request.batches.values()) { + if (canRetry(batch, Errors.NETWORK_EXCEPTION)) { + log.warn("Destination node disconnected for topic-partition {}, retrying with {} times left.", + batch.topicPartition, this.retries - batch.attempts - 1); + this.accumulator.reenqueue(batch, now); + } else { + log.warn("Destination node disconnected for topic-partition {} and all retries exhausted, " + + "returning network exception", batch.topicPartition); + batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); + this.accumulator.deallocate(batch); + } } - } + break; + case METADATA: + metadataFetchInProgress = false; + break; + default: + throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id); + } + if (request.batches != null) { + } } } @@ -436,6 +451,7 @@ public class Sender implements Runnable { * Handle a produce response */ private void handleProduceResponse(InFlightRequest request, Struct response, long now) { + // TODO: better move the parsing logic to ProduceResponse for (Object topicResponse : (Object[]) response.get("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = (String) topicRespStruct.get("topic"); @@ -453,9 +469,14 @@ public class Sender implements Runnable { RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); if (canRetry(batch, error)) { // retry - log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error); + // TODO: better include correlation id from header + log.warn("Got error for topic-partition {}, retrying with {} times left. 
Error: {}", + batch.topicPartition, this.retries - batch.attempts - 1, error); this.accumulator.reenqueue(batch, now); } else { + if (error != Errors.NONE) + log.warn("Got error for topic-partition {} and all retries exhausted, returning the error {}", + batch.topicPartition, error); // tell the user the result of their request batch.done(offset, error.exception()); this.accumulator.deallocate(batch); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 21a2592..744d126 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -48,4 +48,26 @@ public enum ApiKeys { this.name = name; } + public static ApiKeys forId(int id) { + switch (id) { + case 0: + return PRODUCE; + case 1: + return FETCH; + case 2: + return LIST_OFFSETS; + case 3: + return METADATA; + case 4: + return LEADER_AND_ISR; + case 5: + return STOP_REPLICA; + case 6: + return OFFSET_COMMIT; + case 7: + return OFFSET_FETCH; + default: + throw new IllegalArgumentException("Unknown api key id: " + id); + } + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b58cdcd..4deff9d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -337,7 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg inLock(controllerContext.controllerLock) { if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - deleteTopicManager.shutdown() + if (deleteTopicManager != null) + deleteTopicManager.shutdown() Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() @@ -647,8 +648,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null - info("Controller shutdown complete") } + info("Controller shutdown complete") } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 10062af..80b38f3 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -162,7 +162,7 @@ class LogManager(val logDirs: Array[File], * Close all the logs */ def shutdown() { - debug("Shutting down.") + info("Shutting down.") try { // stop the cleaner first if(cleaner != null) @@ -179,7 +179,7 @@ class LogManager(val logDirs: Array[File], // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) } - debug("Shutdown complete.") + info("Shutdown complete.") } /**