From e57e9edfcb1be1858cff1d710c64138922d6f477 Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:01:40 -0700 Subject: [PATCH 1/5] Patch for KAFKA-1660 add a close method with timeout to producer. --- .../kafka/clients/producer/KafkaProducer.java | 36 +++++++-- .../kafka/clients/producer/MockProducer.java | 5 ++ .../apache/kafka/clients/producer/Producer.java | 7 ++ .../producer/internals/RecordAccumulator.java | 13 ++++ .../kafka/clients/producer/internals/Sender.java | 27 +++++-- .../kafka/common/serialization/Serializer.java | 4 +- .../producer/internals/RecordAccumulatorTest.java | 26 +++++++ .../integration/kafka/api/ProducerSendTest.scala | 86 +++++++++++++++++++++- 8 files changed, 190 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ab26342..c64f6f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -196,7 +196,7 @@ public class KafkaProducer implements Producer { this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); @@ -508,17 +508,41 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. + * This method is equivalent to close(0, TimeUnit.MILLISECONDS). 
* @throws InterruptException If the thread is interrupted while blocked */ @Override public void close() { - log.trace("Closing the Kafka producer."); + close(0, TimeUnit.MILLISECONDS); + } + + /** + * This method waits up to timeout for the producer to complete previous send requests. + * If producer was not able to finish before timeout, this method will fail the incomplete send requests + * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise + * there might be metric leak. + * @param timeout The max time to wait for producer complete send requests. + * Wait forever when timeout is 0. + * Does not wait when timeout is negative. + * @param timeUnit The time unit for timeout + * @throws InterruptException If the thread is interrupted while blocked + */ + @Override + public void close(long timeout, TimeUnit timeUnit) { + log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); this.sender.initiateClose(); - try { - this.ioThread.join(); - } catch (InterruptedException e) { - throw new InterruptException(e); + if (timeout >= 0) { + try { + this.ioThread.join(timeUnit.toMillis(timeout)); + } catch (InterruptedException e) { + throw new KafkaException(e); + } } + + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + } + this.metrics.close(); this.keySerializer.close(); this.valueSerializer.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 6913090..3c34610 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; 
import org.apache.kafka.clients.producer.internals.Partitioner; @@ -146,6 +147,10 @@ public class MockProducer implements Producer { public void close() { } + @Override + public void close(long timeout, TimeUnit timeUnit) { + } + /** * Get the list of sent records since the last call to {@link #clear()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 5b3e75e..cbf6e51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; @@ -67,4 +68,10 @@ public interface Producer extends Closeable { */ public void close(); + /** + * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * discarding any pending messages. 
+ */ + public void close(long timeout, TimeUnit unit); + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 88b4e4f..b42dc4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -355,6 +356,18 @@ public final class RecordAccumulator { } /** + * This function is only called when sender is closed forcefully. It will fail all the + * incomplete batches and return. 
+ */ + public void failAllIncompleteBatches() { + for (RecordBatch batch : incomplete.all()) { + incomplete.remove(batch); + batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + } + this.batches.clear(); + } + + /** * Close this accumulator and force all the record buffers to be drained */ public void close() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 70954ca..f6d5b28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -83,6 +83,9 @@ public class Sender implements Runnable { /* true while the sender thread is still running */ private volatile boolean running; + /* true when the caller wants to ignore all unsent/inflight messages and force close. */ + private volatile boolean forceClose; + /* metrics */ private final SenderMetrics sensors; @@ -132,12 +135,18 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); + if (!this.forceClose) { + while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); + } } + } else { + // We need to fail all the incomplete batches and wake up the threads waiting on + // the futures. 
+ this.accumulator.failAllIncompleteBatches(); } this.client.close(); @@ -209,6 +218,14 @@ public class Sender implements Runnable { } /** + * Closes the sender without sending out any pending messages. + */ + public void forceClose() { + this.forceClose = true; + initiateClose(); + } + + /** * Handle a produce response */ private void handleProduceResponse(ClientResponse response, Map batches, long now) { diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c2fdc23..50f8703 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -38,7 +38,9 @@ public interface Serializer { public byte[] serialize(String topic, T data); /** - * Close this serializer + * Close this serializer. + * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called + * multiple times. 
*/ public void close(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index e379ac8..8d12e55 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -26,7 +26,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -227,4 +230,27 @@ public class RecordAccumulatorTest { assertFalse(accum.hasUnsent()); } + @Test + public void testFailAllIncomplete() throws Exception { + long lingerMs = Long.MAX_VALUE; + final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + class TestCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + assertTrue(exception.getMessage().equals("Producer is closed forcefully.")); + numExceptionReceivedInCallback.incrementAndGet(); + } + } + for (int i = 0; i < 100; i++) + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback()); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.failAllIncompleteBatches(); + assertEquals(numExceptionReceivedInCallback.get(), 100); + assertFalse(accum.hasUnsent()); + + } + } diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 3df4507..7dc1655 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -18,6 +18,7 @@ package kafka.api import java.lang.{Integer, IllegalArgumentException} +import java.util.concurrent.TimeUnit import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite @@ -29,7 +30,7 @@ import kafka.utils.{TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.integration.KafkaServerTestHarness -import org.apache.kafka.common.errors.SerializationException +import org.apache.kafka.common.errors.{InterruptException, SerializationException} import java.util.Properties import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.serialization.ByteArraySerializer @@ -322,6 +323,87 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close() } } - + + /** + * Test close with timeout + */ + @Test + def testCloseWithTimeout() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) + + // Test closing from caller thread. 
+ for(i <- 0 until 50) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + val responses = (0 until numRecords) map (i => producer.send(record0)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.close(-1, TimeUnit.MILLISECONDS) + responses.foreach { future => + try { + future.get() + // No message should be sent successfully + assertTrue(false); + } catch { + case e: Exception => + assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + } + } + val fetchResponse = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) + } + + // Test closing from sender thread. + class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + // Trigger another batch in accumulator before close the producer. These messages should + // not be sent. + (0 until numRecords) map (i => producer.send(record1)) + // The close call will be called by all the message callbacks. This is to test idempotent. + producer.close(-1, TimeUnit.MILLISECONDS) + } + } + for(i <- 1 until 51) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + // send message to partition 0 + var responses = (0 until numRecords) map (i => producer.send(record0)) + // send message to partition 1 + responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer)))) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + // flush the messages. 
+ producer.flush() + assertTrue("All request are complete.", responses.forall(_.isDone())) + // Check the messages received by broker. + val fetchResponse0 = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + val fetchResponse1 = if (leader1.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse0.messageSet(topic, 0).size) + assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse1.messageSet(topic, 1).size) + } + } finally { + if (producer != null) + producer.close() + } + } } -- 1.8.3.4 (Apple Git-47) From 3344cdb3c8bc9181ab293ad89877d28e29d9c0da Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:14:33 -0700 Subject: [PATCH 2/5] A minor fix. --- .../apache/kafka/clients/producer/internals/Sender.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f6d5b28..5f85309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -135,15 +135,14 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. 
- if (!this.forceClose) { - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); - } + while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); } - } else { + } + if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. this.accumulator.failAllIncompleteBatches(); -- 1.8.3.4 (Apple Git-47) From 721ae17c5d6bfe92d3c8dd4d45ebe09d2eae834e Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 9 Mar 2015 12:56:15 -0700 Subject: [PATCH 3/5] Incorporated Guozhang's comments. --- .../kafka/clients/producer/KafkaProducer.java | 4 +- .../apache/kafka/clients/producer/Producer.java | 2 +- .../producer/internals/RecordAccumulator.java | 4 +- .../kafka/clients/producer/internals/Sender.java | 2 +- .../kafka/common/errors/InterruptException.java | 5 +++ .../producer/internals/RecordAccumulatorTest.java | 21 ++++----- .../integration/kafka/api/ProducerSendTest.scala | 52 +++++++++++++++------- 7 files changed, 57 insertions(+), 33 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c64f6f8..0266bcd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -530,12 +530,12 @@ public class KafkaProducer implements Producer { @Override public void close(long timeout, TimeUnit timeUnit) { log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - this.sender.initiateClose(); if (timeout >= 0) { + this.sender.initiateClose(); try { 
this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException e) { - throw new KafkaException(e); + throw new InterruptException(e); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index cbf6e51..7a1deea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -69,7 +69,7 @@ public interface Producer extends Closeable { public void close(); /** - * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * Tries to close the producer cleanly until timeout is expired, force closes the producer after the timeout expires * discarding any pending messages. */ public void close(long timeout, TimeUnit unit); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index b42dc4e..ada686c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -359,10 +359,10 @@ public final class RecordAccumulator { * This function is only called when sender is closed forcefully. It will fail all the * incomplete batches and return. 
*/ - public void failAllIncompleteBatches() { + public void abortIncompleteBatches() { for (RecordBatch batch : incomplete.all()) { incomplete.remove(batch); - batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + batch.done(-1L, new InterruptException("Producer is closed forcefully.")); } this.batches.clear(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 5f85309..9a75b1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -145,7 +145,7 @@ public class Sender implements Runnable { if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. - this.accumulator.failAllIncompleteBatches(); + this.accumulator.abortIncompleteBatches(); } this.client.close(); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java index fee322f..3680f1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java @@ -31,4 +31,9 @@ public class InterruptException extends KafkaException { Thread.currentThread().interrupt(); } + public InterruptException(String message) { + super(message, new InterruptedException()); + Thread.currentThread().interrupt(); + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 8d12e55..09c93e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,11 +63,12 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); + RecordAccumulator accum = null; @Test public void testFull() throws Exception { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, CompressionType.NONE, null); @@ -91,7 +92,7 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -99,7 +100,7 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -117,7 +118,7 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { - 
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); int appends = 1024 / msgSize + 1; List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -136,7 +137,7 @@ public class RecordAccumulatorTest { final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags); + accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -177,7 +178,7 @@ public class RecordAccumulatorTest { public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; @@ -210,7 +211,7 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); @@ -231,10 +232,10 @@ public class RecordAccumulatorTest { } @Test - public void testFailAllIncomplete() throws Exception { + public 
void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -247,7 +248,7 @@ public class RecordAccumulatorTest { RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - accum.failAllIncompleteBatches(); + accum.abortIncompleteBatches(); assertEquals(numExceptionReceivedInCallback.get(), 100); assertFalse(accum.hasUnsent()); diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 7dc1655..3e22e67 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,23 +17,21 @@ package kafka.api -import java.lang.{Integer, IllegalArgumentException} +import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.kafka.clients.producer._ -import org.scalatest.junit.JUnit3Suite -import org.junit.Test -import org.junit.Assert._ - -import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer -import kafka.message.Message import kafka.integration.KafkaServerTestHarness -import org.apache.kafka.common.errors.{InterruptException, SerializationException} -import java.util.Properties +import kafka.message.Message +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, TestZKUtils} +import org.apache.kafka.clients.producer._ import 
org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.errors.SerializationException import org.apache.kafka.common.serialization.ByteArraySerializer +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnit3Suite class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @@ -325,10 +323,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout + * Test close with timeout from caller thread */ @Test - def testCloseWithTimeout() { + def testCloseWithTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -363,6 +361,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) } + } finally { + if (producer != null) + producer.close() + } + } + + /** + * Test close with timeout from sender thread + */ + @Test + def testCloseWithTimeoutFromSenderThread() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) // Test closing from sender thread. 
class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { @@ -374,7 +393,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close(-1, TimeUnit.MILLISECONDS) } } - for(i <- 1 until 51) { + for(i <- 0 until 50) { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) // send message to partition 0 var responses = (0 until numRecords) map (i => producer.send(record0)) @@ -396,14 +415,13 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) } assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse0.messageSet(topic, 0).size) + (i + 1) * numRecords, fetchResponse0.messageSet(topic, 0).size) assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse1.messageSet(topic, 1).size) + (i + 1) * numRecords, fetchResponse1.messageSet(topic, 1).size) } } finally { if (producer != null) producer.close() } } - } -- 1.8.3.4 (Apple Git-47) From d551675d21f6bf740140be892460ebc063f8f5dc Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:32:51 -0700 Subject: [PATCH 4/5] Modify according to the latest conclusion. 
--- .../kafka/clients/producer/KafkaProducer.java | 38 ++++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0266bcd..720a0f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -513,34 +513,52 @@ public class KafkaProducer implements Producer { */ @Override public void close() { - close(0, TimeUnit.MILLISECONDS); + close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /** * This method waits up to timeout for the producer to complete previous send requests. * If producer was not able to finish before timeout, this method will fail the incomplete send requests - * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise - * there might be metric leak. - * @param timeout The max time to wait for producer complete send requests. - * Wait forever when timeout is 0. - * Does not wait when timeout is negative. + * and close the producer forcefully. If timeout > 0, this method is a blocking call and will try to join + * the sender thread before it returns. If timeout = 0, this method is a non-blocking call and does not join + * the sender thread. If this method is invoked from callback with timeout > 0, a warning message will be logged + * and the producer will block forever. + * When InterruptException is thrown, user should retry, otherwise there might be metric leak. + * @param timeout The max time to wait for producer complete send requests. The value should be non negative. + * Does not wait when timeout is 0. * @param timeUnit The time unit for timeout * @throws InterruptException If the thread is interrupted while blocked + * @throws IllegalArgumentException If the timeout is negative. 
*/ @Override public void close(long timeout, TimeUnit timeUnit) { log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - if (timeout >= 0) { + // For timeout > 0, we ensure the sender thread is joined. + if (timeout > 0) { + if (Thread.currentThread() == this.ioThread) + log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0." + + "Only close(0, TimeUnit) can be invoked from callback. Otherwise the producer will block on close forever."); this.sender.initiateClose(); try { this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException e) { throw new InterruptException(e); } - } - if (this.ioThread.isAlive()) { - this.sender.forceClose(); + // If elegant close timeout, we issue a force close. + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + try { + this.ioThread.join(); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } + } else if (timeout == 0) { + if (this.ioThread.isAlive()) + this.sender.forceClose(); + } else { + throw new IllegalArgumentException("The timeout cannot be negative."); } this.metrics.close(); -- 1.8.3.4 (Apple Git-47) From dfa64d1d909bc8261d737f9de989815e8a95455b Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 27 Mar 2015 16:34:06 -0700 Subject: [PATCH 5/5] Patch for the finally passed KIP-15git status --- .../kafka/clients/producer/KafkaProducer.java | 55 +++++++++++++--------- .../kafka/clients/producer/internals/Sender.java | 1 - .../integration/kafka/api/ProducerSendTest.scala | 17 +++---- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 720a0f7..c3575a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
@@ -508,7 +508,10 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. - * This method is equivalent to close(0, TimeUnit.MILLISECONDS). + * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). + * [NOTE] If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) + * will be called instead. We are doing this because otherwise the sender thread will try to join itself and + * block forever. * @throws InterruptException If the thread is interrupted while blocked */ @Override @@ -519,10 +522,14 @@ public class KafkaProducer implements Producer { /** * This method waits up to timeout for the producer to complete previous send requests. * If producer was not able to finish before timeout, this method will fail the incomplete send requests - * and close the producer forcefully. If timeout > 0, this method is a blocking call and will try to join - * the sender thread before it returns. If timeout = 0, this method is a non-blocking call and does not join - * the sender thread. If this method is invoked from callback with timeout > 0, a warning message will be logged - * and the producer will block forever. + * and close the producer forcefully. + * If timeout > 0, this method is a blocking call and will try to join the sender thread before it returns. + * If timeout = 0, this method is a non-blocking call and does not join the sender thread. + *

+ * If this method is invoked from callback with timeout > 0, a warning message will be logged and + * close(0, TimeUnit.MILLISECONDS) will be called instead. We are doing this because otherwise the sender + * thread will try to join itself and block forever. + *

* When InterruptException is thrown, user should retry, otherwise there might be metric leak. * @param timeout The max time to wait for producer complete send requests. The value should be non negative. * Does not wait when timeout is 0. @@ -532,33 +539,37 @@ public class KafkaProducer implements Producer { */ @Override public void close(long timeout, TimeUnit timeUnit) { + if (timeout < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - // For timeout > 0, we ensure the sender thread is joined. + boolean invokedFromCallback = Thread.currentThread() == this.ioThread; + // Try to close elegantly. if (timeout > 0) { - if (Thread.currentThread() == this.ioThread) - log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0." + - "Only close(0, TimeUnit) can be invoked from callback. Otherwise the producer will block on close forever."); - this.sender.initiateClose(); - try { - this.ioThread.join(timeUnit.toMillis(timeout)); - } catch (InterruptedException e) { - throw new InterruptException(e); + if (invokedFromCallback) { + log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0. " + + "close(0, TimeUnit.MILLISECONDS) will be invoked instead to avoid deadlock."); + } else { + this.sender.initiateClose(); + try { + this.ioThread.join(timeUnit.toMillis(timeout)); + } catch (InterruptedException e) { + throw new InterruptException(e); + } } + } - // If elegant close timeout, we issue a force close. - if (this.ioThread.isAlive()) { - this.sender.forceClose(); + // Initiate a force close if sender thread is still alive. + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + // Only join the sender thread when not calling from callback. 
+ if (!invokedFromCallback) { try { this.ioThread.join(); } catch (InterruptedException e) { throw new InterruptException(e); } } - } else if (timeout == 0) { - if (this.ioThread.isAlive()) - this.sender.forceClose(); - } else { - throw new IllegalArgumentException("The timeout cannot be negative."); } this.metrics.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9a75b1a..0dee33e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -186,7 +186,6 @@ public class Sender implements Runnable { now); sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); - // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). 
Note that this specifically does not include nodes diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 3e22e67..055ba46 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -323,10 +323,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from caller thread + * Test close with zero timeout from caller thread */ @Test - def testCloseWithTimeoutFromCallerThread() { + def testCloseWithZeroTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -343,12 +343,11 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) val responses = (0 until numRecords) map (i => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) responses.foreach { future => try { future.get() - // No message should be sent successfully - assertTrue(false); + fail("No message should be sent successfully.") } catch { case e: Exception => assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) @@ -368,10 +367,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from sender thread + * Test close with zero timeout from sender thread */ @Test - def testCloseWithTimeoutFromSenderThread() { + def testCloseWithZeroTimeoutFromSenderThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -390,7 +389,9 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // not be sent. 
(0 until numRecords) map (i => producer.send(record1)) // The close call will be called by all the message callbacks. This is to test idempotent. - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) + // Test close with non-zero timeout. Should not block at all. + producer.close(Long.MaxValue, TimeUnit.MICROSECONDS) } } for(i <- 0 until 50) { -- 1.8.3.4 (Apple Git-47)