diff --git a/build.gradle b/build.gradle index 58a6396..033a206 100644 --- a/build.gradle +++ b/build.gradle @@ -309,6 +309,7 @@ project(':clients') { archivesBaseName = "kafka-clients" dependencies { + compile "org.slf4j:slf4j-api:1.7.6" testCompile 'com.novocode:junit-interface:0.9' } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e4bc972..0f294b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -44,6 +44,8 @@ import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Kafka client that publishes records to the Kafka cluster. @@ -55,6 +57,8 @@ import org.apache.kafka.common.utils.SystemTime; */ public class KafkaProducer implements Producer { + private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); + private final Partitioner partitioner; private final int maxRequestSize; private final long metadataFetchTimeoutMs; @@ -84,6 +88,7 @@ public class KafkaProducer implements Producer { } private KafkaProducer(ProducerConfig config) { + log.trace("Starting the Kafka producer"); this.metrics = new Metrics(new MetricConfig(), Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")), new SystemTime()); @@ -113,8 +118,10 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), new SystemTime()); - this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); + this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); this.ioThread.start(); + config.logUnused(); + log.debug("Kafka producer started"); } private static List parseAndValidateAddresses(List urls) { @@ -123,7 +130,7 @@ public class KafkaProducer implements Producer { if (url != null && url.length() > 0) { String[] pieces = url.split(":"); if (pieces.length != 2) - throw new ConfigException("Invalid url in metadata.broker.list: " + url); + throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url); try { InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); if (address.isUnresolved()) @@ -214,10 +221,12 @@ public class KafkaProducer implements Producer { int partition = partitioner.partition(record, cluster); ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); + log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); this.sender.wakeup(); return future; } catch (Exception e) { + log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); return new FutureFailure(e); @@ -255,6 +264,7 @@ public class KafkaProducer implements Producer { */ @Override public void close() { + log.trace("Closing the Kafka producer."); this.accumulator.close(); this.sender.initiateClose(); try { @@ -263,6 +273,7 @@ public class KafkaProducer implements Producer { throw new 
KafkaException(e); } this.metrics.close(); + log.debug("The Kafka producer has closed."); } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 034bf33..c3181b3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer; @@ -97,4 +93,10 @@ public final class ProducerRecord { return partition; } + @Override + public String toString() { + String key = this.key == null ? "null" : ("byte[" + this.key.length + "]"); + String value = this.value == null ? "null" : ("byte[" + this.value.length + "]"); + return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value + ")"; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 62613a3..e4c1cf0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -19,6 +19,8 @@ import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A class encapsulating some of the logic around metadata.
@@ -30,6 +32,8 @@ import org.apache.kafka.common.errors.TimeoutException; */ public final class Metadata { + private static final Logger log = LoggerFactory.getLogger(Metadata.class); + private final long refreshBackoffMs; private final long metadataExpireMs; private long lastRefresh; @@ -81,11 +85,12 @@ public final class Metadata { topics.add(topic); forceUpdate = true; try { + log.trace("Requesting metadata update for topic {}.", topic); wait(maxWaitMs); } catch (InterruptedException e) { /* this is fine, just try again */ } - long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed > maxWaitMs) + long elapsed = System.currentTimeMillis() - begin; + if (elapsed > maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } else { return cluster; @@ -127,6 +132,7 @@ public final class Metadata { this.lastRefresh = now; this.cluster = cluster; notifyAll(); + log.debug("Updated cluster metadata to {}", cluster); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ce5cf27..6990274 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -33,6 +33,8 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} @@ -43,6 +45,8 @@ import org.apache.kafka.common.utils.Utils; */ public final class RecordAccumulator { + private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); + private volatile boolean closed; private int drainIndex; private final int batchSize; @@ -126,6 +130,7 @@ public final class RecordAccumulator { // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { RecordBatch first = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index eb16f6d..4ce1b00 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A batch of records that is or will be sent. 
@@ -27,6 +29,9 @@ import org.apache.kafka.common.record.MemoryRecords; * This class is not thread safe and external synchronization must be used when modifying it */ public final class RecordBatch { + + private static final Logger log = LoggerFactory.getLogger(RecordBatch.class); + public int recordCount = 0; public volatile int attempts = 0; public final long created; @@ -64,22 +69,27 @@ public final class RecordBatch { /** * Complete the request * - * @param offset The offset + * @param baseOffset The base offset of the messages assigned by the server * @param errorCode The error code or 0 if no error */ - public void done(long offset, RuntimeException exception) { - this.produceFuture.done(topicPartition, offset, exception); + public void done(long baseOffset, RuntimeException exception) { + this.produceFuture.done(topicPartition, baseOffset, exception); + log.trace("Produced messages to topic-partition {} with base offset {} and error: {}.", + topicPartition, + baseOffset, + exception); // execute callbacks for (int i = 0; i < this.thunks.size(); i++) { try { Thunk thunk = this.thunks.get(i); + long offset = this.produceFuture.baseOffset() + thunk.relativeOffset; + log.trace("Executing callback for record {}", offset); if (exception == null) - thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset), - null); + thunk.callback.onCompletion(new RecordMetadata(topicPartition, offset), null); else thunk.callback.onCompletion(null, exception); } catch (Exception e) { - e.printStackTrace(); + log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } } @@ -96,4 +106,9 @@ public final class RecordBatch { this.relativeOffset = relativeOffset; } } + + @Override + public String toString() { + return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")"; + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e373265..05c6920 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -45,6 +45,8 @@ import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The background thread that handles the sending of produce requests to the Kafka cluster.
This thread makes metadata @@ -52,6 +54,8 @@ import org.apache.kafka.common.utils.Utils; */ public class Sender implements Runnable { + private static final Logger log = LoggerFactory.getLogger(Sender.class); + /* the state of each nodes connection */ private final NodeStates nodeStates; @@ -138,15 +142,19 @@ public class Sender implements Runnable { * The main run loop for the sender thread */ public void run() { + log.trace("Starting Kafka producer I/O thread."); + // main loop, runs until close is called while (running) { try { run(time.milliseconds()); } catch (Exception e) { - e.printStackTrace(); + log.error("Uncaught error in kafka producer I/O thread: ", e); } } + log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); + // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. @@ -155,12 +163,14 @@ public class Sender implements Runnable { try { unsent = run(time.milliseconds()); } catch (Exception e) { - e.printStackTrace(); + log.error("Uncaught error in kafka producer I/O thread: ", e); } } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0); // close all the connections this.selector.close(); + + log.trace("Shutdown of Kafka producer I/O thread has completed."); } /** @@ -184,6 +194,13 @@ public class Sender implements Runnable { // create produce requests List batches = this.accumulator.drain(sendable, this.maxRequestSize); List requests = collate(cluster, batches); + + if (ready.size() > 0) { + log.trace("Partitions with complete batches: {}", ready); + log.trace("Partitions ready to initiate a request: {}", sendable); + log.trace("Created {} requests: {}", requests.size(), requests); + } + for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); this.inFlightRequests.add(request); @@ -194,7 +211,7 @@ public class Sender implements Runnable { try { this.selector.poll(100L, sends); } catch (IOException e) { - e.printStackTrace(); + log.error("Unexpected error during I/O in producer network thread", e); } // handle responses, connections, and disconnections @@ -218,8 +235,10 @@ public class Sender implements Runnable { return; if (nodeStates.isConnected(node.id())) { + Set topics = metadata.topics(); + log.debug("Sending metadata update request for topics {} to node {}", topics, node.id()); this.metadataFetchInProgress = true; - InFlightRequest request = metadataRequest(node.id(), metadata.topics()); + InFlightRequest request = metadataRequest(node.id(), topics); sends.add(request.request); this.inFlightRequests.add(request); } else if (nodeStates.canConnect(node.id(), now)) { @@ -308,6 +327,7 @@ public class Sender implements Runnable { */ private void initiateConnect(Node node, long now) { try { + log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); this.nodeStates.connecting(node.id(), now); } catch (IOException e) { @@ -315,6 +335,7 @@ public class Sender implements Runnable { nodeStates.disconnected(node.id()); /* maybe the problem is our metadata, update it */ metadata.forceUpdate(); + log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } } @@ -324,6 +345,7 @@ public class Sender implements Runnable { private void handleDisconnects(List disconnects, long now) { // clear out the 
in-flight requests for the disconnected broker for (int node : disconnects) { + log.debug("Node {} disconnected.", node); for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { @@ -347,8 +369,10 @@ public class Sender implements Runnable { * Record any connections that completed in our node state */ private void handleConnects(List connects) { - for (Integer id : connects) + for (Integer id : connects) { + log.debug("Completed connection to node {}", id); this.nodeStates.connected(id); + } } /** @@ -359,6 +383,7 @@ public class Sender implements Runnable { for (NetworkSend send : sends) { Deque requests = this.inFlightRequests.requestQueue(send.destination()); InFlightRequest request = requests.peekFirst(); + log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request); if (!request.expectResponse) { requests.pollFirst(); if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { @@ -382,12 +407,16 @@ public class Sender implements Runnable { short apiKey = req.request.header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request.header(), header); - if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) + if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) { + log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId()); handleProduceResponse(req, body, now); - else if (req.request.header().apiKey() == ApiKeys.METADATA.id) + } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { + log.trace("Received metadata response from node {} with correlation id {}", source, req.request.header() + .correlationId()); handleMetadataResponse(body, now); - else + } else { throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); + } } } @@ -399,6 +428,8 @@ public class Sender implements Runnable { // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) this.metadata.update(cluster, now); + else + log.trace("Ignoring empty metadata response."); } /** @@ -422,6 +453,7 @@ public class Sender implements Runnable { RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); if (canRetry(batch, error)) { // retry + log.warn("Got error for topic {} partition {}, retrying.
Error: {}", topic, partition, error); this.accumulator.reenqueue(batch, now); } else { // tell the user the result of their request @@ -620,6 +652,11 @@ public class Sender implements Runnable { this.request = request; this.expectResponse = expectResponse; } + + @Override + public String toString() { + return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")"; + } } /** diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 5caaaae..426bd1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -73,9 +73,9 @@ public final class Cluster { */ public static Cluster bootstrap(List addresses) { List nodes = new ArrayList(); - int nodeId = Integer.MIN_VALUE; + int nodeId = -1; for (InetSocketAddress address : addresses) - nodes.add(new Node(nodeId++, address.getHostName(), address.getPort())); + nodes.add(new Node(nodeId--, address.getHostName(), address.getPort())); return new Cluster(nodes, new ArrayList(0)); } @@ -117,4 +117,9 @@ public final class Cluster { return this.partitionsByTopic.get(topic); } + @Override + public String toString() { + return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 4197e50..0e47ff3 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common; @@ -86,7 +82,7 @@ public class Node { @Override public String toString() { - return "Node(" + id + ", " + host + ", " + port + ")"; + return "Node(" + (id < 0 ?
"" : id + ", ") + host + ", " + port + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java index 08d66f1..b15aa2c 100644 --- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common; @@ -71,4 +67,28 @@ public class PartitionInfo { return inSyncReplicas; } + @Override + public String toString() { + return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s)", + topic, + partition, + leader.id(), + fmtNodeIds(replicas), + fmtNodeIds(inSyncReplicas)); + } + + /* Extract the node ids from each item in the array and format for display */ + private String fmtNodeIds(Node[] nodes) { + StringBuilder b = new StringBuilder("["); + for (int i = 0; i < nodes.length - 1; i++) { + b.append(Integer.toString(nodes[i].id())); + b.append(','); + } + if (nodes.length > 0) { + b.append(Integer.toString(nodes[nodes.length - 1].id())); + } + b.append("]"); + return b.toString(); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c3148e5..c989e25 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.config; @@ -25,7 +21,8 @@ import java.util.Set; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.Utils; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A convenient base class for configurations to extend. @@ -34,10 +31,17 @@ import org.apache.kafka.common.utils.Utils; */ public class AbstractConfig { + private final Logger log = LoggerFactory.getLogger(getClass()); + + /* configs for which values have been requested, used to detect unused configs */ private final Set used; - private final Map values; + + /* the original values passed in by the user */ private final Map originals; + /* the parsed values */ + private final Map values; + @SuppressWarnings("unchecked") public AbstractConfig(ConfigDef definition, Map originals) { /* check that all the keys are really strings */ @@ -47,6 +51,7 @@ public class AbstractConfig { this.originals = (Map) originals; this.values = definition.parse(this.originals); this.used = Collections.synchronizedSet(new HashSet()); + logAll(); } protected Object get(String key) { @@ -83,10 +88,30 @@ public class AbstractConfig { public Set unused() { Set keys = new HashSet(originals.keySet()); - keys.remove(used); + keys.removeAll(used); return keys; } + private void logAll() { + StringBuilder b = new StringBuilder(); + b.append(getClass().getSimpleName()); + b.append(" values: "); + b.append(Utils.NL); + for (Map.Entry entry : this.values.entrySet()) { + b.append('\t'); + b.append(entry.getKey()); + b.append(" = "); + b.append(entry.getValue()); + b.append(Utils.NL); + } + log.info(b.toString()); + } + + public void logUnused() { + for (String key : unused()) + log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key)); + } + /** * Get a configured instance of the give class specified by the given configuration key. If the object implements * Configurable configure it using the configuration. 
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index e08c349..c867c8d 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics; @@ -36,13 +32,16 @@ import javax.management.ObjectName; import javax.management.ReflectionException; import org.apache.kafka.common.KafkaException; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Register metrics in JMX as dynamic mbeans based on the metric names */ public class JmxReporter implements MetricsReporter { + private static final Logger log = LoggerFactory.getLogger(JmxReporter.class); + private final String prefix; private final Map mbeans = new HashMap(); @@ -160,7 +159,7 @@ public class JmxReporter implements MetricsReporter { list.add(new Attribute(name, getAttribute(name))); return list; } catch (Exception e) { - e.printStackTrace(); + log.error("Error getting JMX attribute: ", e); return new AttributeList(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f1e474c..ac36460 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import org.apache.kafka.common.KafkaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A selector interface for doing non-blocking multi-connection network I/O. 
@@ -58,6 +60,8 @@ import org.apache.kafka.common.KafkaException; */ public class Selector implements Selectable { + private static final Logger log = LoggerFactory.getLogger(Selector.class); + private final java.nio.channels.Selector selector; private final Map keys; private final List completedSends; @@ -140,17 +144,12 @@ public class Selector implements Selectable { */ @Override public void close() { - for (SelectionKey key : this.selector.keys()) { - try { - close(key); - } catch (IOException e) { - e.printStackTrace(); - } - } + for (SelectionKey key : this.selector.keys()) + close(key); try { this.selector.close(); } catch (IOException e) { - e.printStackTrace(); + log.error("Exception closing selector:", e); } } @@ -201,9 +200,7 @@ public class Selector implements Selectable { Transmissions transmissions = transmissions(key); SocketChannel channel = channel(key); try { - /* - * complete any connections that have finished their handshake - */ + /* complete any connections that have finished their handshake */ if (key.isConnectable()) { channel.finishConnect(); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); @@ -222,9 +219,7 @@ public class Selector implements Selectable { } } - /* - * write to any sockets that have space in their buffer and for which we have data - */ + /* write to any sockets that have space in their buffer and for which we have data */ if (key.isWritable()) { transmissions.send.writeTo(channel); if (transmissions.send.remaining() <= 0) { @@ -238,7 +233,7 @@ public class Selector implements Selectable { if (!key.isValid()) close(key); } catch (IOException e) { - e.printStackTrace(); + log.error("Error in I/O: ", e); close(key); } } @@ -294,7 +289,7 @@ public class Selector implements Selectable { /** * Begin closing this connection */ - private void close(SelectionKey key) throws IOException { + private void close(SelectionKey key) { SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { @@ -304,8 +299,12 @@ public class Selector implements Selectable { } key.attach(null); key.cancel(); - channel.socket().close(); - channel.close(); + try { + channel.socket().close(); + channel.close(); + } catch (IOException e) { + log.error("Exception closing connection to node {}:", trans.id, e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 457abb1..66cc2fe 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.requests; @@ -25,7 +21,6 @@ import org.apache.kafka.common.protocol.Protocol; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Struct; - /** * The header for a request in the Kafka protocol */ @@ -82,4 +77,9 @@ public class RequestHeader { public int sizeOf() { return header.sizeOf(); } + + @Override + public String toString() { + return header.toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java index c5e9020..27cbf39 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.requests; @@ -21,7 +17,6 @@ import java.nio.ByteBuffer; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.protocol.types.Struct; - /** * A send object for a kafka request */ @@ -52,4 +47,9 @@ public class RequestSend extends NetworkSend { return body; } + @Override + public String toString() { + return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")"; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java index 9ff793f..57247c8 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java @@ -1,32 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A wrapper for Thread that sets things up nicely */ public class KafkaThread extends Thread { - public KafkaThread(String name, Runnable runnable, boolean daemon) { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public KafkaThread(final String name, Runnable runnable, boolean daemon) { super(runnable, name); setDaemon(daemon); setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { - e.printStackTrace(); + log.error("Uncaught exception in " + name + ": ", e); } }); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 9c34e7d..0c6b365 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.utils; @@ -21,9 +17,10 @@ import java.nio.ByteBuffer; import org.apache.kafka.common.KafkaException; - public class Utils { + public static String NL = System.getProperty("line.separator"); + /** * Turn the given UTF8 byte array into a string * diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3dd562c..fb759d9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -474,6 +474,6 @@ class ReplicaManager(val config: KafkaConfig, info("Shut down") replicaFetcherManager.shutdown() checkpointHighWatermarks() - info("Shutted down completely") + info("Shut down completely") } }