From 15a78afdb0de07a84aaa09dfd96f172fbde6aa55 Mon Sep 17 00:00:00 2001
From: jqin
Date: Sun, 8 Mar 2015 21:01:40 -0700
Subject: [PATCH] Patch for KAFKA-1660 add a close method with timeout to producer.

---
 .../kafka/clients/producer/KafkaProducer.java      | 36 +++++++--
 .../kafka/clients/producer/MockProducer.java       |  5 ++
 .../apache/kafka/clients/producer/Producer.java    |  7 ++
 .../producer/internals/RecordAccumulator.java      | 13 ++++
 .../kafka/clients/producer/internals/Sender.java   | 27 +++++--
 .../kafka/common/serialization/Serializer.java     |  4 +-
 .../producer/internals/RecordAccumulatorTest.java  | 26 +++++++
 .../integration/kafka/api/ProducerSendTest.scala   | 86 +++++++++++++++++++++-
 8 files changed, 190 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7397e56..6fa503a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -218,7 +218,7 @@ public class KafkaProducer implements Producer {
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                              TimeUnit.MILLISECONDS);
+                                                                  TimeUnit.MILLISECONDS);
         String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -527,17 +527,41 @@ public class KafkaProducer implements Producer {
 
     /**
      * Close this producer. This method blocks until all previously sent requests complete.
+     * This method is equivalent to close(0, TimeUnit.MILLISECONDS).
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
    public void close() {
-        log.trace("Closing the Kafka producer.");
+        close(0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * This method waits up to the given timeout for the producer to complete previously sent requests.
+     * If the producer is unable to finish before the timeout expires, this method fails the incomplete
+     * send requests and closes the producer forcefully. If an InterruptException is thrown, the caller
+     * should retry the call; otherwise, metrics may be leaked.
+     * @param timeout The maximum time to wait for the producer to complete its send requests.
+     *                A timeout of 0 waits forever.
+     *                A negative timeout does not wait at all.
+     * @param timeUnit The time unit for the timeout
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+        log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout));
         this.sender.initiateClose();
-        try {
-            this.ioThread.join();
-        } catch (InterruptedException e) {
-            throw new InterruptException(e);
+        if (timeout >= 0) {
+            try {
+                this.ioThread.join(timeUnit.toMillis(timeout));
+            } catch (InterruptedException e) {
+                throw new InterruptException(e);
+            }
         }
+
+        if (this.ioThread.isAlive()) {
+            this.sender.forceClose();
+        }
+
         this.metrics.close();
         this.keySerializer.close();
         this.valueSerializer.close();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6913090..3c34610 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
@@ -146,6 +147,10 @@ public class MockProducer implements Producer {
     public void close() {
     }
 
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+    }
+
     /**
      * Get the list of sent records since the last call to {@link #clear()}
     */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 5b3e75e..cbf6e51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -67,4 +68,10 @@ public interface Producer extends Closeable {
      */
     public void close();
 
+    /**
+     * Tries to close the producer cleanly within the given timeout; if the close does not complete
+     * within the timeout, it force closes the producer, discarding any pending messages.
+     */
+    public void close(long timeout, TimeUnit unit);
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d5c79e2..1faaad4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -355,6 +356,18 @@ public final class RecordAccumulator {
     }
 
     /**
+     * This method should only be called when the sender is closed forcefully. It fails all the
+     * incomplete batches and returns.
+     */
+    public void failAllIncompleteBatches() {
+        for (RecordBatch batch : incomplete.all()) {
+            incomplete.remove(batch);
+            batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException()));
+        }
+        this.batches.clear();
+    }
+
+    /**
      * Close this accumulator and force all the record buffers to be drained
      */
     public void close() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ed9c63a..121bb92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -83,6 +83,9 @@ public class Sender implements Runnable {
     /* true while the sender thread is still running */
     private volatile boolean running;
 
+    /* true when the caller wants to ignore all unsent/in-flight messages and force close. */
+    private volatile boolean forceClose;
+
     /* metrics */
     private final SenderMetrics sensors;
 
@@ -132,12 +135,18 @@
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
-            try {
-                run(time.milliseconds());
-            } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+        if (!this.forceClose) {
+            while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
             }
+        } else {
+            // We need to fail all the incomplete batches and wake up the threads waiting on
+            // the futures.
+            this.accumulator.failAllIncompleteBatches();
         }
 
         this.client.close();
@@ -209,6 +218,14 @@
     }
 
     /**
+     * Closes the sender without sending out any pending messages.
+     */
+    public void forceClose() {
+        this.forceClose = true;
+        initiateClose();
+    }
+
+    /**
      * Handle a produce response
      */
     private void handleProduceResponse(ClientResponse response, Map batches, long now) {
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c2fdc23..50f8703 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -38,7 +38,9 @@ public interface Serializer {
     public byte[] serialize(String topic, T data);
 
     /**
-     * Close this serializer
+     * Close this serializer.
+     * This method must be idempotent when the serializer is used by a KafkaProducer, because it may be
+     * called multiple times.
      */
     public void close();
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index c1bc406..f63a542 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -26,7 +26,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -225,4 +228,27 @@
         assertFalse(accum.hasUnsent());
     }
 
+    @Test
+    public void testFailAllIncomplete() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        class TestCallback implements Callback {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertEquals("Producer is closed forcefully.", exception.getMessage());
+                numExceptionReceivedInCallback.incrementAndGet();
+            }
+        }
+        for (int i = 0; i < 100; i++)
+            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback());
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+
+        accum.failAllIncompleteBatches();
+        assertEquals(100, numExceptionReceivedInCallback.get());
+        assertFalse(accum.hasUnsent());
+
+    }
+
 }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3df4507..7dc1655 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -18,6 +18,7 @@
 package kafka.api
 
 import java.lang.{Integer, IllegalArgumentException}
+import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.producer._
 import org.scalatest.junit.JUnit3Suite
@@ -29,7 +30,7 @@ import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InterruptException, SerializationException}
 import java.util.Properties
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -322,6 +323,87 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       producer.close()
     }
   }
- 
+
+  /**
+   * Test close with timeout
+   */
+  @Test
+  def testCloseWithTimeout() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create records
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
null, "value".getBytes) + + // Test closing from caller thread. + for(i <- 0 until 50) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + val responses = (0 until numRecords) map (i => producer.send(record0)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.close(-1, TimeUnit.MILLISECONDS) + responses.foreach { future => + try { + future.get() + // No message should be sent successfully + assertTrue(false); + } catch { + case e: Exception => + assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + } + } + val fetchResponse = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) + } + + // Test closing from sender thread. + class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + // Trigger another batch in accumulator before close the producer. These messages should + // not be sent. + (0 until numRecords) map (i => producer.send(record1)) + // The close call will be called by all the message callbacks. This is to test idempotent. + producer.close(-1, TimeUnit.MILLISECONDS) + } + } + for(i <- 1 until 51) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + // send message to partition 0 + var responses = (0 until numRecords) map (i => producer.send(record0)) + // send message to partition 1 + responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer)))) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + // flush the messages. + producer.flush() + assertTrue("All request are complete.", responses.forall(_.isDone())) + // Check the messages received by broker. + val fetchResponse0 = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + val fetchResponse1 = if (leader1.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse0.messageSet(topic, 0).size) + assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse1.messageSet(topic, 1).size) + } + } finally { + if (producer != null) + producer.close() + } + } } -- 1.8.3.4 (Apple Git-47)