diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
new file mode 100644
index 0000000..368e8f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorLoggingCallback implements Callback {
+    private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
+    private byte[] key;
+    private byte[] value;
+    private boolean logAsString;
+
+    public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) {
+        this.key = key;
+        this.value = value;
+        this.logAsString = logAsString;
+    }
+
+    public void onCompletion(RecordMetadata metadata, Exception e) {
+        if (e != null) {
+            String keyString = (key == null) ? "null" :
+                logAsString ? new String(key) : key.length + " bytes";
+            String valueString = (value == null) ? "null" :
+                logAsString ? new String(value) : value.length + " bytes";
+            log.error("Error when sending message with key: " + keyString + ", value: " + valueString +
+                " with error " + e.getMessage());
+        }
+    }
+}
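The new callback is meant to be passed straight to KafkaProducer.send(), as the MirrorMaker and ProducerPerformance changes further down do. A minimal usage sketch (not part of the patch; `producer` and `record` stand for an existing producer instance and a ProducerRecord carrying byte[] key and value):

    // Sketch: log a failed send rather than dropping the error silently.
    // logAsString = false logs only the key/value sizes; true decodes the payloads
    // with new String(bytes), i.e. the platform default charset.
    producer.send(record, new ErrorLoggingCallback(record.key(), record.value(), false));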
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7b5d144..d02ff39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -185,7 +185,7 @@ public class Sender implements Runnable {
 
         // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        maybeUpdateMetadata(cluster, sends, now);
+        InFlightRequest metadataRequest = maybeUpdateMetadata(cluster, sends, now);
 
         // prune the list of ready topics to eliminate any that we aren't ready to send yet
         List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
@@ -197,9 +197,12 @@ public class Sender implements Runnable {
         if (ready.size() > 0) {
             log.trace("Partitions with complete batches: {}", ready);
             log.trace("Partitions ready to initiate a request: {}", sendable);
-            log.trace("Created {} requests: {}", requests.size(), requests);
+            log.trace("Created {} produce requests: {}", requests.size(), requests);
         }
 
+        if (metadataRequest != null)
+            log.trace("Created metadata request: {}", metadataRequest);
+
         for (int i = 0; i < requests.size(); i++) {
             InFlightRequest request = requests.get(i);
             this.inFlightRequests.add(request);
@@ -222,26 +225,29 @@ public class Sender implements Runnable {
 
     /**
      * Add a metadata request to the list of sends if we need to make one
+     * @return A metadata request or null.
      */
-    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
+    private InFlightRequest maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
+        InFlightRequest request = null;
         if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
-            return;
+            return request;
 
         Node node = selectMetadataDestination(cluster);
         if (node == null)
-            return;
+            return request;
 
         if (nodeStates.isConnected(node.id())) {
             Set<String> topics = metadata.topics();
             log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
             this.metadataFetchInProgress = true;
-            InFlightRequest request = metadataRequest(node.id(), topics);
+            request = metadataRequest(node.id(), topics);
             sends.add(request.request);
             this.inFlightRequests.add(request);
         } else if (nodeStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            initiateConnect(node, now);
         }
+        return request;
     }
 
     /**
@@ -345,6 +351,7 @@ public class Sender implements Runnable {
         nodeStates.disconnected(node);
         log.debug("Node {} disconnected.", node);
         for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
+            log.trace("Cancelled request {} due to disconnected socket", request);
             ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
             switch (requestKey) {
                 case PRODUCE:
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 73ad6cd..dc03fd0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -230,9 +230,20 @@ public class Struct {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.values.length; i++) {
-            b.append(this.schema.get(i).name);
+            Field f = this.schema.get(i);
+            b.append(f.name);
             b.append('=');
-            b.append(this.values[i]);
+            if (f.type() instanceof ArrayOf) {
+                Object[] arrayValue = (Object[]) this.values[i];
+                b.append('[');
+                for (int j = 0; j < arrayValue.length; j++) {
+                    b.append(arrayValue[j]);
+                    if (j < arrayValue.length - 1)
+                        b.append(',');
+                }
+                b.append(']');
+            } else
+                b.append(this.values[i]);
             if (i < this.values.length - 1)
                 b.append(',');
         }
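To illustrate the Struct.toString() change above, a small sketch (not part of the patch; the schema and the field name "topics" are invented, and it assumes the Schema/Field/ArrayOf/Type helpers from org.apache.kafka.common.protocol.types behave as in this tree):

    import org.apache.kafka.common.protocol.types.*;

    public class StructToStringExample {
        public static void main(String[] args) {
            // One array-typed field in the schema.
            Schema schema = new Schema(new Field("topics", new ArrayOf(Type.STRING)));
            Struct struct = new Struct(schema);
            struct.set("topics", new Object[] {"foo", "bar"});
            // Previously the array rendered via Object[].toString(), e.g.
            // {topics=[Ljava.lang.Object;@1b6d3586}; with the change it prints
            // the elements instead: {topics=[foo,bar]}
            System.out.println(struct);
        }
    }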
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6ac2e53..6fa4a58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -29,6 +29,16 @@ public class ProduceResponse {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
         }
+        @Override
+        public String toString() {
+            StringBuilder b = new StringBuilder();
+            b.append('{');
+            b.append("pid: " + partitionId);
+            b.append(",error: " + errorCode);
+            b.append(",offset: " + baseOffset);
+            b.append('}');
+            return b.toString();
+        }
     }
 
     private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
@@ -54,4 +64,22 @@ public class ProduceResponse {
     public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
         return this.responses;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append('{');
+        boolean isFirst = true;
+        for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
+            for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
+                if (isFirst)
+                    isFirst = false;
+                else
+                    b.append(',');
+                b.append(entry.getKey() + " : " + entry.getValue());
+            }
+        }
+        b.append('}');
+        return b.toString();
+    }
 }
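For reference, the two toString() methods above nest as follows (an illustration, not captured output; the topic, partition, error code and offset are made up, and it assumes TopicPartition prints as topic-partition):

    // PartitionResponse.toString() -> {pid: 0,error: 0,offset: 12345}
    // ProduceResponse.toString()   -> {mytopic-0 : {pid: 0,error: 0,offset: 12345}}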
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index d1dc13b..6f90549 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -24,6 +24,7 @@ import kafka.consumer._
 import collection.mutable.ListBuffer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 import java.util.concurrent.atomic.AtomicInteger
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
 object MirrorMaker extends Logging {
 
@@ -168,10 +169,12 @@ object MirrorMaker extends Logging {
       val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
       trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
       val producer = producers(producerId)
-      producer.send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+      producer.send(producerRecord,
+        new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
     } else {
       val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
-      producers(producerId).send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
+      producers(producerId).send(producerRecord,
+        new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
       trace("Sent message to producer " + producerId)
     }
   }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 33e05f0..6bfbac1 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,12 +25,10 @@ import java.util.concurrent.locks.Lock
 import java.lang.management._
 import javax.management._
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
 import kafka.common.KafkaStorageException
-import org.apache.kafka.clients.producer.{RecordMetadata, Callback}
 
 
 /**
@@ -541,25 +539,4 @@ object Utils extends Logging {
       lock.unlock()
     }
   }
-
-  def errorLoggingCallback(key: Array[Byte], value: Array[Byte], logAsString: Boolean = false) = {
-    new Callback() {
-      def onCompletion(metadata: RecordMetadata, e: Exception) {
-        if (e != null) {
-          val keyString = if (key == null)
-            "null"
-          else {
-            if (logAsString) new String(key) else key.length + " bytes"
-          }
-          val valueString = if (value == null)
-            "null"
-          else {
-            if (logAsString) new String(value) else value.length + " bytes"
-          }
-          error("Error when sending message with key: " + keyString + ", value: " + valueString +
-            " with exception " + e.getMessage)
-        }
-      }
-    }
-  }
 }
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index f061dba..f12a45b 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -26,9 +26,10 @@ import java.text.SimpleDateFormat
 import kafka.serializer._
 import java.util._
 import collection.immutable.List
-import kafka.utils.{ VerifiableProperties, Logging, Utils }
+import kafka.utils.{VerifiableProperties, Logging, Utils}
 import kafka.metrics.KafkaMetricsReporter
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 
 /**
  * Load test for the producer
@@ -219,7 +220,7 @@ object ProducerPerformance extends Logging {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
       } else {
         this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
-          Utils.errorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
+          new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
       }
     }