From 5b1b0bd088258b4761b8c31e79eb1bea7f9b66db Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:01:40 -0700 Subject: [PATCH 01/14] Patch for KAFKA-1660 add a close method with timeout to producer. --- .../kafka/clients/producer/KafkaProducer.java | 35 +++++++-- .../kafka/clients/producer/MockProducer.java | 5 ++ .../apache/kafka/clients/producer/Producer.java | 7 ++ .../producer/internals/RecordAccumulator.java | 13 ++++ .../kafka/clients/producer/internals/Sender.java | 27 +++++-- .../kafka/common/serialization/Serializer.java | 6 ++ .../producer/internals/RecordAccumulatorTest.java | 26 +++++++ .../integration/kafka/api/ProducerSendTest.scala | 90 +++++++++++++++++++++- 8 files changed, 195 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 42b1292..f4ae106 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -271,7 +271,7 @@ public class KafkaProducer implements Producer { } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121 - close(true); + close(0, TimeUnit.MILLISECONDS, true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); } @@ -518,17 +518,35 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. + * This method is equivalent to close(0, TimeUnit.MILLISECONDS). * @throws InterruptException If the thread is interrupted while blocked */ @Override public void close() { - close(false); + close(0, TimeUnit.MILLISECONDS); } - private void close(boolean swallowException) { - log.trace("Closing the Kafka producer."); + /** + * This method waits up to timeout for the producer to complete previous send requests. + * If producer was not able to finish before timeout, this method will fail the incomplete send requests + * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise + * there might be metric leak. + * @param timeout The max time to wait for producer complete send requests. + * Wait forever when timeout is 0. + * Does not wait when timeout is negative. 
+ * @param timeUnit The time unit for timeout + * @throws InterruptException If the thread is interrupted while blocked + */ + @Override + public void close(long timeout, TimeUnit timeUnit) { + close(timeout, timeUnit, false); + } + + private void close(long timeout, TimeUnit timeUnit, boolean swallowException) { + log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); // this will keep track of the first encountered exception AtomicReference firstException = new AtomicReference(); + if (this.sender != null) { try { this.sender.initiateClose(); @@ -537,14 +555,19 @@ public class KafkaProducer implements Producer { log.error("Failed to close sender", t); } } - if (this.ioThread != null) { + if (this.ioThread != null && timeout >= 0) { try { - this.ioThread.join(); + this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException t) { firstException.compareAndSet(null, t); log.error("Interrupted while joining ioThread", t); } } + + if (this.ioThread != null && this.ioThread.isAlive()) { + this.sender.forceClose(); + } + ClientUtils.closeQuietly(metrics, "producer metrics", firstException); ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 6913090..3c34610 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -146,6 +147,10 @@ public class MockProducer implements Producer { public void close() { } + @Override + public void close(long timeout, TimeUnit timeUnit) { + } + /** * Get the list of sent records since the last call to {@link #clear()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 5b3e75e..cbf6e51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; @@ -67,4 +68,10 @@ public interface Producer extends Closeable { */ public void close(); + /** + * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * discarding any pending messages. 
+ */ + public void close(long timeout, TimeUnit unit); + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 49a9883..649a191 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -362,6 +363,18 @@ public final class RecordAccumulator { } /** + * This function is only called when sender is closed forcefully. It will fail all the + * incomplete batches and return. + */ + public void failAllIncompleteBatches() { + for (RecordBatch batch : incomplete.all()) { + incomplete.remove(batch); + batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + } + this.batches.clear(); + } + + /** * Close this accumulator and force all the record buffers to be drained */ public void close() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b2db91c..64732db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -83,6 +83,9 @@ public class Sender implements Runnable { /* true while the sender thread is still running */ private volatile boolean running; + /* true when the caller wants to ignore all unsent/inflight messages and force close. */ + private volatile boolean forceClose; + /* metrics */ private final SenderMetrics sensors; @@ -132,12 +135,18 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); + if (!this.forceClose) { + while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); + } } + } else { + // We need to fail all the incomplete batches and wake up the threads waiting on + // the futures. + this.accumulator.failAllIncompleteBatches(); } try { this.client.close(); @@ -212,6 +221,14 @@ public class Sender implements Runnable { } /** + * Closes the sender without sending out any pending messages. 
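+ * (Note: this is expected to be invoked from KafkaProducer.close(timeout, timeUnit) when the producer gives up
+ * waiting on in-flight requests; incomplete batches are then failed rather than sent.)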
+ */ + public void forceClose() { + this.forceClose = true; + initiateClose(); + } + + /** * Handle a produce response */ private void handleProduceResponse(ClientResponse response, Map batches, long now) { diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c440540..01e9a22 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -38,4 +38,10 @@ public interface Serializer extends Closeable { */ public byte[] serialize(String topic, T data); + /** + * Close this serializer. + * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called + * multiple times. + */ + public void close(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index baa48e7..bda4513 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -26,7 +26,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -265,4 +268,27 @@ public class RecordAccumulatorTest { assertFalse(accum.hasUnsent()); } + @Test + public void testFailAllIncomplete() throws Exception { + long lingerMs = Long.MAX_VALUE; + final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + class TestCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + assertTrue(exception.getMessage().equals("Producer is closed forcefully.")); + numExceptionReceivedInCallback.incrementAndGet(); + } + } + for (int i = 0; i < 100; i++) + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback()); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.failAllIncompleteBatches(); + assertEquals(numExceptionReceivedInCallback.get(), 100); + assertFalse(accum.hasUnsent()); + + } + } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 9811a2b..a60d09e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,6 +17,10 @@ package kafka.api +import java.lang.{Integer, IllegalArgumentException} +import java.util.concurrent.TimeUnit +import java.util.Properties + import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite import org.junit.Test @@ -27,8 +31,7 @@ import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.integration.KafkaServerTestHarness 
-import org.apache.kafka.common.errors.SerializationException -import java.util.Properties +import org.apache.kafka.common.errors.{InterruptException, SerializationException} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.serialization.ByteArraySerializer @@ -318,6 +321,87 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close() } } - + + /** + * Test close with timeout + */ + @Test + def testCloseWithTimeout() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) + + // Test closing from caller thread. + for(i <- 0 until 50) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + val responses = (0 until numRecords) map (i => producer.send(record0)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.close(-1, TimeUnit.MILLISECONDS) + responses.foreach { future => + try { + future.get() + // No message should be sent successfully + assertTrue(false); + } catch { + case e: Exception => + assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + } + } + val fetchResponse = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) + } + + // Test closing from sender thread. + class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + // Trigger another batch in accumulator before close the producer. These messages should + // not be sent. + (0 until numRecords) map (i => producer.send(record1)) + // The close call will be called by all the message callbacks. This is to test idempotent. + producer.close(-1, TimeUnit.MILLISECONDS) + } + } + for(i <- 1 until 51) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + // send message to partition 0 + var responses = (0 until numRecords) map (i => producer.send(record0)) + // send message to partition 1 + responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer)))) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + // flush the messages. + producer.flush() + assertTrue("All request are complete.", responses.forall(_.isDone())) + // Check the messages received by broker. 
+ val fetchResponse0 = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + val fetchResponse1 = if (leader1.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse0.messageSet(topic, 0).size) + assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse1.messageSet(topic, 1).size) + } + } finally { + if (producer != null) + producer.close() + } + } } -- 1.8.3.4 (Apple Git-47) From e904087ccfe7a1900c066a72f2f36e93780786a6 Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:14:33 -0700 Subject: [PATCH 02/14] A minor fix. --- .../apache/kafka/clients/producer/internals/Sender.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 64732db..a430e52 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -135,15 +135,14 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - if (!this.forceClose) { - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); - } + while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); } - } else { + } + if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. this.accumulator.failAllIncompleteBatches(); -- 1.8.3.4 (Apple Git-47) From 0aa0a5226ed95177baff4677f06731a3bef2854d Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 9 Mar 2015 12:56:15 -0700 Subject: [PATCH 03/14] Incorporated Guozhang's comments. 
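A minimal usage sketch of the close-with-timeout semantics after this patch (illustrative only; "props", the topic name, and the serializer settings are assumptions and not part of this patch, with imports from org.apache.kafka.clients.producer and java.util.concurrent implied):

    Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
    producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "value".getBytes()));
    // Wait up to 5 seconds for outstanding sends. Requests still incomplete
    // when the timeout expires are failed and the producer is force-closed.
    // A timeout of zero means do not wait at all; a negative timeout now
    // throws IllegalArgumentException.
    producer.close(5, TimeUnit.SECONDS);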
--- .../kafka/clients/producer/KafkaProducer.java | 60 ++++++++++++---------- .../apache/kafka/clients/producer/Producer.java | 2 +- .../producer/internals/RecordAccumulator.java | 4 +- .../kafka/clients/producer/internals/Sender.java | 2 +- .../kafka/common/errors/InterruptException.java | 5 ++ .../producer/internals/RecordAccumulatorTest.java | 15 +++--- .../integration/kafka/api/ProducerSendTest.scala | 45 ++++++++++++---- 7 files changed, 86 insertions(+), 47 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f4ae106..18abeea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -527,15 +527,18 @@ public class KafkaProducer implements Producer { } /** - * This method waits up to timeout for the producer to complete previous send requests. - * If producer was not able to finish before timeout, this method will fail the incomplete send requests - * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise - * there might be metric leak. - * @param timeout The max time to wait for producer complete send requests. - * Wait forever when timeout is 0. - * Does not wait when timeout is negative. - * @param timeUnit The time unit for timeout + * This method waits up to timeout for the producer to complete the sending of all incomplete requests. + *

+ * If the producer is unable to complete all requests before the timeout expires, this method will fail + * any unsent and unacknowledged records immediately. + *

+ * If invoked from within a {@link Callback} this method will not block and will be equivalent to close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while blocking the I/O thread of the producer. + * + * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be + * non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete. + * @param timeUnit The time unit for the timeout * @throws InterruptException If the thread is interrupted while blocked + * @throws IllegalArgumentException If the timeout is negative. */ @Override public void close(long timeout, TimeUnit timeUnit) { @@ -543,38 +546,41 @@ public class KafkaProducer implements Producer { } private void close(long timeout, TimeUnit timeUnit, boolean swallowException) { - log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); + if (timeout < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + + log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); // this will keep track of the first encountered exception AtomicReference firstException = new AtomicReference(); + boolean invokedFromCallback = Thread.currentThread() == this.ioThread; - if (this.sender != null) { - try { - this.sender.initiateClose(); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close sender", t); - } - } - if (this.ioThread != null && timeout >= 0) { - try { - this.ioThread.join(timeUnit.toMillis(timeout)); - } catch (InterruptedException t) { - firstException.compareAndSet(null, t); - log.error("Interrupted while joining ioThread", t); + if (timeout > 0) { + if (invokedFromCallback) { + log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. 
" + + "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout); + } else { + if (this.sender != null) + this.sender.initiateClose(); + if (this.ioThread != null) { + try { + this.ioThread.join(timeUnit.toMillis(timeout)); + } catch (InterruptedException t) { + firstException.compareAndSet(null, t); + log.error("Interrupted while joining ioThread", t); + } + } } } - if (this.ioThread != null && this.ioThread.isAlive()) { + if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) this.sender.forceClose(); - } ClientUtils.closeQuietly(metrics, "producer metrics", firstException); ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); log.debug("The Kafka producer has closed."); - if (firstException.get() != null && !swallowException) { + if (firstException.get() != null && !swallowException) throw new KafkaException("Failed to close kafka producer", firstException.get()); - } } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index cbf6e51..7a1deea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -69,7 +69,7 @@ public interface Producer extends Closeable { public void close(); /** - * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * Tries to close the producer cleanly until timeout is expired, force closes the producer after the timeout expires * discarding any pending messages. */ public void close(long timeout, TimeUnit unit); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 649a191..840e538 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -366,10 +366,10 @@ public final class RecordAccumulator { * This function is only called when sender is closed forcefully. It will fail all the * incomplete batches and return. */ - public void failAllIncompleteBatches() { + public void abortIncompleteBatches() { for (RecordBatch batch : incomplete.all()) { incomplete.remove(batch); - batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + batch.done(-1L, new InterruptException("Producer is closed forcefully.")); } this.batches.clear(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index a430e52..2c334d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -145,7 +145,7 @@ public class Sender implements Runnable { if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. 
- this.accumulator.failAllIncompleteBatches(); + this.accumulator.abortIncompleteBatches(); } try { this.client.close(); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java index fee322f..3680f1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java @@ -31,4 +31,9 @@ public class InterruptException extends KafkaException { Thread.currentThread().interrupt(); } + public InterruptException(String message) { + super(message, new InterruptedException()); + Thread.currentThread().interrupt(); + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index bda4513..8b8503b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,6 +63,7 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); + RecordAccumulator accum = null; @Test public void testFull() throws Exception { @@ -91,16 +92,16 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], null); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, null); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -269,10 +270,10 @@ public class RecordAccumulatorTest { } @Test - public void testFailAllIncomplete() throws Exception { + public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -285,7 +286,7 @@ public class 
RecordAccumulatorTest { RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - accum.failAllIncompleteBatches(); + accum.abortIncompleteBatches(); assertEquals(numExceptionReceivedInCallback.get(), 100); assertFalse(accum.hasUnsent()); diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index a60d09e..87c11c7 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,9 +17,8 @@ package kafka.api -import java.lang.{Integer, IllegalArgumentException} -import java.util.concurrent.TimeUnit import java.util.Properties +import java.util.concurrent.TimeUnit import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite @@ -29,11 +28,19 @@ import org.junit.Assert._ import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer -import kafka.message.Message import kafka.integration.KafkaServerTestHarness import org.apache.kafka.common.errors.{InterruptException, SerializationException} + +import kafka.message.Message +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, TestZKUtils} +import org.apache.kafka.clients.producer._ import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.errors.SerializationException import org.apache.kafka.common.serialization.ByteArraySerializer +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnit3Suite class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @@ -323,10 +330,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout + * Test close with timeout from caller thread */ @Test - def testCloseWithTimeout() { + def testCloseWithTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -361,6 +368,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) } + } finally { + if (producer != null) + producer.close() + } + } + + /** + * Test close with timeout from sender thread + */ + @Test + def testCloseWithTimeoutFromSenderThread() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) // Test closing from sender thread. 
class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { @@ -372,7 +400,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close(-1, TimeUnit.MILLISECONDS) } } - for(i <- 1 until 51) { + for(i <- 0 until 50) { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) // send message to partition 0 var responses = (0 until numRecords) map (i => producer.send(record0)) @@ -394,14 +422,13 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) } assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse0.messageSet(topic, 0).size) + (i + 1) * numRecords, fetchResponse0.messageSet(topic, 0).size) assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse1.messageSet(topic, 1).size) + (i + 1) * numRecords, fetchResponse1.messageSet(topic, 1).size) } } finally { if (producer != null) producer.close() } } - } -- 1.8.3.4 (Apple Git-47) From a20db25a5e415c48bc0cb9ba7bb188d7ddf40ba5 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:32:51 -0700 Subject: [PATCH 04/14] Modify according to the latest conclusion. --- .../src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 18abeea..fc4a38d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -523,7 +523,7 @@ public class KafkaProducer implements Producer { */ @Override public void close() { - close(0, TimeUnit.MILLISECONDS); + close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /** @@ -570,7 +570,6 @@ public class KafkaProducer implements Producer { } } } - } if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) this.sender.forceClose(); -- 1.8.3.4 (Apple Git-47) From 309a67098f840ca9b2467a511090f3c579e6ef13 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 27 Mar 2015 16:34:06 -0700 Subject: [PATCH 05/14] Patch for the finally passed KIP-15git status --- .../apache/kafka/clients/producer/KafkaProducer.java | 5 ++++- .../apache/kafka/clients/producer/internals/Sender.java | 1 - .../scala/integration/kafka/api/ProducerSendTest.scala | 17 +++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index fc4a38d..bdc6e41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -518,7 +518,10 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. - * This method is equivalent to close(0, TimeUnit.MILLISECONDS). + * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). + * [NOTE] If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) + * will be called instead. 
We are doing this because otherwise the sender thread will try to joint itself and + * block forever. * @throws InterruptException If the thread is interrupted while blocked */ @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 2c334d9..1e943d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -189,7 +189,6 @@ public class Sender implements Runnable { now); sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); - // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 87c11c7..d870c0f 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -330,10 +330,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from caller thread + * Test close with zero timeout from caller thread */ @Test - def testCloseWithTimeoutFromCallerThread() { + def testCloseWithZeroTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -350,12 +350,11 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) val responses = (0 until numRecords) map (i => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) responses.foreach { future => try { future.get() - // No message should be sent successfully - assertTrue(false); + fail("No message should be sent successfully.") } catch { case e: Exception => assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) @@ -375,10 +374,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from sender thread + * Test close with zero timeout from sender thread */ @Test - def testCloseWithTimeoutFromSenderThread() { + def testCloseWithZeroTimeoutFromSenderThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -397,7 +396,9 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // not be sent. (0 until numRecords) map (i => producer.send(record1)) // The close call will be called by all the message callbacks. This is to test idempotent. - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) + // Test close with non zero timeout. Should not block at all. 
+ producer.close(Long.MaxValue, TimeUnit.MICROSECONDS) } } for(i <- 0 until 50) { -- 1.8.3.4 (Apple Git-47) From 407a69ca8192da531751157f69e69a8bc6a82021 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 15:52:29 -0700 Subject: [PATCH 06/14] Addressed Joel and Guozhang's comments. --- .../kafka/clients/producer/KafkaProducer.java | 8 +- .../apache/kafka/clients/producer/Producer.java | 4 +- .../producer/internals/RecordAccumulator.java | 93 ++++++++++++++-------- .../producer/internals/RecordAccumulatorTest.java | 3 +- .../integration/kafka/api/ProducerSendTest.scala | 4 +- 5 files changed, 69 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index bdc6e41..ab1c747 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -519,9 +519,11 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). - * [NOTE] If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) - * will be called instead. We are doing this because otherwise the sender thread will try to joint itself and - * block forever. + *

+ * If close() is called from a callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) + * will be called instead. We do this because the sender thread would otherwise try to join itself and + * block forever. + *

* @throws InterruptException If the thread is interrupted while blocked */ @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 7a1deea..d4a5d39 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -69,8 +69,8 @@ public interface Producer extends Closeable { public void close(); /** - * Tries to close the producer cleanly until timeout is expired, force closes the producer after the timeout expires - * discarding any pending messages. + * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the + * timeout, fail any pending send requests and force close the producer. */ public void close(long timeout, TimeUnit unit); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 840e538..18f16c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -44,6 +43,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} @@ -67,6 +68,7 @@ public final class RecordAccumulator { private final Time time; private final ConcurrentMap> batches; private final IncompleteRecordBatches incomplete; + private final ReadWriteLock closeLock; /** * Create a new record accumulator @@ -106,6 +108,7 @@ public final class RecordAccumulator { this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); this.incomplete = new IncompleteRecordBatches(); this.time = time; + this.closeLock = new ReentrantReadWriteLock(); registerMetrics(metrics, metricGrpName, metricTags); } @@ -146,41 +149,48 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { - if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); - // check if we have an in-progress batch - Deque dq = dequeFor(tp); - synchronized (dq) { - RecordBatch last = dq.peekLast(); - if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback 
callback) throws InterruptedException { + // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the + // last batch is missed. + closeLock.readLock().lock(); + try { + if (closed) + throw new IllegalStateException("Cannot send after the producer is closed."); + // check if we have an in-progress batch + Deque dq = dequeFor(tp); + synchronized (dq) { + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, callback); + if (future != null) + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + } } - } - // we don't have an in-progress record batch try to allocate a new batch - int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); - ByteBuffer buffer = free.allocate(size); - synchronized (dq) { - RecordBatch last = dq.peekLast(); - if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... - free.deallocate(buffer); - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + // we don't have an in-progress record batch try to allocate a new batch + int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); + ByteBuffer buffer = free.allocate(size); + synchronized (dq) { + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, callback); + if (future != null) { + // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... + free.deallocate(buffer); + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + } } - } - MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); - RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); + MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); - dq.addLast(batch); - incomplete.add(batch); - return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); + dq.addLast(batch); + incomplete.add(batch); + return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); + } + } finally { + closeLock.readLock().unlock(); } } @@ -188,12 +198,14 @@ public final class RecordAccumulator { * Re-enqueue the given record batch in the accumulator to retry */ public void reenqueue(RecordBatch batch, long now) { + closeLock.readLock().lock(); batch.attempts++; batch.lastAttemptMs = now; Deque deque = dequeFor(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); } + closeLock.readLock().unlock(); } /** @@ -255,6 +267,9 @@ public final class RecordAccumulator { * @return Whether there is any unsent record in the accumulator. */ public boolean hasUnsent() { + // Ideally this method should also be atomic against append, i.e. 
should grab closeLock.writeLock. But + // because this method is only called after close() is called, no append should happen in concurrent when this + // method is called, so we are not grabbing the lock here. for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); synchronized (deque) { @@ -367,9 +382,17 @@ public final class RecordAccumulator { * incomplete batches and return. */ public void abortIncompleteBatches() { + // Ideally this method should also be atomic against append, i.e. should grab closeLock.writeLock. But + // because this method is only called after close() is called, no append should happen in concurrent when this + // method is called, so we are not grabbing the lock here. for (RecordBatch batch : incomplete.all()) { incomplete.remove(batch); - batch.done(-1L, new InterruptException("Producer is closed forcefully.")); + Deque dq = dequeFor(batch.topicPartition); + // Need to close the batch before finishes it. + synchronized (dq) { + batch.records.close(); + } + batch.done(-1L, new IllegalStateException("Producer is closed forcefully.")); } this.batches.clear(); } @@ -378,7 +401,9 @@ public final class RecordAccumulator { * Close this accumulator and force all the record buffers to be drained */ public void close() { + closeLock.writeLock().lock(); this.closed = true; + closeLock.writeLock().unlock(); } /* @@ -416,7 +441,7 @@ public final class RecordAccumulator { */ private final static class IncompleteRecordBatches { private final Set incomplete; - + public IncompleteRecordBatches() { this.incomplete = new HashSet(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 8b8503b..fbd6492 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,7 +63,6 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - RecordAccumulator accum = null; @Test public void testFull() throws Exception { @@ -273,7 +272,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d870c0f..122b273 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -357,7 +357,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { fail("No message should be sent successfully.") } catch { case e: Exception => - assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + 
assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) } } val fetchResponse = if (leader0.get == configs(0).brokerId) { @@ -374,7 +374,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with zero timeout from sender thread + * Test close with zero and non-zero timeout from sender thread */ @Test def testCloseWithZeroTimeoutFromSenderThread() { -- 1.8.3.4 (Apple Git-47) From 1e3b7723013d20692d0ea23110e8fc3ecea4332c Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 16:34:30 -0700 Subject: [PATCH 07/14] rebased on trunk --- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 22 +++++++++++----------- .../clients/producer/internals/SenderTest.java | 6 +++--- .../integration/kafka/api/ProducerSendTest.scala | 11 +---------- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ab1c747..1ff160d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -386,7 +386,7 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 18f16c8..a634c7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -149,7 +149,7 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the // last batch is missed. 
closeLock.readLock().lock(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index fbd6492..c40e349 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -70,10 +70,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -91,7 +91,7 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -99,7 +99,7 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -122,7 +122,7 @@ public class RecordAccumulatorTest { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null); + accum.append(tp, key, value, CompressionType.NONE, null); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -143,7 +143,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); } catch (Exception e) { e.printStackTrace(); } @@ -183,7 +183,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, 
result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -192,14 +192,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null); + accum.append(tp3, key, value, CompressionType.NONE, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, CompressionType.NONE, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -250,7 +250,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null); + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -272,7 +272,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..7f69b3a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -72,7 +72,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -99,7 +99,7 @@ public class SenderTest { time, "clientId"); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect 
sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -116,7 +116,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 122b273..b9cdecc 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -20,20 +20,11 @@ package kafka.api import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.kafka.clients.producer._ -import org.scalatest.junit.JUnit3Suite -import org.junit.Test -import org.junit.Assert._ - -import kafka.server.KafkaConfig -import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness -import org.apache.kafka.common.errors.{InterruptException, SerializationException} - import kafka.message.Message import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, TestZKUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer._ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.errors.SerializationException -- 1.8.3.4 (Apple Git-47) From 739620b4c5663ff723745212b3632c838981f635 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 18:14:35 -0700 Subject: [PATCH 08/14] Rebase on trunk --- .../kafka/clients/producer/KafkaProducer.java | 3 +-- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 22 +++++++++++----------- .../clients/producer/internals/SenderTest.java | 6 +++--- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ff160d..38661e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,7 +265,6 @@ public class KafkaProducer implements Producer { } else { this.valueSerializer = valueSerializer; } - config.logUnused(); log.debug("Kafka producer started"); } catch (Throwable t) { @@ -386,7 +385,7 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); diff --git 
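
The RecordAccumulator comment carried through this patch, that append must be "atomic to close" so the last batch is not missed, is the invariant the rest of the series keeps reworking. As a standalone illustration, the read/write-lock scheme it describes looks roughly like the sketch below; the class and field names are illustrative, not the patch's actual code.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch: appenders share the read lock, so close() (which takes the write
    // lock) cannot run in the middle of an append and miss the batch it creates.
    public class CloseSafeAccumulatorSketch {
        private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
        private final Deque<byte[]> batches = new ArrayDeque<byte[]>();
        private volatile boolean closed = false;

        public void append(byte[] record) {
            closeLock.readLock().lock();   // many appenders may hold this concurrently
            try {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                synchronized (batches) {
                    batches.addLast(record);
                }
            } finally {
                closeLock.readLock().unlock();
            }
        }

        public void close() {
            closeLock.writeLock().lock();  // blocks until no append is in flight
            try {
                closed = true;
            } finally {
                closeLock.writeLock().unlock();
            }
        }
    }

Once close() holds the write lock, every later append is guaranteed to observe closed == true and fail, which is the property the later patches preserve while removing the lock.
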
From 739620b4c5663ff723745212b3632c838981f635 Mon Sep 17 00:00:00 2001
From: jqin
Date: Tue, 7 Apr 2015 18:14:35 -0700
Subject: [PATCH 08/14] Rebase on trunk

---
 .../kafka/clients/producer/KafkaProducer.java      |  3 +--
 .../producer/internals/RecordAccumulator.java      |  2 +-
 .../producer/internals/RecordAccumulatorTest.java  | 22 +++++++++++-----------
 .../clients/producer/internals/SenderTest.java     |  6 +++---
 4 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ff160d..38661e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -265,7 +265,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             } else {
                 this.valueSerializer = valueSerializer;
             }
-
             config.logUnused();
             log.debug("Kafka producer started");
         } catch (Throwable t) {
@@ -386,7 +385,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a634c7c..edef492 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -149,7 +149,7 @@ public final class RecordAccumulator {
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the
         // last batch is missed.
         closeLock.readLock().lock();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index c40e349..5b2e4ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -70,10 +70,10 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -92,7 +92,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -100,7 +100,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -122,7 +122,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -143,7 +143,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -183,7 +183,7 @@ public class RecordAccumulatorTest {

         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -192,14 +192,14 @@ public class RecordAccumulatorTest {

         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -250,7 +250,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
@@ -281,7 +281,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback());
+            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7f69b3a..8b1805d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -72,7 +72,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@ public class SenderTest {
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());

         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
-- 
1.8.3.4 (Apple Git-47)
From dcd9ebc8e5b5c22fa28f322726bae8b6448e46f4 Mon Sep 17 00:00:00 2001
From: jqin
Date: Wed, 8 Apr 2015 14:00:25 -0700
Subject: [PATCH 09/14] Addressed Joel's comments.

---
 .../kafka/clients/producer/KafkaProducer.java      |  4 +-
 .../producer/internals/RecordAccumulator.java      | 60 +++++++++++++---------
 2 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 38661e1..84ef4ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -520,7 +520,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Close this producer. This method blocks until all previously sent requests complete.
      * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).
      * <p>
      * If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
-     * will be called instead. We do this because the sender thread would otherwise try to joint itself and
+     * will be called instead. We do this because the sender thread would otherwise try to join itself and
      * block forever.
      * <p>
      * @throws InterruptException If the thread is interrupted while blocked
@@ -557,12 +557,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
-
         if (timeout > 0) {
             if (invokedFromCallback) {
                 log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                          "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
             } else {
+                // Try to close gracefully.
                 if (this.sender != null)
                     this.sender.initiateClose();
                 if (this.ioThread != null) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index edef492..a123d5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -43,8 +43,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;

 /**
  * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@@ -58,8 +56,9 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
     private volatile boolean closed;
-    private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
+    private final AtomicInteger flushesInProgress;
+    private final AtomicInteger appendInProgress;
     private final int batchSize;
     private final CompressionType compression;
     private final long lingerMs;
@@ -68,7 +67,7 @@ public final class RecordAccumulator {
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
-    private final ReadWriteLock closeLock;
+

     /**
      * Create a new record accumulator
@@ -99,6 +98,7 @@ public final class RecordAccumulator {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
+        this.appendInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.compression = compression;
         this.lingerMs = lingerMs;
@@ -108,7 +108,6 @@ public final class RecordAccumulator {
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
-        this.closeLock = new ReentrantReadWriteLock();
         registerMetrics(metrics, metricGrpName, metricTags);
     }
@@ -150,9 +149,9 @@ public final class RecordAccumulator {
      * @param callback The user-supplied callback to execute when the request is complete
      */
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
-        // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the
-        // last batch is missed.
-        closeLock.readLock().lock();
+        // We keep track of the number of appending thread to make sure we do not miss batches in
+        // abortIncompleteBatches().
+        appendInProgress.incrementAndGet();
         try {
             if (closed)
                 throw new IllegalStateException("Cannot send after the producer is closed.");
@@ -172,6 +171,9 @@ public final class RecordAccumulator {
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             ByteBuffer buffer = free.allocate(size);
             synchronized (dq) {
+                // Need to check if producer is closed again after grabbing the dequeue lock.
+                if (closed)
+                    throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
                     FutureRecordMetadata future = last.tryAppend(key, value, callback);
@@ -190,7 +192,7 @@ public final class RecordAccumulator {
                 return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
             }
         } finally {
-            closeLock.readLock().unlock();
+            appendInProgress.decrementAndGet();
         }
     }
@@ -198,14 +200,12 @@ public final class RecordAccumulator {
      * Re-enqueue the given record batch in the accumulator to retry
      */
     public void reenqueue(RecordBatch batch, long now) {
-        closeLock.readLock().lock();
         batch.attempts++;
         batch.lastAttemptMs = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
         }
-        closeLock.readLock().unlock();
     }

     /**
@@ -267,9 +267,6 @@ public final class RecordAccumulator {
      * @return Whether there is any unsent record in the accumulator.
      */
     public boolean hasUnsent() {
-        // Ideally this method should also be atomic against append, i.e. should grab closeLock.writeLock. But
-        // because this method is only called after close() is called, no append should happen in concurrent when this
-        // method is called, so we are not grabbing the lock here.
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> deque = entry.getValue();
             synchronized (deque) {
@@ -367,7 +364,14 @@ public final class RecordAccumulator {
     public void beginFlush() {
         this.flushesInProgress.getAndIncrement();
     }
-
+
+    /**
+     * Are there ay threads currently appeding messages?
+     */
+    private boolean appendInProgress() {
+        return appendInProgress.get() > 0;
+    }
+
     /**
      * Mark all partitions as ready to send and block until the send is complete
      */
@@ -382,28 +386,38 @@
      * incomplete batches and return.
      */
     public void abortIncompleteBatches() {
-        // Ideally this method should also be atomic against append, i.e. should grab closeLock.writeLock. But
-        // because this method is only called after close() is called, no append should happen in concurrent when this
-        // method is called, so we are not grabbing the lock here.
+        // We need to keep aborting the incomplete batch until no thread is trying to append to
+        // 1. Avoid losing batches.
+        // 2. Free up memory in case appending threads are blocked on buffer full.
+        // This is a tight loop but should be able to get through very quickly.
+        do {
+            abortBatches();
+        } while (appendInProgress());
+        // Do the last abort after no thread was appending.
+        abortBatches();
+        this.batches.clear();
+    }
+
+    /**
+     * Go through incomplete batches and abort them.
+     */
+    private void abortBatches() {
         for (RecordBatch batch : incomplete.all()) {
-            incomplete.remove(batch);
             Deque<RecordBatch> dq = dequeFor(batch.topicPartition);
-            // Need to close the batch before finishes it.
+            // Close the batch before finishes it.
             synchronized (dq) {
                 batch.records.close();
             }
             batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
+            deallocate(batch);
         }
-        this.batches.clear();
     }

     /**
      * Close this accumulator and force all the record buffers to be drained
      */
     public void close() {
-        closeLock.writeLock().lock();
         this.closed = true;
-        closeLock.writeLock().unlock();
     }

     /*
-- 
1.8.3.4 (Apple Git-47)
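
Patch 09 trades the read/write lock for a plain counter: each appender bumps the counter around its critical section, and the closer loops, aborting batches until it observes no in-flight appends, then aborts once more. A minimal self-contained sketch of that loop (names illustrative, batch bookkeeping elided):

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the counter-based abort: keep aborting while appenders are in
    // flight, then do one final pass to catch a batch created by the last
    // appender that raced past the closed check.
    public class AbortLoopSketch {
        private final AtomicInteger appendsInProgress = new AtomicInteger(0);
        private volatile boolean closed = false;

        public void append(byte[] record) {
            appendsInProgress.incrementAndGet();
            try {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // ... append the record to an in-progress batch ...
            } finally {
                appendsInProgress.decrementAndGet();
            }
        }

        public void abortIncompleteBatches() {
            do {
                abortBatches();
            } while (appendsInProgress.get() > 0);
            // No appender can pass the closed check any more; one last pass
            // catches a batch added by the final racing thread.
            abortBatches();
        }

        private void abortBatches() {
            // ... fail every incomplete batch and release its buffer ...
        }

        public void close() {
            closed = true;
        }
    }

The loop also matters for memory: each abortBatches() pass deallocates buffers, so an appender blocked on a full BufferPool can wake up, fail, and let the counter drain to zero.
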
From f2367bdfc55a53c688d003f8ceaf7fa764b905a4 Mon Sep 17 00:00:00 2001
From: jqin
Date: Fri, 10 Apr 2015 15:08:24 -0700
Subject: [PATCH 10/14] Addressed Joel's comments

---
 .../producer/internals/RecordAccumulator.java      | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a123d5d..17b1ffc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -58,7 +58,7 @@ public final class RecordAccumulator {
     private volatile boolean closed;
     private int drainIndex;
     private final AtomicInteger flushesInProgress;
-    private final AtomicInteger appendInProgress;
+    private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
     private final long lingerMs;
@@ -98,7 +98,7 @@
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
-        this.appendInProgress = new AtomicInteger(0);
+        this.appendsInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.compression = compression;
         this.lingerMs = lingerMs;
@@ -151,7 +151,7 @@
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
-        appendInProgress.incrementAndGet();
+        appendsInProgress.incrementAndGet();
         try {
@@ -192,7 +192,7 @@
                 return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
             }
         } finally {
-            appendInProgress.decrementAndGet();
+            appendsInProgress.decrementAndGet();
         }
     }
@@ -366,10 +366,10 @@
     }

     /**
-     * Are there ay threads currently appeding messages?
+     * Are there any threads currently appending messages?
      */
-    private boolean appendInProgress() {
-        return appendInProgress.get() > 0;
+    private boolean appendsInProgress() {
+        return appendsInProgress.get() > 0;
     }

     /**
@@ -392,8 +392,10 @@
         // This is a tight loop but should be able to get through very quickly.
         do {
             abortBatches();
-        } while (appendInProgress());
-        // Do the last abort after no thread was appending.
+        } while (appendsInProgress());
+        // After this point, no thread will append any messages because they will see the close
+        // flag set. We need to do the last abort after no thread was appending in case the there was a new
+        // batch appended by the last appending thread.
         abortBatches();
         this.batches.clear();
     }
-- 
1.8.3.4 (Apple Git-47)
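
A detail worth pausing on: append() now checks the closed flag twice, once on entry and again after taking the deque lock ("Need to check if producer is closed again after grabbing the dequeue lock", added in patch 09). Without the second check, an appender that passed the first check could enqueue a batch after the final abort and lose it silently. A toy illustration of the pattern (names and types are illustrative, not the patch's code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class DoubleCheckedAppendSketch {
        private final Deque<String> dq = new ArrayDeque<String>();
        private volatile boolean closed = false;

        public void append(String record) {
            if (closed)            // fast-path check; racy on its own
                throw new IllegalStateException("Cannot send after the producer is closed.");
            synchronized (dq) {
                if (closed)        // decisive check: close() cannot sneak in past here
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                dq.addLast(record);
            }
        }

        public void close() {
            closed = true;
            synchronized (dq) {    // serializes with any appender that won the race
                dq.clear();        // abort whatever is still queued
            }
        }
    }
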
From 461f7ad42d2b32376325d1e3fc14650b1f09bef4 Mon Sep 17 00:00:00 2001
From: jqin
Date: Thu, 16 Apr 2015 11:35:03 -0700
Subject: [PATCH 11/14] Addressed Jay's comments

---
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../producer/internals/RecordAccumulator.java      | 24 ++++++++--------------
 2 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 84ef4ee..26e76d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -519,7 +519,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Close this producer. This method blocks until all previously sent requests complete.
      * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).
      * <p>
-     * If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
+     * If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
      * will be called instead. We do this because the sender thread would otherwise try to join itself and
      * block forever.
      * <p>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 17b1ffc..3aa1335 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -43,6 +43,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 /**
  * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@@ -58,7 +60,6 @@ public final class RecordAccumulator {
     private volatile boolean closed;
     private int drainIndex;
     private final AtomicInteger flushesInProgress;
-    private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
     private final long lingerMs;
@@ -67,6 +68,7 @@
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
+    private final ReadWriteLock abortIncompleteBatchLock;

     /**
@@ -98,7 +100,6 @@
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
-        this.appendsInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.compression = compression;
         this.lingerMs = lingerMs;
@@ -108,6 +109,7 @@
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
+        this.abortIncompleteBatchLock = new ReentrantReadWriteLock();
         registerMetrics(metrics, metricGrpName, metricTags);
     }
@@ -149,10 +151,9 @@
      * @param callback The user-supplied callback to execute when the request is complete
      */
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
-        // We keep track of the number of appending thread to make sure we do not miss batches in
-        // abortIncompleteBatches().
-        appendsInProgress.incrementAndGet();
+        // We use the ReadWriteLock to make sure when we abort incomplete batches, we don't miss any message.
         try {
+            abortIncompleteBatchLock.readLock().lock();
             if (closed)
                 throw new IllegalStateException("Cannot send after the producer is closed.");
             // check if we have an in-progress batch
@@ -192,7 +193,7 @@
                 return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
             }
         } finally {
-            appendsInProgress.decrementAndGet();
+            abortIncompleteBatchLock.readLock().unlock();
         }
     }
@@ -366,13 +367,6 @@
     }

     /**
-     * Are there any threads currently appending messages?
-     */
-    private boolean appendsInProgress() {
-        return appendsInProgress.get() > 0;
-    }
-
-    /**
      * Mark all partitions as ready to send and block until the send is complete
      */
     public void awaitFlushCompletion() throws InterruptedException {
@@ -390,14 +384,14 @@
         // 1. Avoid losing batches.
         // 2. Free up memory in case appending threads are blocked on buffer full.
         // This is a tight loop but should be able to get through very quickly.
-        do {
+        while (!abortIncompleteBatchLock.writeLock().tryLock())
             abortBatches();
-        } while (appendsInProgress());
         // After this point, no thread will append any messages because they will see the close
         // flag set. We need to do the last abort after no thread was appending in case the there was a new
         // batch appended by the last appending thread.
         abortBatches();
         this.batches.clear();
+        abortIncompleteBatchLock.writeLock().unlock();
     }

     /**
-- 
1.8.3.4 (Apple Git-47)
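
The tryLock loop this patch switches to gets both properties at once: failed lock attempts double as abort passes that free memory. A sketch of the shape (illustrative names; batch bookkeeping elided):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch: appenders hold the read lock; the closer spins on tryLock(),
    // aborting batches on every failed attempt so appenders blocked on a full
    // buffer are failed, release the read lock, and let tryLock() succeed.
    public class TryLockAbortSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile boolean closed = false;

        public void append(byte[] record) {
            lock.readLock().lock();
            try {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // ... append, possibly blocking until buffer space is freed ...
            } finally {
                lock.readLock().unlock();
            }
        }

        public void abortIncompleteBatches() {
            while (!lock.writeLock().tryLock())
                abortBatches();            // frees memory so blocked appenders can exit
            try {
                abortBatches();            // final pass with all appenders excluded
            } finally {
                lock.writeLock().unlock();
            }
        }

        private void abortBatches() {
            // ... fail every incomplete batch and release its buffer ...
        }
    }

Note the sketch wraps the final abort in try/finally, while the patch itself unlocks at the end of the method. Patch 13 below reverts to the AtomicInteger counter to drop the extra lock dependency, so both variants appear in this series.
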
From 4a1bbb1ef4fe30a9cdcbfd7d261fd468b0a9e507 Mon Sep 17 00:00:00 2001
From: jqin
Date: Mon, 20 Apr 2015 17:04:11 -0700
Subject: [PATCH 12/14] Change java doc as Jay suggested.

---
 .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 26e76d9..8f4400b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -536,7 +536,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * If the producer is unable to complete all requests before the timeout expires, this method will fail
      * any unsent and unacknowledged records immediately.
      * <p>
-     * If invoked from within a {@link Callback} this method will not block and will be equivalent to close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while blocking the I/O thread of the producer.
+     * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+     * close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while
+     * blocking the I/O thread of the producer.
      *
      * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
      *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
-- 
1.8.3.4 (Apple Git-47)
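
The rewritten javadoc is easiest to read from the caller's side: the callback runs on the sender's I/O thread, so a blocking close() there would be a self-join. A hypothetical caller that wants to shut down on the first send error can lean on the documented close(0, ...) behavior; the broker address, topic, and serializer settings below are assumptions for the example:

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CloseFromCallbackExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "hello".getBytes()), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null)
                        // Non-blocking by contract; a non-zero timeout here would be
                        // overridden to 0 with a warning, per the javadoc above.
                        producer.close(0, TimeUnit.MILLISECONDS);
                }
            });
            producer.close();   // normal path: block until outstanding sends complete
        }
    }
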
From 8369748fef109cd3c7d8b98b7c5f41bf2c99a122 Mon Sep 17 00:00:00 2001
From: jqin
Date: Wed, 29 Apr 2015 14:56:49 -0700
Subject: [PATCH 13/14] Go back to the AtomicInteger approach for less dependency.

---
 .../producer/internals/RecordAccumulator.java      | 24 ++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3aa1335..17b1ffc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -43,8 +43,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;

 /**
  * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@@ -60,6 +58,7 @@ public final class RecordAccumulator {
     private volatile boolean closed;
     private int drainIndex;
     private final AtomicInteger flushesInProgress;
+    private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
     private final long lingerMs;
@@ -68,7 +67,6 @@
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
-    private final ReadWriteLock abortIncompleteBatchLock;

     /**
@@ -100,6 +98,7 @@
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
+        this.appendsInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.compression = compression;
         this.lingerMs = lingerMs;
@@ -109,7 +108,6 @@
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
-        this.abortIncompleteBatchLock = new ReentrantReadWriteLock();
         registerMetrics(metrics, metricGrpName, metricTags);
     }
@@ -151,9 +149,10 @@
      * @param callback The user-supplied callback to execute when the request is complete
      */
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
-        // We use the ReadWriteLock to make sure when we abort incomplete batches, we don't miss any message.
+        // We keep track of the number of appending thread to make sure we do not miss batches in
+        // abortIncompleteBatches().
+        appendsInProgress.incrementAndGet();
         try {
-            abortIncompleteBatchLock.readLock().lock();
             if (closed)
                 throw new IllegalStateException("Cannot send after the producer is closed.");
             // check if we have an in-progress batch
@@ -193,7 +192,7 @@
                 return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
             }
         } finally {
-            abortIncompleteBatchLock.readLock().unlock();
+            appendsInProgress.decrementAndGet();
         }
     }
@@ -367,6 +366,13 @@
     }

     /**
+     * Are there any threads currently appending messages?
+     */
+    private boolean appendsInProgress() {
+        return appendsInProgress.get() > 0;
+    }
+
+    /**
      * Mark all partitions as ready to send and block until the send is complete
      */
     public void awaitFlushCompletion() throws InterruptedException {
@@ -384,14 +390,14 @@
         // 1. Avoid losing batches.
         // 2. Free up memory in case appending threads are blocked on buffer full.
         // This is a tight loop but should be able to get through very quickly.
-        while (!abortIncompleteBatchLock.writeLock().tryLock())
+        do {
             abortBatches();
+        } while (appendsInProgress());
         // After this point, no thread will append any messages because they will see the close
         // flag set. We need to do the last abort after no thread was appending in case the there was a new
         // batch appended by the last appending thread.
         abortBatches();
         this.batches.clear();
-        abortIncompleteBatchLock.writeLock().unlock();
     }

     /**
-- 
1.8.3.4 (Apple Git-47)


From f3e7f5a4823271f68f5ec19a2ef3402d325e9ee0 Mon Sep 17 00:00:00 2001
From: jqin
Date: Wed, 29 Apr 2015 16:57:54 -0700
Subject: [PATCH 14/14] Rebased on trunk

---
 .../src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8f4400b..766b775 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -576,6 +576,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 }
             }
         }
+        }

         if (this.sender != null && this.ioThread != null && this.ioThread.isAlive())
             this.sender.forceClose();
-- 
1.8.3.4 (Apple Git-47)
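
With patch 14's brace fix in place, the timed close reads as: attempt a graceful drain only for a positive timeout off the I/O thread, then force-close whatever is still alive. A condensed sketch of that control flow (shape only, not the class's actual code):

    import java.util.concurrent.TimeUnit;

    public class TimedCloseSketch {
        private Thread ioThread;       // the sender's I/O thread
        private Object sender;         // stand-in for the Sender

        public void close(long timeout, TimeUnit timeUnit) throws InterruptedException {
            boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
            if (timeout > 0 && !invokedFromCallback) {
                initiateClose();                           // graceful: let the sender drain
                if (this.ioThread != null)
                    this.ioThread.join(timeUnit.toMillis(timeout));
            }
            if (this.sender != null && this.ioThread != null && this.ioThread.isAlive())
                forceClose();                              // fail remaining batches
        }

        private void initiateClose() { /* signal the sender to finish in-flight work */ }
        private void forceClose()    { /* abort incomplete batches "forcefully" */ }
    }

Callers that can tolerate data loss on shutdown pass a bound, for example producer.close(5, TimeUnit.SECONDS); records still unsent when the bound expires fail with "Producer is closed forcefully.", and their callbacks observe that exception.
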