diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 5bed607..b69866a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -65,7 +65,8 @@ public final class BufferPool { } /** - * Allocate a buffer of the given size + * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool + * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @return The buffer diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java index 368e8f3..678d1c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java @@ -20,11 +20,13 @@ import org.slf4j.LoggerFactory; public class ErrorLoggingCallback implements Callback { private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class); + private String topic; private byte[] key; private byte[] value; private boolean logAsString; - public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) { + public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) { + this.topic = topic; this.key = key; this.value = value; this.logAsString = logAsString; @@ -36,8 +38,8 @@ public class ErrorLoggingCallback implements Callback { logAsString ? new String(key) : key.length + " bytes"; String valueString = (value == null) ? "null" : logAsString ? new String(value) : value.length + " bytes"; - log.error("Error when sending message with key: " + keyString + ", value: " + valueString + - " with error " + e.getMessage()); + log.error("Error when sending message to topic {} with key: {}, value: {} with error: {}", + topic, keyString, valueString, e.getMessage()); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index db6e3a1..33d62a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -79,6 +79,7 @@ public final class Metadata { public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; do { partitions = cluster.partitionsFor(topic); if (partitions == null) { @@ -86,12 +87,13 @@ public final class Metadata { forceUpdate = true; try { log.trace("Requesting metadata update for topic {}.", topic); - wait(maxWaitMs); + wait(remainingWaitMs); } catch (InterruptedException e) { /* this is fine, just try again */ } - long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed >= maxWaitMs) + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; } else { return cluster; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 7a03f38..673b296 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -143,9 +143,9 @@ public final class RecordAccumulator { log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { - RecordBatch first = dq.peekLast(); - if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, compression, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c989e25..84a327e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -65,10 +65,14 @@ public class AbstractConfig { return (Integer) get(key); } - public Long getLong(String key) { + public long getLong(String key) { return (Long) get(key); } + public double getDouble(String key) { + return (Double) get(key); + } + @SuppressWarnings("unchecked") public List getList(String key) { return (List) get(key); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 61257d1..67b349d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -181,7 +181,7 @@ public class ConfigDef { else if (value instanceof String) return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else - throw new ConfigException(name, value, "Expected a comma seperated list."); + throw new ConfigException(name, value, "Expected a comma separated list."); case CLASS: if (value instanceof Class) return (Class) value; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f83189d..9839632 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -96,7 +96,7 @@ public class Selector implements Selectable { * @param sendBufferSize The send buffer for the new connection * @param receiveBufferSize The receive buffer for the new connection * @throws IllegalStateException if there is already a connection for that id - * @throws UnresolvedAddressException if DNS resolution fails on the hostname + * @throws IOException if DNS resolution fails on the hostname or if the broker is down */ @Override public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { @@ -111,6 +111,9 @@ public class Selector implements Selectable { channel.connect(address); } catch (UnresolvedAddressException e) { channel.close(); + throw new IOException("Can't resolve address: " + address, e); + } catch (IOException e) { + channel.close(); throw e; } SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index dc03fd0..8cecba5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -150,7 +150,9 @@ public class Struct { } /** - * Create a struct for the schema of a container type (struct or array) + * Create a struct for the schema of a container type (struct or array). + * Note that for array type, this method assumes that the type is an array of schema and creates a struct + * of that schema. Arrays of other types can't be instantiated with this method. * * @param field The field to create an instance of * @return The struct diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 865996c..90e2dcf 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -125,7 +125,7 @@ public class SelectorTest { /** * Sending a request to a node with a bad hostname should result in an exception during connect */ - @Test(expected = UnresolvedAddressException.class) + @Test(expected = IOException.class) public void testNoRouteToHost() throws Exception { selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); } diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala index 6f90549..a969a22 100644 --- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala @@ -170,11 +170,11 @@ object MirrorMaker extends Logging { trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId)) val producer = producers(producerId) producer.send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) } else { val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size) producers(producerId).send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) trace("Sent message to producer " + producerId) } } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index f12a45b..3df0d13 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -220,7 +220,7 @@ object ProducerPerformance extends Logging { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() } else { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), - new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false)) + new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false)) } }