diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 5bed607..b69866a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -65,7 +65,8 @@ public final class BufferPool { } /** - * Allocate a buffer of the given size + * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool + * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @return The buffer diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java index 368e8f3..678d1c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java @@ -20,11 +20,13 @@ import org.slf4j.LoggerFactory; public class ErrorLoggingCallback implements Callback { private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class); + private String topic; private byte[] key; private byte[] value; private boolean logAsString; - public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) { + public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) { + this.topic = topic; this.key = key; this.value = value; this.logAsString = logAsString; @@ -36,8 +38,8 @@ public class ErrorLoggingCallback implements Callback { logAsString ? new String(key) : key.length + " bytes"; String valueString = (value == null) ? "null" : logAsString ? new String(value) : value.length + " bytes"; - log.error("Error when sending message with key: " + keyString + ", value: " + valueString + - " with error " + e.getMessage()); + log.error("Error when sending message to topic {} with key: {}, value: {} with error: {}", + topic, keyString, valueString, e.getMessage()); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index db6e3a1..33d62a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -79,6 +79,7 @@ public final class Metadata { public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; do { partitions = cluster.partitionsFor(topic); if (partitions == null) { @@ -86,12 +87,13 @@ public final class Metadata { forceUpdate = true; try { log.trace("Requesting metadata update for topic {}.", topic); - wait(maxWaitMs); + wait(remainingWaitMs); } catch (InterruptedException e) { /* this is fine, just try again */ } - long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed >= maxWaitMs) + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; } else { return cluster; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 7a03f38..673b296 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -143,9 +143,9 @@ public final class RecordAccumulator { log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { - RecordBatch first = dq.peekLast(); - if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, compression, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 565331d..98ae660 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -327,7 +327,7 @@ public class Sender implements Runnable { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); this.nodeStates.connecting(node.id(), now); - } catch (IOException e) { + } catch (Exception e) { /* attempt failed, we'll try again after the backoff */ nodeStates.disconnected(node.id()); /* maybe the problem is our metadata, update it */ diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c989e25..84a327e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -65,10 +65,14 @@ public class AbstractConfig { return (Integer) get(key); } - public Long getLong(String key) { + public long getLong(String key) { return (Long) get(key); } + public double getDouble(String key) { + return (Double) get(key); + } + @SuppressWarnings("unchecked") public List getList(String key) { return (List) get(key); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 61257d1..67b349d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -181,7 +181,7 @@ public class ConfigDef { else if (value instanceof String) return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else - throw new ConfigException(name, value, "Expected a comma seperated list."); + throw new ConfigException(name, value, "Expected a comma separated list."); case CLASS: if (value instanceof Class) return (Class) value; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index cef75d8..281d5da 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -31,9 +31,9 @@ public interface Selectable { * @param address The address to connect to * @param sendBufferSize The send buffer for the socket * @param receiveBufferSize The receive buffer for the socket - * @throws IOException If we cannot begin connecting + * @throws Exception If we cannot begin connecting */ - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException; + public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws Exception; /** * Begin disconnecting the connection identified by the given id diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f83189d..519d8ff 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -96,10 +96,10 @@ public class Selector implements Selectable { * @param sendBufferSize The send buffer for the new connection * @param receiveBufferSize The receive buffer for the new connection * @throws IllegalStateException if there is already a connection for that id - * @throws UnresolvedAddressException if DNS resolution fails on the hostname + * @throws UnresolvedAddressException if DNS resolution fails on the hostname or IOException if the broker is down */ @Override - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws Exception { SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); Socket socket = channel.socket(); @@ -109,7 +109,7 @@ public class Selector implements Selectable { socket.setTcpNoDelay(true); try { channel.connect(address); - } catch (UnresolvedAddressException e) { + } catch (Exception e) { channel.close(); throw e; } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index dc03fd0..8cecba5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -150,7 +150,9 @@ public class Struct { } /** - * Create a struct for the schema of a container type (struct or array) + * Create a struct for the schema of a container type (struct or array). + * Note that for array type, this method assumes that the type is an array of schema and creates a struct + * of that schema. Arrays of other types can't be instantiated with this method. * * @param field The field to create an instance of * @return The struct diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 865996c..8997172 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -224,7 +224,7 @@ public class SelectorTest { } /* connect and wait for the connection to complete */ - private void blockingConnect(int node) throws IOException { + private void blockingConnect(int node) throws Exception { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); while (!selector.connected().contains(node)) selector.poll(10000L, EMPTY); diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala index 6f90549..a969a22 100644 --- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala @@ -170,11 +170,11 @@ object MirrorMaker extends Logging { trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId)) val producer = producers(producerId) producer.send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) } else { val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size) producers(producerId).send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) trace("Sent message to producer " + producerId) } } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index f12a45b..3df0d13 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -220,7 +220,7 @@ object ProducerPerformance extends Logging { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() } else { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), - new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false)) + new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false)) } }