diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ac6943..6d731ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -218,11 +218,12 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+        TopicPartition tp = null;
         try {
             Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
             int partition = partitioner.partition(record, cluster);
             ensureValidSize(record.key(), record.value());
-            TopicPartition tp = new TopicPartition(record.topic(), partition);
+            tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
             this.sender.wakeup();
@@ -232,7 +233,7 @@ public class KafkaProducer implements Producer {
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
             if (callback != null)
-                callback.onCompletion(null, e);
+                callback.onCompletion(new RecordMetadata(tp, -1L, -1L), e);
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             throw new KafkaException(e);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 038a05a..d345075 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -16,6 +16,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -86,7 +87,7 @@ public final class RecordBatch {
                 if (exception == null)
                     thunk.callback.onCompletion(thunk.future.get(), null);
                 else
-                    thunk.callback.onCompletion(null, exception);
+                    thunk.callback.onCompletion(new RecordMetadata(topicPartition, -1L, -1L), exception);
             } catch (Exception e) {
                 log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
             }
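Note: with the two changes above, a failed send now hands the callback a placeholder RecordMetadata carrying -1 offsets instead of null, so user callbacks should branch on the exception rather than on metadata being null. A minimal sketch of a callback written against the new contract (the class name and messages are illustrative, not part of this patch):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ErrorAwareCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // Failure: metadata is non-null but carries sentinel offsets of -1.
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                // Success: metadata describes where the record landed.
                System.out.println("Sent to " + metadata.topic() + ":" + metadata.partition()
                    + " at offset " + metadata.offset());
            }
        }
    }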
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7b5d144..7222383 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -185,7 +185,7 @@ public class Sender implements Runnable {
 
         // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        maybeUpdateMetadata(cluster, sends, now);
+        InFlightRequest metadataRequest = maybeUpdateMetadata(cluster, sends, now);
 
         // prune the list of ready topics to eliminate any that we aren't ready to send yet
         List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
@@ -194,10 +194,11 @@ public class Sender implements Runnable {
         List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
         List<InFlightRequest> requests = collate(cluster, batches);
 
-        if (ready.size() > 0) {
+        if (ready.size() > 0 || metadataRequest != null) {
             log.trace("Partitions with complete batches: {}", ready);
             log.trace("Partitions ready to initiate a request: {}", sendable);
-            log.trace("Created {} requests: {}", requests.size(), requests);
+            log.trace("Created {} produce requests: {}", requests.size(), requests);
+            log.trace("Created metadata request: {}", metadataRequest);
         }
 
         for (int i = 0; i < requests.size(); i++) {
@@ -222,26 +223,29 @@ public class Sender implements Runnable {
 
     /**
      * Add a metadata request to the list of sends if we need to make one
+     * @return A metadata request or null.
     */
-    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
+    private InFlightRequest maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
+        InFlightRequest request = null;
         if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
-            return;
+            return request;
         Node node = selectMetadataDestination(cluster);
         if (node == null)
-            return;
+            return request;
 
         if (nodeStates.isConnected(node.id())) {
             Set<String> topics = metadata.topics();
             log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
             this.metadataFetchInProgress = true;
-            InFlightRequest request = metadataRequest(node.id(), topics);
+            request = metadataRequest(node.id(), topics);
             sends.add(request.request);
             this.inFlightRequests.add(request);
         } else if (nodeStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
             initiateConnect(node, now);
         }
+        return request;
     }
 
     /**
@@ -345,6 +349,7 @@ public class Sender implements Runnable {
         nodeStates.disconnected(node);
         log.debug("Node {} disconnected.", node);
         for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
+            log.trace("Cancelled request {}", request);
             ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
             switch (requestKey) {
                 case PRODUCE:
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 73ad6cd..dc03fd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -230,9 +230,20 @@ public class Struct {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.values.length; i++) {
-            b.append(this.schema.get(i).name);
+            Field f = this.schema.get(i);
+            b.append(f.name);
             b.append('=');
-            b.append(this.values[i]);
+            if (f.type() instanceof ArrayOf) {
+                Object[] arrayValue = (Object[]) this.values[i];
+                b.append('[');
+                for (int j = 0; j < arrayValue.length; j++) {
+                    b.append(arrayValue[j]);
+                    if (j < arrayValue.length - 1)
+                        b.append(',');
+                }
+                b.append(']');
+            } else
+                b.append(this.values[i]);
             if (i < this.values.length - 1)
                 b.append(',');
         }
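The Struct.toString() change above replaces Java's default array rendering (e.g. [Ljava.lang.Object;@1b6d3586) with a readable comma-separated listing. A standalone sketch of the same joining logic, using a plain Object[] rather than a real Struct:

    public class ArrayJoinSketch {
        public static void main(String[] args) {
            Object[] arrayValue = {"a", "b", "c"};
            StringBuilder b = new StringBuilder();
            b.append('[');
            for (int j = 0; j < arrayValue.length; j++) {
                b.append(arrayValue[j]);
                if (j < arrayValue.length - 1)
                    b.append(',');
            }
            b.append(']');
            System.out.println(b); // prints [a,b,c]
        }
    }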
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6ac2e53..6fa4a58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -29,6 +29,16 @@ public class ProduceResponse {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
         }
+        @Override
+        public String toString() {
+            StringBuilder b = new StringBuilder();
+            b.append('{');
+            b.append("pid: " + partitionId);
+            b.append(",error: " + errorCode);
+            b.append(",offset: " + baseOffset);
+            b.append('}');
+            return b.toString();
+        }
     }
 
     private final Map<Integer, Map<TopicPartition, PartitionResponse>> responses;
@@ -54,4 +64,22 @@ public class ProduceResponse {
     public Map<Integer, Map<TopicPartition, PartitionResponse>> responses() {
         return this.responses;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        boolean isFirst = true;
+        for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
+            for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
+                if (isFirst)
+                    isFirst = false;
+                else
+                    b.append(',');
+                b.append(entry.getKey() + " : " + entry.getValue());
+            }
+        }
+        b.append('}');
+        return b.toString();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0c6b365..a2c27d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -15,11 +15,16 @@ package org.apache.kafka.common.utils;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Utils {
 
     public static String NL = System.getProperty("line.separator");
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
     /**
      * Turn the given UTF8 byte array into a string
@@ -241,4 +246,19 @@ public class Utils {
         return h;
     }
 
+    public static Callback errorLoggingCallback(final byte[] key, final byte[] value, final boolean logAsString) {
+        return new Callback() {
+            public void onCompletion(RecordMetadata metadata, Exception e) {
+                if (e != null) {
+                    String keyString = (key == null) ? "null" :
+                        logAsString ? new String(key) : key.length + " bytes";
+                    String valueString = (value == null) ? "null" :
+                        logAsString ? new String(value) : value.length + " bytes";
+                    log.error("Error when sending message with key: " + keyString + ", value: " + valueString +
+                        " to " + metadata.topic() + ":" + metadata.partition() +
+                        " with error " + e.getClass().getName() + " : " + e.getMessage());
+                }
+            }
+        };
+    }
 }
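A sketch of how a caller might use the new Utils.errorLoggingCallback helper; the topic name, payloads, and producer variable are made up for illustration:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.utils.Utils;

    // Assuming an already-configured KafkaProducer named "producer":
    byte[] key = "example-key".getBytes();
    byte[] value = "example-value".getBytes();
    producer.send(new ProducerRecord("example-topic", key, value),
        Utils.errorLoggingCallback(key, value, true)); // logAsString=true logs the raw payloads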
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index d1dc13b..14581cf 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -168,10 +168,12 @@ object MirrorMaker extends Logging {
       val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
       trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
       val producer = producers(producerId)
-      producer.send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+      producer.send(producerRecord,
+        org.apache.kafka.common.utils.Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
     } else {
       val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
-      producers(producerId).send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+      producers(producerId).send(producerRecord,
+        org.apache.kafka.common.utils.Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
       trace("Sent message to producer " + producerId)
     }
   }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 33e05f0..6bfbac1 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,12 +25,10 @@ import java.util.concurrent.locks.Lock
 import java.lang.management._
 import javax.management._
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
 import kafka.common.KafkaStorageException
-import org.apache.kafka.clients.producer.{RecordMetadata, Callback}
 
 
 /**
@@ -541,25 +539,4 @@ object Utils extends Logging {
       lock.unlock()
     }
   }
-
-  def errorLoggingCallback(key: Array[Byte], value: Array[Byte], logAsString: Boolean = false) = {
-    new Callback() {
-      def onCompletion(metadata: RecordMetadata, e: Exception) {
-        if (e != null) {
-          val keyString = if (key == null)
-            "null"
-          else {
-            if (logAsString) new String(key) else key.length + " bytes"
-          }
-          val valueString = if (value == null)
-            "null"
-          else {
-            if (logAsString) new String(value) else value.length + " bytes"
-          }
-          error("Error when sending message with key: " + keyString + ", value: " + valueString +
-            " with exception " + e.getMessage)
-        }
-      }
-    }
-  }
 }
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index f061dba..f2ba126 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -26,9 +26,10 @@ import java.text.SimpleDateFormat
 import kafka.serializer._
 import java.util._
 import collection.immutable.List
-import kafka.utils.{ VerifiableProperties, Logging, Utils }
+import kafka.utils.{VerifiableProperties, Logging}
 import kafka.metrics.KafkaMetricsReporter
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.utils.Utils
 
 /**
  * Load test for the producer
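For reference, the PartitionResponse.toString() added in the ProduceResponse hunk renders a compact brace-delimited record. A sketch of the expected output, assuming the inner class is publicly constructible with the (partitionId, errorCode, baseOffset) argument order implied by the constructor body shown above:

    import org.apache.kafka.common.requests.ProduceResponse;

    // Hypothetical values: partition 0, no error, base offset 12345.
    ProduceResponse.PartitionResponse pr =
        new ProduceResponse.PartitionResponse(0, (short) 0, 12345L);
    System.out.println(pr); // prints {pid: 0,error: 0,offset: 12345}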