diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d180e8..029c40c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -96,11 +96,13 @@ public class KafkaProducer implements Producer { this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), - config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL), + config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, new SystemTime()); List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis()); + int sendBuffer = config.getInt(ProducerConfig.SEND_BUFFER_CONFIG); + int receiveBuffer = config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG); this.sender = new Sender(new Selector(), this.metadata, this.accumulator, @@ -108,7 +110,10 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), + config.getInt(ProducerConfig.RETRIES_CONFIG), config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), + sendBuffer, + receiveBuffer, new SystemTime()); this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); this.ioThread.start(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index dca9802..d54b9c9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ 
-62,7 +62,7 @@ public class ProducerConfig extends AbstractConfig { /** * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent * faster than they can be delivered to the server the producer will either block or throw an exception based on the - * preference specified by {@link #BLOCK_ON_BUFFER_FULL}. + * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}. */ public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; @@ -107,6 +107,11 @@ public class ProducerConfig extends AbstractConfig { public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; /** + * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) + */ + public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + + /** * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server * has its own cap on record size which may be different from this. */ @@ -123,9 +128,17 @@ public class ProducerConfig extends AbstractConfig { * this setting is true and we block, however users who want to guarantee we never block can turn this into an * error. */ - public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full"; + public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; + + /** + * The maximum number of times to attempt resending the request before giving up. + */ + public static final String RETRIES_CONFIG = "request.retries"; - public static final String ENABLE_JMX = "enable.jmx"; + /** + * Should we register the Kafka metrics as JMX mbeans? 
+ */ + public static final String ENABLE_JMX_CONFIG = "enable.jmx"; static { /* TODO: add docs */ @@ -142,10 +155,12 @@ public class ProducerConfig extends AbstractConfig { .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") - .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX, Type.BOOLEAN, true, ""); + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") + .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 52d30a8..62613a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; - /** * A class encapsulating some of the logic around metadata. *

@@ -134,4 +129,11 @@ public final class Metadata { notifyAll(); } + /** + * The last time metadata was updated. + */ + public synchronized long lastUpdate() { + return this.lastRefresh; + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index be8a4a3..625341e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -1,25 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.List; @@ -39,10 +34,9 @@ import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; - /** - * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be - * sent to the server. + * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} + * instances to be sent to the server. *

* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless * this behavior is explicitly disabled. @@ -152,6 +146,18 @@ public final class RecordAccumulator { } /** + * Re-enqueue the given record batch in the accumulator to retry + */ + public void reenqueue(RecordBatch batch, long now) { + batch.attempts++; + batch.lastAttempt = now; + Deque deque = dequeFor(batch.topicPartition); + synchronized (deque) { + deque.addFirst(batch); + } + } + + /** * Get a list of topic-partitions which are ready to be sent. *

* A partition is ready if ANY of the following are true: @@ -229,16 +235,10 @@ public final class RecordAccumulator { } /** - * Deallocate the list of record batches + * Deallocate the record batch */ - public void deallocate(Collection batches) { - ByteBuffer[] buffers = new ByteBuffer[batches.size()]; - int i = 0; - for (RecordBatch batch : batches) { - buffers[i] = batch.records.buffer(); - i++; - } - free.deallocate(buffers); + public void deallocate(RecordBatch batch) { + free.deallocate(batch.records.buffer()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7a440a3..3f54c3d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -25,7 +21,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; - /** * A batch of records that is or will be sent. * @@ -33,6 +28,8 @@ import org.apache.kafka.common.record.MemoryRecords; */ public final class RecordBatch { public int recordCount = 0; + public volatile int attempts = 0; + public volatile long lastAttempt = 0; public final long created; public final MemoryRecords records; public final TopicPartition topicPartition; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d93a455..abd3e9c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -22,12 +22,14 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.RetriableException; import 
org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -48,19 +50,55 @@ import org.apache.kafka.common.utils.Time; */ public class Sender implements Runnable { - private final Map nodeState; + /* the state of each node's connection */ + private final NodeStates nodeStates; + + /* the record accumulator that batches records */ private final RecordAccumulator accumulator; + + /* the selector used to perform network i/o */ private final Selectable selector; + + /* the client id used to identify this client in requests to the server */ private final String clientId; + + /* the maximum request size to attempt to send to the server */ private final int maxRequestSize; - private final long reconnectBackoffMs; + + /* the number of acknowledgements to request from the server */ private final short acks; + + /* the max time in ms for the server to wait for acknowledgements */ private final int requestTimeout; + + /* the number of times to retry a failed request before giving up */ + private final int retries; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive buffer size in bytes */ + private final int socketReceiveBuffer; + + /* the set of currently in-flight requests awaiting a response from the server */ private final InFlightRequests inFlightRequests; + + /* a reference to the current Cluster instance */ private final Metadata metadata; + + /* the clock instance used for getting the time */ private final Time time; + + /* the current node to attempt to use for metadata requests (will round-robin over nodes) */ + private int nodeIndex; + + /* the current correlation id to use when sending requests to servers */ private int correlation; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ private boolean metadataFetchInProgress; + + /* true while the sender 
thread is still running */ private volatile boolean running; public Sender(Selectable selector, @@ -70,22 +108,28 @@ public class Sender implements Runnable { int maxRequestSize, long reconnectBackoffMs, short acks, + int retries, int requestTimeout, + int socketSendBuffer, + int socketReceiveBuffer, Time time) { - this.nodeState = new HashMap(); + this.nodeStates = new NodeStates(reconnectBackoffMs); this.accumulator = accumulator; this.selector = selector; this.maxRequestSize = maxRequestSize; - this.reconnectBackoffMs = reconnectBackoffMs; this.metadata = metadata; this.clientId = clientId; this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; + this.retries = retries; + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; this.inFlightRequests = new InFlightRequests(); this.correlation = 0; this.metadataFetchInProgress = false; this.time = time; + this.nodeIndex = new Random().nextInt(1024); } /** @@ -130,11 +174,7 @@ public class Sender implements Runnable { // should we update our metadata? 
List sends = new ArrayList(); - InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); - if (metadataReq != null) { - sends.add(metadataReq.request); - this.inFlightRequests.add(metadataReq); - } + maybeUpdateMetadata(cluster, sends, now); // prune the list of ready topics to eliminate any that we aren't ready to send yet List sendable = processReadyPartitions(cluster, ready, now); @@ -158,43 +198,67 @@ public class Sender implements Runnable { // handle responses, connections, and disconnections handleSends(this.selector.completedSends()); handleResponses(this.selector.completedReceives(), now); - handleDisconnects(this.selector.disconnected()); + handleDisconnects(this.selector.disconnected(), now); handleConnects(this.selector.connected()); return ready.size(); } - private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) { + /** + * Add a metadata request to the list of sends if we need to make one + */ + private void maybeUpdateMetadata(Cluster cluster, List sends, long now) { if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) - return null; + return; - Node node = nextFreeNode(cluster); + Node node = selectMetadataDestination(cluster); if (node == null) - return null; + return; - NodeState state = nodeState.get(node.id()); - if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { + if (nodeStates.isConnected(node.id())) { + this.metadataFetchInProgress = true; + InFlightRequest request = metadataRequest(node.id(), metadata.topics()); + sends.add(request.request); + this.inFlightRequests.add(request); + } else if (nodeStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one initiateConnect(node, now); - return null; - } else if (state.state == ConnectionState.CONNECTED) { - this.metadataFetchInProgress = true; - return metadataRequest(node.id(), metadata.topics()); - } else { - return null; } } /** * @return 
A node with no requests currently being sent or null if no such node exists */ - private Node nextFreeNode(Cluster cluster) { - for (int i = 0; i < cluster.nodes().size(); i++) { - Node node = cluster.nextNode(); - if (this.inFlightRequests.canSendMore(node.id())) + private Node selectMetadataDestination(Cluster cluster) { + List nodes = cluster.nodes(); + + // first look for a node to which we are connected and have no outstanding requests + boolean connectionInProgress = false; + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get((i + this.nodeIndex) % nodes.size()); + if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { + // increment the starting index so we don't reuse this node for the next request + this.nodeIndex++; + return node; + } else if (nodeStates.isConnecting(node.id())) { + connectionInProgress = true; + } + } + + // if we have a connection that is being established now, just wait for that + if (connectionInProgress) + return null; + + // okay, no luck, pick a random unused node + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get((i + this.nodeIndex) % nodes.size()); + if (this.inFlightRequests.canSendMore(node.id())) { + this.nodeIndex++; return node; + } } - return null; + + return null; // we failed to find a good destination } /** @@ -209,7 +273,7 @@ public class Sender implements Runnable { /** * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add * it to the returned set. 
For any partitions we have no connection to either make one, fetch the appropriate - * metdata to be able to do so + * metadata to be able to do so */ private List processReadyPartitions(Cluster cluster, List ready, long now) { List sendable = new ArrayList(ready.size()); @@ -218,15 +282,11 @@ public class Sender implements Runnable { if (node == null) { // we don't know about this topic/partition or it has no leader, re-fetch metadata metadata.forceUpdate(); - } else { - NodeState state = nodeState.get(node.id()); - // TODO: encapsulate this logic somehow - if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, now); - } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) { - sendable.add(tp); - } + } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + sendable.add(tp); + } else if (nodeStates.canConnect(node.id(), now)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, now); } } return sendable; @@ -237,13 +297,11 @@ public class Sender implements Runnable { */ private void initiateConnect(Node node, long now) { try { - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO - // socket - // buffers - nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now)); + this.nodeStates.connecting(node.id(), now); /* record the attempt (and its timestamp) before connecting so a failed connect can be marked disconnected and backed off */ + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ - nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now)); + nodeStates.disconnected(node.id()); /* maybe the problem is our metadata, update it */ metadata.forceUpdate(); } @@ 
-252,19 +310,26 @@ public class Sender implements Runnable { /** * Handle any closed connections */ - private void handleDisconnects(List disconnects) { + private void handleDisconnects(List disconnects, long now) { + // mark each disconnected broker as down (even if it had nothing in flight) and clear out its in-flight requests for (int node : disconnects) { + nodeStates.disconnected(node); for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) - batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); - this.accumulator.deallocate(request.batches.values()); + for (RecordBatch batch : request.batches.values()) { + if (canRetry(batch, Errors.NETWORK_EXCEPTION)) { + this.accumulator.reenqueue(batch, now); + } else { + batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); + this.accumulator.deallocate(batch); + } + } } - NodeState state = this.nodeState.get(request.request.destination()); - if (state != null) - state.state = ConnectionState.DISCONNECTED; } } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (disconnects.size() > 0) + this.metadata.forceUpdate(); } /** @@ -272,7 +337,7 @@ public class Sender implements Runnable { */ private void handleConnects(List connects) { for (Integer id : connects) - this.nodeState.get(id).state = ConnectionState.CONNECTED; + this.nodeStates.connected(id); } /** @@ -286,9 +351,10 @@ public class Sender implements Runnable { if (!request.expectResponse) { requests.pollFirst(); if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { - for (RecordBatch batch : request.batches.values()) + for (RecordBatch batch : request.batches.values()) { batch.done(-1L, Errors.NONE.exception()); - this.accumulator.deallocate(request.batches.values()); + this.accumulator.deallocate(batch); + } } } } @@ -306,7 
+372,7 @@ public class Sender implements Runnable { Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request.header(), header); if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) - handleProduceResponse(req, body); + handleProduceResponse(req, body, now); else if (req.request.header().apiKey() == ApiKeys.METADATA.id) handleMetadataResponse(body, now); else @@ -327,7 +393,7 @@ public class Sender implements Runnable { /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, Struct response) { + private void handleProduceResponse(InFlightRequest request, Struct response, long now) { for (Object topicResponse : (Object[]) response.get("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = (String) topicRespStruct.get("topic"); @@ -335,12 +401,38 @@ public class Sender implements Runnable { Struct partRespStruct = (Struct) partResponse; int partition = (Integer) partRespStruct.get("partition"); short errorCode = (Short) partRespStruct.get("error_code"); + + // if we got an error we may need to refresh our metadata + if (invalidMetdataError(errorCode)) + metadata.forceUpdate(); + + // tell the user the result of their request + Errors error = Errors.forCode(errorCode); long offset = (Long) partRespStruct.get("base_offset"); RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); - batch.done(offset, Errors.forCode(errorCode).exception()); + if (canRetry(batch, error)) { + this.accumulator.reenqueue(batch, now); + } else { + batch.done(offset, error.exception()); + this.accumulator.deallocate(batch); + } } } - this.accumulator.deallocate(request.batches.values()); + } + + /** + * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed + */ + private boolean canRetry(RecordBatch batch, Errors error) { + return batch.attempts < this.retries && error.exception() instanceof 
RetriableException; + } + + /** + * Does this error imply we need to refresh our metadata? + */ + private boolean invalidMetdataError(short errorCode) { + return errorCode == Errors.LEADER_NOT_AVAILABLE.code() || errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code(); } /** @@ -459,6 +551,53 @@ public class Sender implements Runnable { } } + private static class NodeStates { + private final long reconnectBackoffMs; + private final Map nodeState; + + public NodeStates(long reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + this.nodeState = new HashMap(); + } + + public boolean canConnect(int node, long now) { + NodeState state = nodeState.get(node); + if (state == null) + return true; + else + return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs; + } + + public void connecting(int node, long now) { + nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now)); + } + + public boolean isConnected(int node) { + NodeState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTED; + } + + public boolean isConnecting(int node) { + NodeState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTING; + } + + public void connected(int node) { + nodeState(node).state = ConnectionState.CONNECTED; + } + + public void disconnected(int node) { + nodeState(node).state = ConnectionState.DISCONNECTED; + } + + private NodeState nodeState(int node) { + NodeState state = this.nodeState.get(node); + if (state == null) + throw new IllegalStateException("No entry found for node " + node); + return state; + } + } + /** * An request that hasn't been fully processed yet */ diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index c17a8f8..5caaaae 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common; @@ -23,17 +19,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kafka.common.utils.Utils; - /** * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster. */ public final class Cluster { - private final AtomicInteger counter = new AtomicInteger(0); private final List nodes; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; @@ -126,15 +117,4 @@ public final class Cluster { return this.partitionsByTopic.get(topic); } - /** - * Round-robin over the nodes in this cluster - */ - public Node nextNode() { - int size = nodes.size(); - if (size == 0) - throw new IllegalStateException("No known nodes."); - int idx = Utils.abs(counter.getAndIncrement()) % size; - return this.nodes.get(idx); - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java index 673f61d..e1dedfe 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.errors; -public class CorruptRecordException extends ApiException { +public class CorruptRecordException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java index 0bde6b5..5d4e9b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class LeaderNotAvailableException extends RetryableException { +public class LeaderNotAvailableException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java index 3a04159..b5a464c 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class NetworkException extends ApiException { +public class NetworkException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java index 5adc72c..6e647dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class NotLeaderForPartitionException extends RetryableException { +public class NotLeaderForPartitionException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java index d01698a..32f7e5d 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.errors; -public class OffsetOutOfRangeException extends ApiException { +public class OffsetOutOfRangeException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java new file mode 100644 index 0000000..6c639a9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * A retriable exception is a transient exception that may succeed if retried. 
+ */ +public abstract class RetriableException extends ApiException { + + private static final long serialVersionUID = 1L; + + public RetriableException(String message, Throwable cause) { + super(message, cause); + } + + public RetriableException(String message) { + super(message); + } + + public RetriableException(Throwable cause) { + super(cause); + } + + public RetriableException() { + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java deleted file mode 100644 index c7f2f22..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.errors; - -/** - * A retryable exception is an exception that is safe to retry. To be retryable an exception should be - *

    - *
  1. Transient, there is no point retrying a error due to a non-existant topic or message too large - *
  2. Idempotent, the exception is known to not change any state on the server - *
- * A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to - * retry. - */ -public abstract class RetryableException extends ApiException { - - private static final long serialVersionUID = 1L; - - public RetryableException(String message, Throwable cause) { - super(message, cause); - } - - public RetryableException(String message) { - super(message); - } - - public RetryableException(Throwable cause) { - super(cause); - } - - public RetryableException() { - } - -} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java index dffd64d..dd4ffda 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class TimeoutException extends ApiException { +public class TimeoutException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java index 73d1953..58eecca 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java @@ -1,22 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class UnknownTopicOrPartitionException extends ApiException { +public class UnknownTopicOrPartitionException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 8ed4c73..f1e474c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -32,7 +28,6 @@ import java.util.Set; import org.apache.kafka.common.KafkaException; - /** * A selector interface for doing non-blocking multi-connection network I/O. *

@@ -302,8 +297,11 @@ public class Selector implements Selectable { private void close(SelectionKey key) throws IOException { SocketChannel channel = channel(key); Transmissions trans = transmissions(key); - if (trans != null) + if (trans != null) { this.disconnected.add(trans.id); + trans.clearReceive(); + trans.clearSend(); + } key.attach(null); key.cancel(); channel.socket().close(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 1bbe83c..a3bf07e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer; @@ -25,7 +21,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; import org.apache.kafka.common.TopicPartition; @@ -140,8 +135,8 @@ public class RecordAccumulatorTest { for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) read++; + accum.deallocate(batch); } - accum.deallocate(batches); } for (Thread t : threads) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 41c028b..ce30b33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -1,29 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.clients.producer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; @@ -46,6 +42,7 @@ import org.junit.Test; public class SenderTest { + private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private int batchSize = 16 * 1024; @@ -53,7 +50,18 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time); - private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time); + private Sender sender = new Sender(selector, + metadata, + this.accumulator, + "", + 1024 * 1024, + 0L, + (short) -1, + 0, + 10000, + 64 * 1024, + 64 * 1024, + time); @Before public void setup() { @@ -62,7 +70,6 @@ public class SenderTest { @Test public void testSimple() throws Exception { - TopicPartition tp = new TopicPartition("test", 0); Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); sender.run(time.milliseconds()); assertEquals("We should have connected", 1, selector.connected().size()); @@ -83,6 +90,92 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + @Test + public void testRetries() throws Exception { + // create a sender with retries = 1 + Sender sender = new Sender(selector, + metadata, + this.accumulator, 
+ "", + 1024 * 1024, + 0L, + (short) -1, + 1, + 10000, + 64 * 1024, + 64 * 1024, + time); + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + RequestSend request1 = completeSend(sender); + selector.clear(); + selector.completeReceive(produceResponse(request1.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + -1, + Errors.REQUEST_TIMED_OUT.code())); + sender.run(time.milliseconds()); + selector.clear(); + sender.run(time.milliseconds()); + RequestSend request2 = completeSend(sender); + selector.completeReceive(produceResponse(request2.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + 42, + Errors.NONE.code())); + sender.run(time.milliseconds()); + assertTrue("Request should retry and complete", future.isDone()); + assertEquals(42, future.get().offset()); + } + + @Test + public void testMetadataRefreshOnNoLeaderException() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + RequestSend request = completeSend(); + selector.clear(); + selector.completeReceive(produceResponse(request.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + -1, + Errors.NOT_LEADER_FOR_PARTITION.code())); + sender.run(time.milliseconds()); + completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION); + assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); + } + + @Test + public void testMetadataRefreshOnDisconnect() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + completeSend(); + selector.clear(); + selector.disconnect(cluster.leaderFor(tp).id()); + sender.run(time.milliseconds()); + completedWithError(future, Errors.NETWORK_EXCEPTION); + assertTrue("The disconnection triggers a metadata update.", 
metadata.needsUpdate(time.milliseconds())); + } + + private void completedWithError(Future future, Errors error) throws Exception { + assertTrue("Request should be completed", future.isDone()); + try { + future.get(); + fail("Should have thrown an exception."); + } catch (ExecutionException e) { + assertEquals(error.exception().getClass(), e.getCause().getClass()); + } + } + + private RequestSend completeSend() { + return completeSend(sender); + } + + private RequestSend completeSend(Sender sender) { + while (selector.completedSends().size() == 0) + sender.run(time.milliseconds()); + return (RequestSend) selector.completedSends().get(0); + } + private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct response = struct.instance("responses");