From 5f4551ff074fcfe26a4eb3d44c9e1685116bd701 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 14 Apr 2015 11:03:25 -0700 Subject: [PATCH 1/3] fix potential resource leak when KafkaProducer contructor failed in the middle --- .../kafka/clients/producer/KafkaProducer.java | 198 ++++++++++++--------- 1 file changed, 116 insertions(+), 82 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b91e2c5..cb99519 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -191,81 +191,89 @@ public class KafkaProducer implements Producer { @SuppressWarnings("unchecked") private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { - log.trace("Starting the Kafka producer"); - this.producerConfig = config; - this.time = new SystemTime(); - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); - String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) - clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); - String jmxPrefix = "kafka.producer"; - List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(jmxPrefix)); - this.metrics = new Metrics(metricConfig, reporters, time); - this.partitioner = new Partitioner(); - long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); - this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); - this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); - this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - Map metricTags = new LinkedHashMap(); - metricTags.put("client-id", clientId); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), - this.totalMemorySize, - this.compressionType, - config.getLong(ProducerConfig.LINGER_MS_CONFIG), - retryBackoffMs, - config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), - metrics, - time, - metricTags); - List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - - NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags), - this.metadata, - clientId, - config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); - this.sender = new Sender(client, - this.metadata, - this.accumulator, - config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), - config.getInt(ProducerConfig.RETRIES_CONFIG), - config.getInt(ProducerConfig.TIMEOUT_CONFIG), - this.metrics, - new SystemTime(), - clientId); - String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); - this.ioThread = new KafkaThread(ioThreadName, this.sender, true); - this.ioThread.start(); - - this.errors = this.metrics.sensor("errors"); - - if (keySerializer == null) { - this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - Serializer.class); - this.keySerializer.configure(config.originals(), true); - } else { - this.keySerializer = keySerializer; - } - if (valueSerializer == null) { - this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - Serializer.class); - this.valueSerializer.configure(config.originals(), false); - } else { - this.valueSerializer = valueSerializer; - } + try { + log.trace("Starting the Kafka producer"); + this.producerConfig = config; + this.time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + if (clientId.length() <= 0) + clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + String jmxPrefix = "kafka.producer"; + List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(metricConfig, reporters, time); + this.partitioner = new Partitioner(); + long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); + this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + Map metricTags = new LinkedHashMap(); + metricTags.put("client-id", clientId); + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), + this.totalMemorySize, + this.compressionType, + config.getLong(ProducerConfig.LINGER_MS_CONFIG), + retryBackoffMs, + config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), + metrics, + time, + metricTags); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + + NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, + this.metadata, + this.accumulator, + config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), + (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), + config.getInt(ProducerConfig.RETRIES_CONFIG), + config.getInt(ProducerConfig.TIMEOUT_CONFIG), + this.metrics, + new SystemTime(), + clientId); + String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); + this.ioThread = new KafkaThread(ioThreadName, this.sender, true); + this.ioThread.start(); + + this.errors = this.metrics.sensor("errors"); + + if (keySerializer == null) { + this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + Serializer.class); + this.keySerializer.configure(config.originals(), true); + } else { + this.keySerializer = keySerializer; + } + if (valueSerializer == null) { + this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + Serializer.class); + this.valueSerializer.configure(config.originals(), false); + } else { + this.valueSerializer = valueSerializer; + } - config.logUnused(); - log.debug("Kafka producer started"); + config.logUnused(); + log.debug("Kafka producer started"); + } catch(Throwable t) { + // call close methods if internal objects are already constructed + // this is to prevent resource leak. see KAFKA-2121 + close(); + // now propagate the exception + throw new RuntimeException("failed to construct kafka producer", t); + } } private static int parseAcks(String acksString) { @@ -514,15 +522,41 @@ public class KafkaProducer implements Producer { @Override public void close() { log.trace("Closing the Kafka producer."); - this.sender.initiateClose(); - try { - this.ioThread.join(); - } catch (InterruptedException e) { - throw new InterruptException(e); + if(this.sender != null) { + try { + this.sender.initiateClose(); + } catch(Throwable t) { + log.error("failed to close sender", t); + } + } + if(this.ioThread != null) { + try { + this.ioThread.join(); + } catch (InterruptedException e) { + log.error("interrupted while joining ioThread", e); + } + } + if(this.metrics != null) { + try { + this.metrics.close(); + } catch(Throwable t) { + log.error("failed to close metrics", t); + } + } + if(this.keySerializer != null) { + try { + this.keySerializer.close(); + } catch(Throwable t) { + log.error("failed to close keySerializer", t); + } + } + if(this.valueSerializer != null) { + try { + this.valueSerializer.close(); + } catch(Throwable t) { + log.error("failed to close valueSerializer", t); + } } - this.metrics.close(); - this.keySerializer.close(); - this.valueSerializer.close(); log.debug("The Kafka producer has closed."); } -- 2.3.2 (Apple Git-55) From 25746b67300151199df08c69b6b2d9fcaede3f1a Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 14 Apr 2015 14:30:35 -0700 Subject: [PATCH 2/3] add a unit test file --- .../kafka/clients/producer/KafkaProducerTest.java | 61 ++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java new file mode 100644 index 0000000..1cd78dd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -0,0 +1,61 @@ +package org.apache.kafka.clients.producer; + +import junit.framework.Assert; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +public class KafkaProducerTest { + + public static class MockMetricsReporter implements MetricsReporter { + + private static final AtomicInteger closeCount = new AtomicInteger(0); + + public MockMetricsReporter() { + + } + + @Override + public void init(List metrics) { + + } + + @Override + public void metricChange(KafkaMetric metric) { + + } + + @Override + public void close() { + closeCount.incrementAndGet(); + } + + @Override + public void configure(Map configs) { + + } + } + + @Test + public void testConstructorClose() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); + props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + + try { + KafkaProducer producer = new KafkaProducer( + props, new ByteArraySerializer(), new ByteArraySerializer()); + } catch(RuntimeException e) { + Assert.assertEquals(1, MockMetricsReporter.closeCount.get()); + MockMetricsReporter.closeCount.set(0); + Assert.assertEquals("failed to construct kafka producer", e.getMessage()); + } + } +} -- 2.3.2 (Apple Git-55) From f2f4cee0eef15a98d80a0a5a6d7d3fd563df55fd Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Thu, 16 Apr 2015 09:47:23 -0700 Subject: [PATCH 3/3] changes based on Ewen's review feedbacks --- .../kafka/clients/producer/KafkaProducer.java | 43 +++++++++++++++------- .../kafka/clients/producer/KafkaProducerTest.java | 33 +++++++++++++---- 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index cb99519..56e085a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; @@ -267,12 +268,12 @@ public class KafkaProducer implements Producer { config.logUnused(); log.debug("Kafka producer started"); - } catch(Throwable t) { + } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121 - close(); + close(true); // now propagate the exception - throw new RuntimeException("failed to construct kafka producer", t); + throw new KafkaException("Failed to construct kafka producer", t); } } @@ -521,43 +522,57 @@ public class KafkaProducer implements Producer { */ @Override public void close() { + close(false); + } + + private void close(boolean swallowException) { log.trace("Closing the Kafka producer."); - if(this.sender != null) { + // this will keep track of the first encountered exception + AtomicReference firstException = new AtomicReference(); + if (this.sender != null) { try { this.sender.initiateClose(); - } catch(Throwable t) { + } catch (Throwable t) { + firstException.compareAndSet(null, t); log.error("failed to close sender", t); } } - if(this.ioThread != null) { + if (this.ioThread != null) { try { this.ioThread.join(); - } catch (InterruptedException e) { - log.error("interrupted while joining ioThread", e); + } catch (InterruptedException t) { + firstException.compareAndSet(null, t); + log.error("interrupted while joining ioThread", t); } } - if(this.metrics != null) { + if (this.metrics != null) { try { this.metrics.close(); - } catch(Throwable t) { + } catch (Throwable t) { + firstException.compareAndSet(null, t); log.error("failed to close metrics", t); } } - if(this.keySerializer != null) { + if (this.keySerializer != null) { try { this.keySerializer.close(); - } catch(Throwable t) { + } catch (Throwable t) { + firstException.compareAndSet(null, t); log.error("failed to close keySerializer", t); } } - if(this.valueSerializer != null) { + if (this.valueSerializer != null) { try { this.valueSerializer.close(); - } catch(Throwable t) { + } catch (Throwable t) { + firstException.compareAndSet(null, t); log.error("failed to close valueSerializer", t); } } log.debug("The Kafka producer has closed."); + if (firstException.get() != null && !swallowException) { + throw new KafkaException("Failed to close kafka producer", firstException.get()); + } } private static class FutureFailure implements Future { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 1cd78dd..6244663 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1,9 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.clients.producer; -import junit.framework.Assert; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.Assert; import org.junit.Test; import java.util.List; @@ -15,7 +32,7 @@ public class KafkaProducerTest { public static class MockMetricsReporter implements MetricsReporter { - private static final AtomicInteger closeCount = new AtomicInteger(0); + private static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); public MockMetricsReporter() { @@ -33,7 +50,7 @@ public class KafkaProducerTest { @Override public void close() { - closeCount.incrementAndGet(); + CLOSE_COUNT.incrementAndGet(); } @Override @@ -52,10 +69,12 @@ public class KafkaProducerTest { try { KafkaProducer producer = new KafkaProducer( props, new ByteArraySerializer(), new ByteArraySerializer()); - } catch(RuntimeException e) { - Assert.assertEquals(1, MockMetricsReporter.closeCount.get()); - MockMetricsReporter.closeCount.set(0); - Assert.assertEquals("failed to construct kafka producer", e.getMessage()); + } catch (KafkaException e) { + Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get()); + MockMetricsReporter.CLOSE_COUNT.set(0); + Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); + return; } + Assert.fail("should have caught an exception and returned"); } } -- 2.3.2 (Apple Git-55)