From f37a82ef41aefa5866230b012ed3cb85df35e4f7 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 29 Dec 2014 14:49:05 -0800 Subject: [PATCH] KAFKA-1660: Adding close(timeoutMillis, Timeunit unit) to producer. --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 14 ++++++++++++-- .../org/apache/kafka/clients/producer/MockProducer.java | 5 +++++ .../java/org/apache/kafka/clients/producer/Producer.java | 7 +++++++ .../apache/kafka/clients/producer/internals/Sender.java | 13 ++++++++++++- 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f61efb3..e7b33f5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -364,17 +364,27 @@ public class KafkaProducer implements Producer { */ @Override public void close() { + close(0, TimeUnit.MILLISECONDS); + } + + @Override + public void close(long timeout, TimeUnit timeUnit) { log.trace("Closing the Kafka producer."); this.sender.initiateClose(); try { - this.ioThread.join(); + this.ioThread.join(TimeUnit.MILLISECONDS.toMillis(timeout)); } catch (InterruptedException e) { throw new KafkaException(e); } + + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + } + this.metrics.close(); this.keySerializer.close(); this.valueSerializer.close(); - log.debug("The Kafka producer has closed."); + log.trace("The Kafka producer has closed."); } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 34624c3..31d1746 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -144,6 +145,10 @@ public class MockProducer implements Producer { public void close() { } + @Override + public void close(long timeout, TimeUnit timeUnit) { + } + /** * Get the list of sent records since the last call to {@link #clear()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 5baa606..29f14f5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; @@ -62,4 +63,10 @@ public interface Producer extends Closeable { */ public void close(); + /** + * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * discarding any pending messages. + */ + public void close(long timeout, TimeUnit unit); + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 84a7a07..09f8c32 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -79,6 +79,9 @@ public class Sender implements Runnable { /* true while the sender thread is still running */ private volatile boolean running; + /* true when the caller wants to ignore all unsent/inflight messages and force close. */ + private volatile boolean forceClose; + /* metrics */ private final SenderMetrics sensors; @@ -123,7 +126,7 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { + while (!this.forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { try { run(time.milliseconds()); } catch (Exception e) { @@ -199,6 +202,14 @@ public class Sender implements Runnable { this.wakeup(); } + /** + * Closes the sender completing without sending out any pending messages. + */ + public void forceClose(){ + this.forceClose = true; + initiateClose(); + } + private void handleDisconnect(ClientResponse response, long now) { log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination()); int correlation = response.request().request().header().correlationId(); -- 1.9.3 (Apple Git-50)