diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d180e8..e3f8765 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -101,6 +101,8 @@ public class KafkaProducer implements Producer { new SystemTime()); List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis()); + int sendBuffer = config.getInt(ProducerConfig.SEND_BUFFER_CONFIG); + int receiveBuffer = config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG); this.sender = new Sender(new Selector(), this.metadata, this.accumulator, @@ -109,6 +111,8 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), + sendBuffer, + receiveBuffer, new SystemTime()); this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); this.ioThread.start(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index dca9802..6430669 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -107,6 +107,11 @@ public class ProducerConfig extends AbstractConfig { public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; /** + * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) + */ + public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + + /** * The maximum size of a request. 
This is also effectively a cap on the maximum record size. Note that the server * has its own cap on record size which may be different from this. */ @@ -125,6 +130,9 @@ public class ProducerConfig extends AbstractConfig { */ public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full"; + /** + * Should we register the Kafka metrics as JMX mbeans? + */ public static final String ENABLE_JMX = "enable.jmx"; static { @@ -142,6 +150,7 @@ public class ProducerConfig extends AbstractConfig { .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah") diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 52d30a8..62613a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; - /** * A class encapsulating some of the logic around metadata. *

@@ -134,4 +129,11 @@ public final class Metadata { notifyAll(); } + /** + * The last time metadata was updated. + */ + public synchronized long lastUpdate() { + return this.lastRefresh; + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d93a455..568e1d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -56,6 +56,8 @@ public class Sender implements Runnable { private final long reconnectBackoffMs; private final short acks; private final int requestTimeout; + private final int socketSendBuffer; + private final int socketReceiveBuffer; private final InFlightRequests inFlightRequests; private final Metadata metadata; private final Time time; @@ -71,6 +73,8 @@ public class Sender implements Runnable { long reconnectBackoffMs, short acks, int requestTimeout, + int socketSendBuffer, + int socketReceiveBuffer, Time time) { this.nodeState = new HashMap(); this.accumulator = accumulator; @@ -82,6 +86,8 @@ public class Sender implements Runnable { this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; this.inFlightRequests = new InFlightRequests(); this.correlation = 0; this.metadataFetchInProgress = false; @@ -237,9 +243,7 @@ public class Sender implements Runnable { */ private void initiateConnect(Node node, long now) { try { - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO - // socket - // buffers + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now)); } catch (IOException e) { /* 
attempt failed, we'll try again after the backoff */ @@ -253,6 +257,7 @@ * Handle any closed connections */ private void handleDisconnects(List disconnects) { + // clear out the in-flight requests for the disconnected broker for (int node : disconnects) { for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { @@ -265,6 +270,9 @@ state.state = ConnectionState.DISCONNECTED; } } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (disconnects.size() > 0) + this.metadata.forceUpdate(); } /** @@ -335,6 +343,12 @@ Struct partRespStruct = (Struct) partResponse; int partition = (Integer) partRespStruct.get("partition"); short errorCode = (Short) partRespStruct.get("error_code"); + + // if we got an error we may need to refresh our metadata + if (invalidMetadataError(errorCode)) + metadata.forceUpdate(); + + // tell the user the result of their request long offset = (Long) partRespStruct.get("base_offset"); RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); batch.done(offset, Errors.forCode(errorCode).exception()); @@ -343,6 +357,11 @@ this.accumulator.deallocate(request.batches.values()); } + private boolean invalidMetadataError(short errorCode) { + return errorCode == Errors.LEADER_NOT_AVAILABLE.code() || errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code(); + } + /** * Validate that the response corresponds to the request we expect or else explode */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 41c028b..bea9621 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -1,29 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.clients.producer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; @@ -46,6 +42,7 @@ import org.junit.Test; public class SenderTest { + private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private int batchSize = 16 * 1024; @@ -53,7 +50,17 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time); - private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time); + private Sender sender = new Sender(selector, + metadata, + this.accumulator, + "", + 1024 * 1024, + 0L, + (short) -1, + 10000, + 64 * 1024, + 64 * 1024, + time); @Before public void setup() { @@ -62,7 +69,6 @@ public class SenderTest { @Test public void testSimple() throws Exception { - TopicPartition tp = new TopicPartition("test", 0); Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); sender.run(time.milliseconds()); assertEquals("We should have connected", 1, selector.connected().size()); @@ -83,6 +89,49 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + @Test + public void testMetadataRefreshOnNoLeaderException() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), 
CompressionType.NONE, null); + RequestSend request = completeSend(); + selector.clear(); + selector.completeReceive(produceResponse(request.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + 42, + Errors.NOT_LEADER_FOR_PARTITION.code())); + sender.run(time.milliseconds()); + completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION); + assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); + } + + @Test + public void testMetadataRefreshOnDisconnect() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + completeSend(); + selector.clear(); + selector.disconnect(cluster.leaderFor(tp).id()); + sender.run(time.milliseconds()); + completedWithError(future, Errors.NETWORK_EXCEPTION); + assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); + } + + private void completedWithError(Future future, Errors error) throws Exception { + assertTrue("Request should be completed", future.isDone()); + try { + future.get(); + fail("Should have thrown an exception."); + } catch (ExecutionException e) { + assertEquals(error.exception().getClass(), e.getCause().getClass()); + } + } + + private RequestSend completeSend() { + while (selector.completedSends().size() == 0) + sender.run(time.milliseconds()); + return (RequestSend) selector.completedSends().get(0); + } + private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct response = struct.instance("responses");