From d7b943ab8b1d2c4bb8c3f4a729a2b631dae06002 Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Tue, 14 Apr 2015 11:03:25 -0700
Subject: [PATCH] fix potential resource leak when KafkaProducer constructor
 failed in the middle

---
 .../java/org/apache/kafka/clients/ClientUtils.java |  16 ++
 .../java/org/apache/kafka/clients/KafkaClient.java |   8 +-
 .../kafka/clients/consumer/KafkaConsumer.java      | 178 +++++++++++--------
 .../kafka/clients/producer/KafkaProducer.java      | 193 ++++++++++++---------
 .../kafka/clients/producer/internals/Sender.java   |   7 +-
 .../org/apache/kafka/common/metrics/Metrics.java   |   4 +-
 .../kafka/common/serialization/Deserializer.java   |   7 +-
 .../kafka/common/serialization/Serializer.java     |   7 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  80 +++++++++
 .../kafka/clients/producer/KafkaProducerTest.java  |  80 +++++++++
 10 files changed, 409 insertions(+), 171 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index d0da5d7..0d68bf1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -12,16 +12,21 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
 public class ClientUtils {
+    private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
@@ -45,4 +50,15 @@ public class ClientUtils {
             throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
+
+    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close " + name, t);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 96ac6d0..1311f85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kafka.common.Node;
@@ -21,7 +22,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 /**
  * The interface for {@link NetworkClient}
  */
-public interface KafkaClient {
+public interface KafkaClient extends Closeable {
 
     /**
      * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
@@ -130,9 +131,4 @@ public interface KafkaClient {
      */
     public void wakeup();
 
-    /**
-     * Close the client and disconnect from all nodes
-     */
-    public void close();
-
 }
\ No newline at end of file
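Note (illustrative sketch only, not part of the applied diff; git am ignores text
before the first hunk): the cleanup pattern this patch introduces is "attempt to
close every resource, remember only the first failure, rethrow once at the end",
built on the new ClientUtils.closeQuietly() helper. A minimal standalone usage
sketch, with the hypothetical class and method names CloseQuietlyExample/closeAll:

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.clients.ClientUtils;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.metrics.Metrics;

    public class CloseQuietlyExample {
        public static void closeAll(Metrics metrics) {
            // Holds the first Throwable seen; closeQuietly() records failures with
            // compareAndSet(null, t), so later failures never overwrite the first.
            AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
            // Each call swallows its own exception, so every resource still gets a
            // close() attempt even if an earlier close failed.
            ClientUtils.closeQuietly(metrics, "example metrics", firstException);
            // Surface the recorded failure exactly once, at the very end.
            if (firstException.get() != null)
                throw new KafkaException("Failed to close resources", firstException.get());
        }
    }
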
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2124334..09ecb42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.Coordinator;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -346,6 +348,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Coordinator coordinator;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
 
     private final Time time;
@@ -437,74 +441,97 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                           ConsumerRebalanceCallback callback,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
-        log.debug("Starting the Kafka consumer");
-        if (callback == null)
-            this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
-                                                                  ConsumerRebalanceCallback.class);
-        else
-            this.rebalanceCallback = callback;
-        this.time = new SystemTime();
-        this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-        this.lastCommitAttemptMs = time.milliseconds();
-
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                                                      .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS);
-        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-        String jmxPrefix = "kafka.consumer";
-        if (clientId.length() <= 0)
-            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
-        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        MetricsReporter.class);
-        reporters.add(new JmxReporter(jmxPrefix));
-        this.metrics = new Metrics(metricConfig, reporters, time);
-        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), 0);
-
-        String metricGrpPrefix = "consumer";
-        Map<String, String> metricsTags = new LinkedHashMap<String, String>();
-        metricsTags.put("client-id", clientId);
-        this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags),
-                                        this.metadata,
-                                        clientId,
-                                        100, // a fixed large enough value will suffice
-                                        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                                        config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
-                                        config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
-        this.subscriptions = new SubscriptionState();
-        this.coordinator = new Coordinator(this.client,
-                                           config.getString(ConsumerConfig.GROUP_ID_CONFIG),
-                                           this.retryBackoffMs,
-                                           config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
-                                           config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                                           this.metadata,
-                                           this.subscriptions,
-                                           metrics,
-                                           metricGrpPrefix,
-                                           metricsTags,
-                                           this.time);
-        this.fetcher = new Fetcher<K, V>(this.client,
-                                         this.retryBackoffMs,
-                                         config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
-                                         config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
-                                         config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
-                                         config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
-                                         config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
-                                         keyDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : keyDeserializer,
-                                         valueDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : valueDeserializer,
-                                         this.metadata,
-                                         this.subscriptions,
-                                         metrics,
-                                         metricGrpPrefix,
-                                         metricsTags,
-                                         this.time);
-
-        config.logUnused();
-
-        log.debug("Kafka consumer created");
+        try {
+            log.debug("Starting the Kafka consumer");
+            if (callback == null)
+                this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+                                                                      ConsumerRebalanceCallback.class);
+            else
+                this.rebalanceCallback = callback;
+            this.time = new SystemTime();
+            this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            this.lastCommitAttemptMs = time.milliseconds();
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                                                          .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                                                                      TimeUnit.MILLISECONDS);
+            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            String jmxPrefix = "kafka.consumer";
+            if (clientId.length() <= 0)
+                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                                                                            MetricsReporter.class);
+            reporters.add(new JmxReporter(jmxPrefix));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+            String metricGrpPrefix = "consumer";
+            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+            metricsTags.put("client-id", clientId);
+            this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags),
+                                            this.metadata,
+                                            clientId,
+                                            100, // a fixed large enough value will suffice
+                                            config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                                            config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
+                                            config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+            this.subscriptions = new SubscriptionState();
+            this.coordinator = new Coordinator(this.client,
+                                               config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                                               this.retryBackoffMs,
+                                               config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                                               config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                                               this.metadata,
+                                               this.subscriptions,
+                                               metrics,
+                                               metricGrpPrefix,
+                                               metricsTags,
+                                               this.time);
+
+            if (keyDeserializer == null) {
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                                                    Deserializer.class);
+                this.keyDeserializer.configure(config.originals(), true);
+            } else {
+                this.keyDeserializer = keyDeserializer;
+            }
+            if (valueDeserializer == null) {
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                                                      Deserializer.class);
+                this.valueDeserializer.configure(config.originals(), false);
+            } else {
+                this.valueDeserializer = valueDeserializer;
+            }
+            this.fetcher = new Fetcher<K, V>(this.client,
+                                             this.retryBackoffMs,
+                                             config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                                             config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
+                                             config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+                                             config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                                             config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
+                                             this.keyDeserializer,
+                                             this.valueDeserializer,
+                                             this.metadata,
+                                             this.subscriptions,
+                                             metrics,
+                                             metricGrpPrefix,
+                                             metricsTags,
+                                             this.time);
+
+            config.logUnused();
+
+            log.debug("Kafka consumer created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
     }
 
     /**
@@ -806,13 +833,24 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized void close() {
+        close(false);
+    }
+
+    private void close(boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
         this.closed = true;
-        this.metrics.close();
-        this.client.close();
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
+        ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         log.debug("The Kafka consumer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka consumer", firstException.get());
+        }
     }
 
+
     private boolean shouldAutoCommit(long now) {
         return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b91e2c5..381fa29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -128,6 +129,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final long totalMemorySize;
     private final Metadata metadata;
     private final RecordAccumulator accumulator;
+    private final NetworkClient client;
     private final Sender sender;
     private final Metrics metrics;
     private final Thread ioThread;
@@ -191,81 +193,89 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        log.trace("Starting the Kafka producer");
-        this.producerConfig = config;
-        this.time = new SystemTime();
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
-                                                      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS);
-        String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
-            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-        String jmxPrefix = "kafka.producer";
-        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                        MetricsReporter.class);
-        reporters.add(new JmxReporter(jmxPrefix));
-        this.metrics = new Metrics(metricConfig, reporters, time);
-        this.partitioner = new Partitioner();
-        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
-        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
-        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
-        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-        Map<String, String> metricTags = new LinkedHashMap<String, String>();
-        metricTags.put("client-id", clientId);
-        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
-                                                 this.totalMemorySize,
-                                                 this.compressionType,
-                                                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
-                                                 retryBackoffMs,
-                                                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
-                                                 metrics,
-                                                 time,
-                                                 metricTags);
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
-
-        NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags),
-                                                 this.metadata,
-                                                 clientId,
-                                                 config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
-                                                 config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                                                 config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                                                 config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
-        this.sender = new Sender(client,
-                                 this.metadata,
-                                 this.accumulator,
-                                 config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                                 (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
-                                 config.getInt(ProducerConfig.RETRIES_CONFIG),
-                                 config.getInt(ProducerConfig.TIMEOUT_CONFIG),
-                                 this.metrics,
-                                 new SystemTime(),
-                                 clientId);
-        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
" | " + clientId : ""); - this.ioThread = new KafkaThread(ioThreadName, this.sender, true); - this.ioThread.start(); - - this.errors = this.metrics.sensor("errors"); - - if (keySerializer == null) { - this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - Serializer.class); - this.keySerializer.configure(config.originals(), true); - } else { - this.keySerializer = keySerializer; - } - if (valueSerializer == null) { - this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - Serializer.class); - this.valueSerializer.configure(config.originals(), false); - } else { - this.valueSerializer = valueSerializer; - } + try { + log.trace("Starting the Kafka producer"); + this.producerConfig = config; + this.time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + if (clientId.length() <= 0) + clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + String jmxPrefix = "kafka.producer"; + List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(metricConfig, reporters, time); + this.partitioner = new Partitioner(); + long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); + this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + Map metricTags = new LinkedHashMap(); + metricTags.put("client-id", clientId); + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), + this.totalMemorySize, + this.compressionType, + config.getLong(ProducerConfig.LINGER_MS_CONFIG), + retryBackoffMs, + config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), + metrics, + time, + metricTags); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + + this.client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, + this.metadata, + this.accumulator, + config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), + (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), + config.getInt(ProducerConfig.RETRIES_CONFIG), + config.getInt(ProducerConfig.TIMEOUT_CONFIG), + this.metrics, + new SystemTime(), + clientId); + String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? 
" | " + clientId : ""); + this.ioThread = new KafkaThread(ioThreadName, this.sender, true); + this.ioThread.start(); + + this.errors = this.metrics.sensor("errors"); + + if (keySerializer == null) { + this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + Serializer.class); + this.keySerializer.configure(config.originals(), true); + } else { + this.keySerializer = keySerializer; + } + if (valueSerializer == null) { + this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + Serializer.class); + this.valueSerializer.configure(config.originals(), false); + } else { + this.valueSerializer = valueSerializer; + } - config.logUnused(); - log.debug("Kafka producer started"); + config.logUnused(); + log.debug("Kafka producer started"); + } catch (Throwable t) { + // call close methods if internal objects are already constructed + // this is to prevent resource leak. see KAFKA-2121 + close(true); + // now propagate the exception + throw new KafkaException("Failed to construct kafka producer", t); + } } private static int parseAcks(String acksString) { @@ -513,17 +523,36 @@ public class KafkaProducer implements Producer { */ @Override public void close() { + close(false); + } + + private void close(boolean swallowException) { log.trace("Closing the Kafka producer."); - this.sender.initiateClose(); - try { - this.ioThread.join(); - } catch (InterruptedException e) { - throw new InterruptException(e); + // this will keep track of the first encountered exception + AtomicReference firstException = new AtomicReference(); + if (this.sender != null) { + try { + this.sender.initiateClose(); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close sender", t); + } } - this.metrics.close(); - this.keySerializer.close(); - this.valueSerializer.close(); + if (this.ioThread != null) { + try { + this.ioThread.join(); + } catch (InterruptedException t) { + firstException.compareAndSet(null, t); + log.error("Interrupted while joining ioThread", t); + } + } + ClientUtils.closeQuietly(metrics, "producer metrics", firstException); + ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); + ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); log.debug("The Kafka producer has closed."); + if (firstException.get() != null && !swallowException) { + throw new KafkaException("Failed to close kafka producer", firstException.get()); + } } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 70954ca..b2db91c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -139,8 +139,11 @@ public class Sender implements Runnable { log.error("Uncaught error in kafka producer I/O thread: ", e); } } - - this.client.close(); + try { + this.client.close(); + } catch (Exception e) { + log.error("Failed to close network client", e); + } log.debug("Shutdown of Kafka producer I/O thread has completed."); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index b3d3d7c..5f6caf9 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ 
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.metrics;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +47,7 @@ import org.apache.kafka.common.utils.Utils;
  * sensor.record(messageSize);
  * </pre>
  */
-public class Metrics {
+public class Metrics implements Closeable {
 
     private final MetricConfig config;
     private final ConcurrentMap<MetricName, KafkaMetric> metrics;
@@ -192,6 +193,7 @@ public class Metrics {
     /**
      * Close this metrics repository.
     */
+    @Override
     public void close() {
         for (MetricsReporter reporter : this.reporters)
             reporter.close();
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 13be6a3..9a57579 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Deserializer<T> {
+public interface Deserializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -38,8 +39,4 @@ public interface Deserializer<T> {
      */
     public T deserialize(String topic, byte[] data);
 
-    /**
-     * Close this deserializer
-     */
-    public void close();
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c2fdc23..c440540 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
 */
-public interface Serializer<T> {
+public interface Serializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -37,8 +38,4 @@ public interface Serializer<T> {
      */
     public byte[] serialize(String topic, T data);
 
-    /**
-     * Close this serializer
-     */
-    public void close();
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..68f3ecd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class KafkaConsumerTest {
+
+    public static class MockMetricsReporter implements MetricsReporter {
+
+        private static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+
+        public MockMetricsReporter() {
+
+        }
+
+        @Override
+        public void init(List<KafkaMetric> metrics) {
+
+        }
+
+        @Override
+        public void metricChange(KafkaMetric metric) {
+
+        }
+
+        @Override
+        public void close() {
+            CLOSE_COUNT.incrementAndGet();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+    }
+
+    @Test
+    public void testConstructorClose() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        try {
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+                    props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        } catch (KafkaException e) {
+            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
+            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
new file mode 100644
index 0000000..6244663
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class KafkaProducerTest {
+
+    public static class MockMetricsReporter implements MetricsReporter {
+
+        private static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+
+        public MockMetricsReporter() {
+
+        }
+
+        @Override
+        public void init(List<KafkaMetric> metrics) {
+
+        }
+
+        @Override
+        public void metricChange(KafkaMetric metric) {
+
+        }
+
+        @Override
+        public void close() {
+            CLOSE_COUNT.incrementAndGet();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+    }
+
+    @Test
+    public void testConstructorClose() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        try {
+            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
+                    props, new ByteArraySerializer(), new ByteArraySerializer());
+        } catch (KafkaException e) {
+            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
+            MockMetricsReporter.CLOSE_COUNT.set(0);
+            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+}
-- 
2.3.2 (Apple Git-55)