From 4353f257a0765b628b13cee42c2f8efa009307a1 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 29 Dec 2014 14:49:05 -0800 Subject: [PATCH] KAFKA-1660: Adding tryClose(timeoutMillis) to producer. --- .../kafka/clients/producer/KafkaProducer.java | 21 ++++++++++++++++----- .../apache/kafka/clients/producer/MockProducer.java | 4 ++++ .../org/apache/kafka/clients/producer/Producer.java | 5 +++++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f61efb3..43b63ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -364,17 +364,28 @@ public class KafkaProducer implements Producer { */ @Override public void close() { + tryClose(0); + } + + @Override + public void tryClose(long timeoutMillis) { log.trace("Closing the Kafka producer."); this.sender.initiateClose(); try { - this.ioThread.join(); + this.ioThread.join(timeoutMillis); } catch (InterruptedException e) { throw new KafkaException(e); } - this.metrics.close(); - this.keySerializer.close(); - this.valueSerializer.close(); - log.debug("The Kafka producer has closed."); + + if (!this.ioThread.isAlive()) { + this.metrics.close(); + this.keySerializer.close(); + this.valueSerializer.close(); + log.debug("The Kafka producer has closed."); + } else { + log.debug("kafka produce ioThread alive even after timoutMillis " + timeoutMillis + + ", giving up on closing the producer."); + } } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 34624c3..1d6a9d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -144,6 +144,10 @@ public class MockProducer implements Producer { public void close() { } + @Override + public void tryClose(long timeoutMillis) { + } + /** * Get the list of sent records since the last call to {@link #clear()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 5baa606..1721661 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -62,4 +62,9 @@ public interface Producer extends Closeable { */ public void close(); + /** + * Tries to close the producer cleanly until timeout is expired, gives up after that. + */ + public void tryClose(long timeoutMillis); + } -- 1.9.3 (Apple Git-50)