From 6d90ca88556090c0a7cc3bd43ca820c954be6d5f Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:01:40 -0700 Subject: [PATCH 1/8] Patch for KAFKA-1660 add a close method with timeout to producer. --- .../kafka/clients/producer/KafkaProducer.java | 36 +++++++-- .../kafka/clients/producer/MockProducer.java | 5 ++ .../apache/kafka/clients/producer/Producer.java | 7 ++ .../producer/internals/RecordAccumulator.java | 13 ++++ .../kafka/clients/producer/internals/Sender.java | 27 +++++-- .../kafka/common/serialization/Serializer.java | 4 +- .../producer/internals/RecordAccumulatorTest.java | 26 +++++++ .../integration/kafka/api/ProducerSendTest.scala | 90 +++++++++++++++++++++- 8 files changed, 193 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b91e2c5..6887f85 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -196,7 +196,7 @@ public class KafkaProducer implements Producer { this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); @@ -509,17 +509,41 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. + * This method is equivalent to close(0, TimeUnit.MILLISECONDS). * @throws InterruptException If the thread is interrupted while blocked */ @Override public void close() { - log.trace("Closing the Kafka producer."); + close(0, TimeUnit.MILLISECONDS); + } + + /** + * This method waits up to timeout for the producer to complete previous send requests. + * If producer was not able to finish before timeout, this method will fail the incomplete send requests + * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise + * there might be metric leak. + * @param timeout The max time to wait for producer complete send requests. + * Wait forever when timeout is 0. + * Does not wait when timeout is negative. + * @param timeUnit The time unit for timeout + * @throws InterruptException If the thread is interrupted while blocked + */ + @Override + public void close(long timeout, TimeUnit timeUnit) { + log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); this.sender.initiateClose(); - try { - this.ioThread.join(); - } catch (InterruptedException e) { - throw new InterruptException(e); + if (timeout >= 0) { + try { + this.ioThread.join(timeUnit.toMillis(timeout)); + } catch (InterruptedException e) { + throw new KafkaException(e); + } } + + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + } + this.metrics.close(); this.keySerializer.close(); this.valueSerializer.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 6913090..3c34610 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -146,6 +147,10 @@ public class MockProducer implements Producer { public void close() { } + @Override + public void close(long timeout, TimeUnit timeUnit) { + } + /** * Get the list of sent records since the last call to {@link #clear()} */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 5b3e75e..cbf6e51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; @@ -67,4 +68,10 @@ public interface Producer extends Closeable { */ public void close(); + /** + * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * discarding any pending messages. + */ + public void close(long timeout, TimeUnit unit); + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 0e7ab29..b1311fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -358,6 +359,18 @@ public final class RecordAccumulator { } /** + * This function is only called when sender is closed forcefully. It will fail all the + * incomplete batches and return. + */ + public void failAllIncompleteBatches() { + for (RecordBatch batch : incomplete.all()) { + incomplete.remove(batch); + batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + } + this.batches.clear(); + } + + /** * Close this accumulator and force all the record buffers to be drained */ public void close() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 70954ca..f6d5b28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -83,6 +83,9 @@ public class Sender implements Runnable { /* true while the sender thread is still running */ private volatile boolean running; + /* true when the caller wants to ignore all unsent/inflight messages and force close. */ + private volatile boolean forceClose; + /* metrics */ private final SenderMetrics sensors; @@ -132,12 +135,18 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); + if (!this.forceClose) { + while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); + } } + } else { + // We need to fail all the incomplete batches and wake up the threads waiting on + // the futures. + this.accumulator.failAllIncompleteBatches(); } this.client.close(); @@ -209,6 +218,14 @@ public class Sender implements Runnable { } /** + * Closes the sender without sending out any pending messages. + */ + public void forceClose() { + this.forceClose = true; + initiateClose(); + } + + /** * Handle a produce response */ private void handleProduceResponse(ClientResponse response, Map batches, long now) { diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c2fdc23..50f8703 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -38,7 +38,9 @@ public interface Serializer { public byte[] serialize(String topic, T data); /** - * Close this serializer + * Close this serializer. + * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called + * multiple times. */ public void close(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 05e2929..2113a41 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -26,7 +26,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -227,4 +230,27 @@ public class RecordAccumulatorTest { assertFalse(accum.hasUnsent()); } + @Test + public void testFailAllIncomplete() throws Exception { + long lingerMs = Long.MAX_VALUE; + final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + class TestCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + assertTrue(exception.getMessage().equals("Producer is closed forcefully.")); + numExceptionReceivedInCallback.incrementAndGet(); + } + } + for (int i = 0; i < 100; i++) + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback()); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.failAllIncompleteBatches(); + assertEquals(numExceptionReceivedInCallback.get(), 100); + assertFalse(accum.hasUnsent()); + + } + } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 9811a2b..a60d09e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,6 +17,10 @@ package kafka.api +import java.lang.{Integer, IllegalArgumentException} +import java.util.concurrent.TimeUnit +import java.util.Properties + import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite import org.junit.Test @@ -27,8 +31,7 @@ import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.integration.KafkaServerTestHarness -import org.apache.kafka.common.errors.SerializationException -import java.util.Properties +import org.apache.kafka.common.errors.{InterruptException, SerializationException} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.serialization.ByteArraySerializer @@ -318,6 +321,87 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close() } } - + + /** + * Test close with timeout + */ + @Test + def testCloseWithTimeout() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) + + // Test closing from caller thread. + for(i <- 0 until 50) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + val responses = (0 until numRecords) map (i => producer.send(record0)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.close(-1, TimeUnit.MILLISECONDS) + responses.foreach { future => + try { + future.get() + // No message should be sent successfully + assertTrue(false); + } catch { + case e: Exception => + assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + } + } + val fetchResponse = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) + } + + // Test closing from sender thread. + class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + // Trigger another batch in accumulator before close the producer. These messages should + // not be sent. + (0 until numRecords) map (i => producer.send(record1)) + // The close call will be called by all the message callbacks. This is to test idempotent. + producer.close(-1, TimeUnit.MILLISECONDS) + } + } + for(i <- 1 until 51) { + producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + // send message to partition 0 + var responses = (0 until numRecords) map (i => producer.send(record0)) + // send message to partition 1 + responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer)))) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + // flush the messages. + producer.flush() + assertTrue("All request are complete.", responses.forall(_.isDone())) + // Check the messages received by broker. + val fetchResponse0 = if (leader0.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) + } + val fetchResponse1 = if (leader1.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + } + assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse0.messageSet(topic, 0).size) + assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", + i * numRecords, fetchResponse1.messageSet(topic, 1).size) + } + } finally { + if (producer != null) + producer.close() + } + } } -- 1.8.3.4 (Apple Git-47) From 509334fa2227d3dbe89d4a4b5fb73e070e03a10f Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 8 Mar 2015 21:14:33 -0700 Subject: [PATCH 2/8] A minor fix. --- .../apache/kafka/clients/producer/internals/Sender.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f6d5b28..5f85309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -135,15 +135,14 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - if (!this.forceClose) { - while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) { - try { - run(time.milliseconds()); - } catch (Exception e) { - log.error("Uncaught error in kafka producer I/O thread: ", e); - } + while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) { + try { + run(time.milliseconds()); + } catch (Exception e) { + log.error("Uncaught error in kafka producer I/O thread: ", e); } - } else { + } + if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. this.accumulator.failAllIncompleteBatches(); -- 1.8.3.4 (Apple Git-47) From 50f9b97548c052ba73dece1bbc4f14e378c14814 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 9 Mar 2015 12:56:15 -0700 Subject: [PATCH 3/8] Incorporated Guozhang's comments. --- .../kafka/clients/producer/KafkaProducer.java | 4 +- .../apache/kafka/clients/producer/Producer.java | 2 +- .../producer/internals/RecordAccumulator.java | 4 +- .../kafka/clients/producer/internals/Sender.java | 2 +- .../kafka/common/errors/InterruptException.java | 5 +++ .../producer/internals/RecordAccumulatorTest.java | 15 ++++---- .../integration/kafka/api/ProducerSendTest.scala | 45 +++++++++++++++++----- 7 files changed, 55 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 6887f85..b3ba477 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -531,12 +531,12 @@ public class KafkaProducer implements Producer { @Override public void close(long timeout, TimeUnit timeUnit) { log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - this.sender.initiateClose(); if (timeout >= 0) { + this.sender.initiateClose(); try { this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException e) { - throw new KafkaException(e); + throw new InterruptException(e); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index cbf6e51..7a1deea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -69,7 +69,7 @@ public interface Producer extends Closeable { public void close(); /** - * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires + * Tries to close the producer cleanly until timeout is expired, force closes the producer after the timeout expires * discarding any pending messages. */ public void close(long timeout, TimeUnit unit); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index b1311fa..af0df7d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -362,10 +362,10 @@ public final class RecordAccumulator { * This function is only called when sender is closed forcefully. It will fail all the * incomplete batches and return. */ - public void failAllIncompleteBatches() { + public void abortIncompleteBatches() { for (RecordBatch batch : incomplete.all()) { incomplete.remove(batch); - batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException())); + batch.done(-1L, new InterruptException("Producer is closed forcefully.")); } this.batches.clear(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 5f85309..9a75b1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -145,7 +145,7 @@ public class Sender implements Runnable { if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. - this.accumulator.failAllIncompleteBatches(); + this.accumulator.abortIncompleteBatches(); } this.client.close(); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java index fee322f..3680f1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java @@ -31,4 +31,9 @@ public class InterruptException extends KafkaException { Thread.currentThread().interrupt(); } + public InterruptException(String message) { + super(message, new InterruptedException()); + Thread.currentThread().interrupt(); + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 2113a41..3dd40b5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,6 +63,7 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); + RecordAccumulator accum = null; @Test public void testFull() throws Exception { @@ -91,16 +92,16 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], null); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, null); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -231,10 +232,10 @@ public class RecordAccumulatorTest { } @Test - public void testFailAllIncomplete() throws Exception { + public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -247,7 +248,7 @@ public class RecordAccumulatorTest { RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - accum.failAllIncompleteBatches(); + accum.abortIncompleteBatches(); assertEquals(numExceptionReceivedInCallback.get(), 100); assertFalse(accum.hasUnsent()); diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index a60d09e..87c11c7 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,9 +17,8 @@ package kafka.api -import java.lang.{Integer, IllegalArgumentException} -import java.util.concurrent.TimeUnit import java.util.Properties +import java.util.concurrent.TimeUnit import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite @@ -29,11 +28,19 @@ import org.junit.Assert._ import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer -import kafka.message.Message import kafka.integration.KafkaServerTestHarness import org.apache.kafka.common.errors.{InterruptException, SerializationException} + +import kafka.message.Message +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, TestZKUtils} +import org.apache.kafka.clients.producer._ import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.errors.SerializationException import org.apache.kafka.common.serialization.ByteArraySerializer +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnit3Suite class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @@ -323,10 +330,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout + * Test close with timeout from caller thread */ @Test - def testCloseWithTimeout() { + def testCloseWithTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -361,6 +368,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size) } + } finally { + if (producer != null) + producer.close() + } + } + + /** + * Test close with timeout from sender thread + */ + @Test + def testCloseWithTimeoutFromSenderThread() { + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val leader0 = leaders(0) + val leader1 = leaders(1) + + // create record + val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes) // Test closing from sender thread. class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback { @@ -372,7 +400,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer.close(-1, TimeUnit.MILLISECONDS) } } - for(i <- 1 until 51) { + for(i <- 0 until 50) { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) // send message to partition 0 var responses = (0 until numRecords) map (i => producer.send(record0)) @@ -394,14 +422,13 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) } assertEquals("Fetch response to partition 0 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse0.messageSet(topic, 0).size) + (i + 1) * numRecords, fetchResponse0.messageSet(topic, 0).size) assertEquals("Fetch response to partition 1 should have 100 message returned for each iteration.", - i * numRecords, fetchResponse1.messageSet(topic, 1).size) + (i + 1) * numRecords, fetchResponse1.messageSet(topic, 1).size) } } finally { if (producer != null) producer.close() } } - } -- 1.8.3.4 (Apple Git-47) From 0758f418ef5bb3007b8be48c6d38598884f9839d Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:32:51 -0700 Subject: [PATCH 4/8] Modify according to the latest conclusion. --- .../kafka/clients/producer/KafkaProducer.java | 38 ++++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b3ba477..27b9eed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -514,34 +514,52 @@ public class KafkaProducer implements Producer { */ @Override public void close() { - close(0, TimeUnit.MILLISECONDS); + close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /** * This method waits up to timeout for the producer to complete previous send requests. * If producer was not able to finish before timeout, this method will fail the incomplete send requests - * and close the producer forcefully. When InterruptException is thrown, user should retry, otherwise - * there might be metric leak. - * @param timeout The max time to wait for producer complete send requests. - * Wait forever when timeout is 0. - * Does not wait when timeout is negative. + * and close the producer forcefully. If timeout > 0, this method is a blocking call and will try to join + * the sender thread before it returns. If timeout = 0, this method is a non-blocking call and does not join + * the sender thread. If this method is invoked from callback with timeout > 0, a warning message will be logged + * and the producer will block forever. + * When InterruptException is thrown, user should retry, otherwise there might be metric leak. + * @param timeout The max time to wait for producer complete send requests. The value should be non negative. + * Does not wait when timeout is 0. * @param timeUnit The time unit for timeout * @throws InterruptException If the thread is interrupted while blocked + * @throws IllegalArgumentException If the timeout is negative. */ @Override public void close(long timeout, TimeUnit timeUnit) { log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - if (timeout >= 0) { + // For timeout > 0, we ensure the sender thread is joined. + if (timeout > 0) { + if (Thread.currentThread() == this.ioThread) + log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0." + + "Only close(0, TimeUnit) can be invoked from callback. Otherwise the producer will block on close forever."); this.sender.initiateClose(); try { this.ioThread.join(timeUnit.toMillis(timeout)); } catch (InterruptedException e) { throw new InterruptException(e); } - } - if (this.ioThread.isAlive()) { - this.sender.forceClose(); + // If elegant close timeout, we issue a force close. + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + try { + this.ioThread.join(); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } + } else if (timeout == 0) { + if (this.ioThread.isAlive()) + this.sender.forceClose(); + } else { + throw new IllegalArgumentException("The timeout cannot be negative."); } this.metrics.close(); -- 1.8.3.4 (Apple Git-47) From 3a29cf2f32d66e46a083883a7d52c1228ebdbbb5 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 27 Mar 2015 16:34:06 -0700 Subject: [PATCH 5/8] Patch for the finally passed KIP-15git status --- .../kafka/clients/producer/KafkaProducer.java | 55 +++++++++++++--------- .../kafka/clients/producer/internals/Sender.java | 1 - .../integration/kafka/api/ProducerSendTest.scala | 17 +++---- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 27b9eed..4eabe50 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -509,7 +509,10 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. - * This method is equivalent to close(0, TimeUnit.MILLISECONDS). + * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). + * [NOTE] If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) + * will be called instead. We are doing this because otherwise the sender thread will try to joint itself and + * block forever. * @throws InterruptException If the thread is interrupted while blocked */ @Override @@ -520,10 +523,14 @@ public class KafkaProducer implements Producer { /** * This method waits up to timeout for the producer to complete previous send requests. * If producer was not able to finish before timeout, this method will fail the incomplete send requests - * and close the producer forcefully. If timeout > 0, this method is a blocking call and will try to join - * the sender thread before it returns. If timeout = 0, this method is a non-blocking call and does not join - * the sender thread. If this method is invoked from callback with timeout > 0, a warning message will be logged - * and the producer will block forever. + * and close the producer forcefully. + * If timeout > 0, this method is a blocking call and will try to join the sender thread before it returns. + * If timeout = 0, this method is a non-blocking call and does not join the sender thread. + *

+ * If this method is invoked from callback with timeout > 0, a warning message will be logged and + * close(0, TimeUnit.MILLISECONDS) will be called instead. We are doing this because otherwise the sender + * thread will try to join itself and block forever. + *

* When InterruptException is thrown, user should retry, otherwise there might be metric leak. * @param timeout The max time to wait for producer complete send requests. The value should be non negative. * Does not wait when timeout is 0. @@ -533,33 +540,37 @@ public class KafkaProducer implements Producer { */ @Override public void close(long timeout, TimeUnit timeUnit) { + if (timeout < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); + log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); - // For timeout > 0, we ensure the sender thread is joined. + boolean invokedFromCallback = Thread.currentThread() == this.ioThread; + // Try to close elegantly. if (timeout > 0) { - if (Thread.currentThread() == this.ioThread) - log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0." + - "Only close(0, TimeUnit) can be invoked from callback. Otherwise the producer will block on close forever."); - this.sender.initiateClose(); - try { - this.ioThread.join(timeUnit.toMillis(timeout)); - } catch (InterruptedException e) { - throw new InterruptException(e); + if (invokedFromCallback) { + log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0. " + + "close(0, TimeUnit.MILLISECONDS) will be invoked instead to avoid deadlock."); + } else { + this.sender.initiateClose(); + try { + this.ioThread.join(timeUnit.toMillis(timeout)); + } catch (InterruptedException e) { + throw new InterruptException(e); + } } + } - // If elegant close timeout, we issue a force close. - if (this.ioThread.isAlive()) { - this.sender.forceClose(); + // Initiate a force close if sender thread is still alive. + if (this.ioThread.isAlive()) { + this.sender.forceClose(); + // Only join the sender thread when not calling from callback. + if (!invokedFromCallback) { try { this.ioThread.join(); } catch (InterruptedException e) { throw new InterruptException(e); } } - } else if (timeout == 0) { - if (this.ioThread.isAlive()) - this.sender.forceClose(); - } else { - throw new IllegalArgumentException("The timeout cannot be negative."); } this.metrics.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9a75b1a..0dee33e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -186,7 +186,6 @@ public class Sender implements Runnable { now); sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); - // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 87c11c7..d870c0f 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -330,10 +330,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from caller thread + * Test close with zero timeout from caller thread */ @Test - def testCloseWithTimeoutFromCallerThread() { + def testCloseWithZeroTimeoutFromCallerThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -350,12 +350,11 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) val responses = (0 until numRecords) map (i => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) responses.foreach { future => try { future.get() - // No message should be sent successfully - assertTrue(false); + fail("No message should be sent successfully.") } catch { case e: Exception => assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) @@ -375,10 +374,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with timeout from sender thread + * Test close with zero timeout from sender thread */ @Test - def testCloseWithTimeoutFromSenderThread() { + def testCloseWithZeroTimeoutFromSenderThread() { var producer: KafkaProducer[Array[Byte],Array[Byte]] = null try { // create topic @@ -397,7 +396,9 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // not be sent. (0 until numRecords) map (i => producer.send(record1)) // The close call will be called by all the message callbacks. This is to test idempotent. - producer.close(-1, TimeUnit.MILLISECONDS) + producer.close(0, TimeUnit.MILLISECONDS) + // Test close with non zero timeout. Should not block at all. + producer.close(Long.MaxValue, TimeUnit.MICROSECONDS) } } for(i <- 0 until 50) { -- 1.8.3.4 (Apple Git-47) From 5af115000ab097f66c43287ec7f4eae40a50b9e0 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 15:52:29 -0700 Subject: [PATCH 6/8] Addressed Joel and Guozhang's comments. --- .../kafka/clients/producer/KafkaProducer.java | 38 +++++---- .../apache/kafka/clients/producer/Producer.java | 4 +- .../producer/internals/RecordAccumulator.java | 93 ++++++++++++++-------- .../producer/internals/RecordAccumulatorTest.java | 3 +- .../integration/kafka/api/ProducerSendTest.scala | 4 +- 5 files changed, 87 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 4eabe50..d795d2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -510,9 +510,11 @@ public class KafkaProducer implements Producer { /** * Close this producer. This method blocks until all previously sent requests complete. * This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). - * [NOTE] If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) - * will be called instead. We are doing this because otherwise the sender thread will try to joint itself and - * block forever. + *

+ * If close() is called from callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) + * will be called instead. We do this because the sender thread would otherwise try to joint itself and + * block forever. + *

* @throws InterruptException If the thread is interrupted while blocked */ @Override @@ -521,17 +523,22 @@ public class KafkaProducer implements Producer { } /** - * This method waits up to timeout for the producer to complete previous send requests. - * If producer was not able to finish before timeout, this method will fail the incomplete send requests + * This method waits up to timeout for the producer to complete outstanding send requests. + * If producer is unable to finish before timeout, this method will fail the incomplete send requests * and close the producer forcefully. - * If timeout > 0, this method is a blocking call and will try to join the sender thread before it returns. - * If timeout = 0, this method is a non-blocking call and does not join the sender thread. *

- * If this method is invoked from callback with timeout > 0, a warning message will be logged and - * close(0, TimeUnit.MILLISECONDS) will be called instead. We are doing this because otherwise the sender - * thread will try to join itself and block forever. + * When timeout = 0, this method initiates a force close immediately then may or may not block on joining sender + * thread depending on how the method is invoked. + *

    + *
  • If the method is invoked from user thread, it will try to join the sender thread.
  • + *
  • If the method is invoked from callback, it is a non-blocking call without trying to join itself.
  • + *
+ *

+ *

+ * If this method is invoked from callback with timeout > 0, a warning message will be logged and + * close(0, TimeUnit.MILLISECONDS) will be called instead. We do this because the sender would otherwise + * try to join itself and block forever. *

- * When InterruptException is thrown, user should retry, otherwise there might be metric leak. * @param timeout The max time to wait for producer complete send requests. The value should be non negative. * Does not wait when timeout is 0. * @param timeUnit The time unit for timeout @@ -543,13 +550,13 @@ public class KafkaProducer implements Producer { if (timeout < 0) throw new IllegalArgumentException("The timeout cannot be negative."); - log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout)); + log.trace("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); boolean invokedFromCallback = Thread.currentThread() == this.ioThread; - // Try to close elegantly. + // Try to close gracefully. if (timeout > 0) { if (invokedFromCallback) { - log.warn("You see this warning because you are calling close(timeout, TimeUnit) from callback with timeout > 0. " + - "close(0, TimeUnit.MILLISECONDS) will be invoked instead to avoid deadlock."); + log.warn("Overriding close timeout {} ms to 0 ms in order to prevent deadlock due to self-join. " + + "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout); } else { this.sender.initiateClose(); try { @@ -562,6 +569,7 @@ public class KafkaProducer implements Producer { // Initiate a force close if sender thread is still alive. if (this.ioThread.isAlive()) { + log.info("Initiating force close."); this.sender.forceClose(); // Only join the sender thread when not calling from callback. if (!invokedFromCallback) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 7a1deea..d4a5d39 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -69,8 +69,8 @@ public interface Producer extends Closeable { public void close(); /** - * Tries to close the producer cleanly until timeout is expired, force closes the producer after the timeout expires - * discarding any pending messages. + * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the + * timeout, fail any pending send requests and force close the producer. */ public void close(long timeout, TimeUnit unit); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index af0df7d..464df9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -44,6 +43,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} @@ -67,6 +68,7 @@ public final class RecordAccumulator { private final Time time; private final ConcurrentMap> batches; private final IncompleteRecordBatches incomplete; + private final ReadWriteLock closeLock; /** * Create a new record accumulator @@ -106,6 +108,7 @@ public final class RecordAccumulator { this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); this.incomplete = new IncompleteRecordBatches(); this.time = time; + this.closeLock = new ReentrantReadWriteLock(); registerMetrics(metrics, metricGrpName, metricTags); } @@ -146,41 +149,48 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { - if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); - // check if we have an in-progress batch - Deque dq = dequeFor(tp); - synchronized (dq) { - RecordBatch last = dq.peekLast(); - if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { + // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the + // last batch is missed. + closeLock.readLock().lock(); + try { + if (closed) + throw new IllegalStateException("Cannot send after the producer is closed."); + // check if we have an in-progress batch + Deque dq = dequeFor(tp); + synchronized (dq) { + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, callback); + if (future != null) + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + } } - } - // we don't have an in-progress record batch try to allocate a new batch - int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); - ByteBuffer buffer = free.allocate(size); - synchronized (dq) { - RecordBatch last = dq.peekLast(); - if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... - free.deallocate(buffer); - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + // we don't have an in-progress record batch try to allocate a new batch + int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); + ByteBuffer buffer = free.allocate(size); + synchronized (dq) { + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, callback); + if (future != null) { + // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... + free.deallocate(buffer); + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + } } - } - MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); - RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); + MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); - dq.addLast(batch); - incomplete.add(batch); - return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); + dq.addLast(batch); + incomplete.add(batch); + return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); + } + } finally { + closeLock.readLock().unlock(); } } @@ -188,12 +198,14 @@ public final class RecordAccumulator { * Re-enqueue the given record batch in the accumulator to retry */ public void reenqueue(RecordBatch batch, long now) { + closeLock.readLock().lock(); batch.attempts++; batch.lastAttemptMs = now; Deque deque = dequeFor(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); } + closeLock.readLock().unlock(); } /** @@ -255,6 +267,9 @@ public final class RecordAccumulator { * @return Whether there is any unsent record in the accumulator. */ public boolean hasUnsent() { + // Ideally this method should also be atomic against append, i.e. should grab closeLock.writeLock. But + // because this method is only called after close() is called, no append should happen in concurrent when this + // method is called, so we are not grabbing the lock here. for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); synchronized (deque) { @@ -363,9 +378,17 @@ public final class RecordAccumulator { * incomplete batches and return. */ public void abortIncompleteBatches() { + // Ideally this method should also be atomic against append, i.e. should grab closeLock.writeLock. But + // because this method is only called after close() is called, no append should happen in concurrent when this + // method is called, so we are not grabbing the lock here. for (RecordBatch batch : incomplete.all()) { incomplete.remove(batch); - batch.done(-1L, new InterruptException("Producer is closed forcefully.")); + Deque dq = dequeFor(batch.topicPartition); + // Need to close the batch before finishes it. + synchronized (dq) { + batch.records.close(); + } + batch.done(-1L, new IllegalStateException("Producer is closed forcefully.")); } this.batches.clear(); } @@ -374,7 +397,9 @@ public final class RecordAccumulator { * Close this accumulator and force all the record buffers to be drained */ public void close() { + closeLock.writeLock().lock(); this.closed = true; + closeLock.writeLock().unlock(); } /* @@ -412,7 +437,7 @@ public final class RecordAccumulator { */ private final static class IncompleteRecordBatches { private final Set incomplete; - + public IncompleteRecordBatches() { this.incomplete = new HashSet(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 3dd40b5..76e3b9c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,7 +63,6 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - RecordAccumulator accum = null; @Test public void testFull() throws Exception { @@ -235,7 +234,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d870c0f..122b273 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -357,7 +357,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { fail("No message should be sent successfully.") } catch { case e: Exception => - assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage) + assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) } } val fetchResponse = if (leader0.get == configs(0).brokerId) { @@ -374,7 +374,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } /** - * Test close with zero timeout from sender thread + * Test close with zero and non-zero timeout from sender thread */ @Test def testCloseWithZeroTimeoutFromSenderThread() { -- 1.8.3.4 (Apple Git-47) From 0930af24167b5c7e45d7ba5e51ed01af8a891c32 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 16:34:30 -0700 Subject: [PATCH 7/8] rebased on trunk --- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 22 +++++++++++----------- .../clients/producer/internals/SenderTest.java | 6 +++--- .../integration/kafka/api/ProducerSendTest.scala | 11 +---------- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d795d2f..9825990 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -377,7 +377,7 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 464df9d..daa72c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -149,7 +149,7 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the // last batch is missed. closeLock.readLock().lock(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 76e3b9c..2866595 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -70,10 +70,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -91,7 +91,7 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -99,7 +99,7 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -122,7 +122,7 @@ public class RecordAccumulatorTest { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null); + accum.append(tp, key, value, CompressionType.NONE, null); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -143,7 +143,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); } catch (Exception e) { e.printStackTrace(); } @@ -183,7 +183,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, CompressionType.NONE, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -192,14 +192,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null); + accum.append(tp3, key, value, CompressionType.NONE, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, CompressionType.NONE, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -212,7 +212,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null); + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -234,7 +234,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..7f69b3a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -72,7 +72,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -99,7 +99,7 @@ public class SenderTest { time, "clientId"); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -116,7 +116,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 122b273..b9cdecc 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -20,20 +20,11 @@ package kafka.api import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.kafka.clients.producer._ -import org.scalatest.junit.JUnit3Suite -import org.junit.Test -import org.junit.Assert._ - -import kafka.server.KafkaConfig -import kafka.utils.TestUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness -import org.apache.kafka.common.errors.{InterruptException, SerializationException} - import kafka.message.Message import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, TestZKUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer._ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.errors.SerializationException -- 1.8.3.4 (Apple Git-47) From 81f5ef25b7027f3cee9796c7962f8a968f5a7e18 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 7 Apr 2015 18:14:35 -0700 Subject: [PATCH 8/8] Rebase on trunk --- .../kafka/clients/producer/KafkaProducer.java | 4 ++-- .../producer/internals/RecordAccumulator.java | 2 +- .../producer/internals/RecordAccumulatorTest.java | 22 +++++++++++----------- .../clients/producer/internals/SenderTest.java | 6 +++--- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 9825990..2a4cab9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -196,7 +196,7 @@ public class KafkaProducer implements Producer { this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); @@ -377,7 +377,7 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index daa72c3..6afcfbf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -149,7 +149,7 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { // We use the ReadWriteLock to make sure append is atomic to close. This will avoid the situation where the // last batch is missed. closeLock.readLock().lock(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 2866595..98dc6ce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -70,10 +70,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, CompressionType.NONE, null); + accum.append(tp1, key, value, null); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, CompressionType.NONE, null); + accum.append(tp1, key, value, null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -92,7 +92,7 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); + accum.append(tp1, key, new byte[2 * batchSize], null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -100,7 +100,7 @@ public class RecordAccumulatorTest { public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, CompressionType.NONE, null); + accum.append(tp1, key, value, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -122,7 +122,7 @@ public class RecordAccumulatorTest { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, CompressionType.NONE, null); + accum.append(tp, key, value, null); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -143,7 +143,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, null); } catch (Exception e) { e.printStackTrace(); } @@ -183,7 +183,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, CompressionType.NONE, null); + accum.append(tp1, key, value, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -192,14 +192,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, CompressionType.NONE, null); + accum.append(tp3, key, value, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, CompressionType.NONE, null); + accum.append(tp2, key, value, null); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -212,7 +212,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null); + accum.append(new TopicPartition(topic, i % 3), key, value, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -243,7 +243,7 @@ public class RecordAccumulatorTest { } } for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback()); + accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback()); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 7f69b3a..8b1805d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -72,7 +72,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -99,7 +99,7 @@ public class SenderTest { time, "clientId"); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -116,7 +116,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); -- 1.8.3.4 (Apple Git-47)