From 15a78afdb0de07a84aaa09dfd96f172fbde6aa55 Mon Sep 17 00:00:00 2001
From: jqin
Date: Sun, 8 Mar 2015 21:01:40 -0700
Subject: [PATCH 1/3] Patch for KAFKA-1660 add a close method with timeout to
 producer.

---
 .../kafka/clients/producer/KafkaProducer.java      | 36 +++++++--
 .../kafka/clients/producer/MockProducer.java       |  5 ++
 .../apache/kafka/clients/producer/Producer.java    |  7 ++
 .../producer/internals/RecordAccumulator.java      | 13 ++++
 .../kafka/clients/producer/internals/Sender.java   | 27 +++++--
 .../kafka/common/serialization/Serializer.java     |  4 +-
 .../producer/internals/RecordAccumulatorTest.java  | 26 +++++++
 .../integration/kafka/api/ProducerSendTest.scala   | 86 +++++++++++++++++++++-
 8 files changed, 190 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7397e56..6fa503a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -218,7 +218,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                                                                  TimeUnit.MILLISECONDS);
+                                                                   TimeUnit.MILLISECONDS);
         String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -527,17 +527,41 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     /**
      * Close this producer. This method blocks until all previously sent requests complete.
+     * This method is equivalent to close(0, TimeUnit.MILLISECONDS).
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void close() {
-        log.trace("Closing the Kafka producer.");
+        close(0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * This method waits up to timeout for the producer to complete previously sent requests.
+     * If the producer is not able to finish before the timeout, this method fails the incomplete
+     * send requests and closes the producer forcefully. If an InterruptException is thrown, the
+     * caller should retry the call; otherwise the producer's metrics may be leaked.
+     * @param timeout The maximum time to wait for the producer to complete its send requests.
+     *                Waits forever when timeout is 0.
+     *                Does not wait when timeout is negative.
+     * @param timeUnit The time unit for the timeout
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+        log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout));
         this.sender.initiateClose();
-        try {
-            this.ioThread.join();
-        } catch (InterruptedException e) {
-            throw new InterruptException(e);
+        if (timeout >= 0) {
+            try {
+                this.ioThread.join(timeUnit.toMillis(timeout));
+            } catch (InterruptedException e) {
+                throw new KafkaException(e);
+            }
         }
+
+        if (this.ioThread.isAlive()) {
+            this.sender.forceClose();
+        }
+
         this.metrics.close();
         this.keySerializer.close();
         this.valueSerializer.close();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6913090..3c34610 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
@@ -146,6 +147,10 @@ public class MockProducer implements Producer<byte[], byte[]> {
     public void close() {
     }
 
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+    }
+
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 5b3e75e..cbf6e51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -67,4 +68,10 @@ public interface Producer<K, V> extends Closeable {
      */
     public void close();
 
+    /**
+     * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires
+     * discarding any pending messages.
+     */
+    public void close(long timeout, TimeUnit unit);
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d5c79e2..1faaad4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -355,6 +356,18 @@ public final class RecordAccumulator {
     }
 
     /**
+     * This function is only called when the sender is closed forcefully. It will fail all the
+     * incomplete batches and return.
+     */
+    public void failAllIncompleteBatches() {
+        for (RecordBatch batch : incomplete.all()) {
+            incomplete.remove(batch);
+            batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException()));
+        }
+        this.batches.clear();
+    }
+
+    /**
      * Close this accumulator and force all the record buffers to be drained
      */
     public void close() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ed9c63a..121bb92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -83,6 +83,9 @@ public class Sender implements Runnable {
     /* true while the sender thread is still running */
     private volatile boolean running;
 
+    /* true when the caller wants to ignore all unsent/inflight messages and force close. */
+    private volatile boolean forceClose;
+
     /* metrics */
     private final SenderMetrics sensors;
 
@@ -132,12 +135,18 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
-            try {
-                run(time.milliseconds());
-            } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+        if (!this.forceClose) {
+            while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
             }
+        } else {
+            // We need to fail all the incomplete batches and wake up the threads waiting on
+            // the futures.
+            this.accumulator.failAllIncompleteBatches();
         }
 
         this.client.close();
@@ -209,6 +218,14 @@ public class Sender implements Runnable {
     }
 
     /**
+     * Closes the sender without sending out any pending messages.
+     */
+    public void forceClose() {
+        this.forceClose = true;
+        initiateClose();
+    }
+
+    /**
      * Handle a produce response
      */
     private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c2fdc23..50f8703 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -38,7 +38,9 @@ public interface Serializer<T> {
     public byte[] serialize(String topic, T data);
 
     /**
-     * Close this serializer
+     * Close this serializer.
+     * This method has to be idempotent if the serializer is used by the KafkaProducer, because it
+     * might be called multiple times.
      */
     public void close();
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index c1bc406..f63a542 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -26,7 +26,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -225,4 +228,27 @@ public class RecordAccumulatorTest {
         assertFalse(accum.hasUnsent());
     }
 
+    @Test
+    public void testFailAllIncomplete() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        class TestCallback implements Callback {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertTrue(exception.getMessage().equals("Producer is closed forcefully."));
+                numExceptionReceivedInCallback.incrementAndGet();
+            }
+        }
+        for (int i = 0; i < 100; i++)
+            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, new TestCallback());
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+
+        accum.failAllIncompleteBatches();
+        assertEquals(numExceptionReceivedInCallback.get(), 100);
+        assertFalse(accum.hasUnsent());
+
+    }
+
 }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3df4507..7dc1655 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -18,6 +18,7 @@
 package kafka.api
 
 import java.lang.{Integer, IllegalArgumentException}
+import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.producer._
 import org.scalatest.junit.JUnit3Suite
@@ -29,7 +30,7 @@ import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InterruptException, SerializationException}
 import java.util.Properties
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -322,6 +323,87 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       producer.close()
     }
   }
-
+
+  /**
+   * Test close with timeout
+   */
+  @Test
+  def testCloseWithTimeout() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create record
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+
+      // Test closing from caller thread.
+      for(i <- 0 until 50) {
+        producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+        val responses = (0 until numRecords) map (i => producer.send(record0))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        producer.close(-1, TimeUnit.MILLISECONDS)
+        responses.foreach { future =>
+          try {
+            future.get()
+            // No message should be sent successfully
+            assertTrue(false)
+          } catch {
+            case e: Exception =>
+              assertEquals("org.apache.kafka.common.errors.InterruptException: Producer is closed forcefully.", e.getMessage)
+          }
+        }
+        val fetchResponse = if (leader0.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        }
+        assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
+      }
+
+      // Test closing from sender thread.
+      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+        override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+          // Trigger another batch in the accumulator before closing the producer. These messages
+          // should not be sent.
+          (0 until numRecords) map (i => producer.send(record1))
+          // close() will be invoked by every message callback. This tests that close is idempotent.
+          producer.close(-1, TimeUnit.MILLISECONDS)
+        }
+      }
+      for(i <- 1 until 51) {
+        producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+        // send message to partition 0
+        var responses = (0 until numRecords) map (i => producer.send(record0))
+        // send message to partition 1
+        responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer))))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        // flush the messages.
+        producer.flush()
+        assertTrue("All requests are complete.", responses.forall(_.isDone()))
+        // Check the messages received by the broker.
+        val fetchResponse0 = if (leader0.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        }
+        val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        }
+        assertEquals("Fetch response to partition 0 should have 100 messages returned for each iteration.",
+          i * numRecords, fetchResponse0.messageSet(topic, 0).size)
+        assertEquals("Fetch response to partition 1 should have 100 messages returned for each iteration.",
+          i * numRecords, fetchResponse1.messageSet(topic, 1).size)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
 }
-- 
1.8.3.4 (Apple Git-47)


From 9eebaef94a0847525e9c4972a69afeaaea76392e Mon Sep 17 00:00:00 2001
From: jqin
Date: Sun, 8 Mar 2015 21:14:33 -0700
Subject: [PATCH 2/3] A minor fix.
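
Merge the force-close check into the shutdown loop condition in Sender.run(), so a force
close requested while the sender is draining breaks the loop immediately instead of only
being honored before the drain starts. A minimal sketch of the resulting shutdown sequence
(simplified from the diff below; the fields are the existing Sender members):

    // Drain unsent and in-flight requests unless a force close has been requested.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // Fail all incomplete batches to wake up any threads blocked on their futures.
        this.accumulator.failAllIncompleteBatches();
    }
    this.client.close();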
---
 .../apache/kafka/clients/producer/internals/Sender.java | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 121bb92..8390c2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -135,15 +135,14 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        if (!this.forceClose) {
-            while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
-                try {
-                    run(time.milliseconds());
-                } catch (Exception e) {
-                    log.error("Uncaught error in kafka producer I/O thread: ", e);
-                }
+        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+            try {
+                run(time.milliseconds());
+            } catch (Exception e) {
+                log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
-        } else {
+        }
+        if (forceClose) {
             // We need to fail all the incomplete batches and wake up the threads waiting on
             // the futures.
             this.accumulator.failAllIncompleteBatches();
-- 
1.8.3.4 (Apple Git-47)


From 8be80d4c4343b297d23534d436f5f580b6eb90ce Mon Sep 17 00:00:00 2001
From: jqin
Date: Mon, 9 Mar 2015 12:56:15 -0700
Subject: [PATCH 3/3] Incorporated Guozhang's comments.

---
 .../kafka/clients/producer/KafkaProducer.java      |  4 +-
 .../apache/kafka/clients/producer/Producer.java    |  2 +-
 .../producer/internals/RecordAccumulator.java      |  4 +-
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../kafka/common/errors/InterruptException.java    |  5 +++
 .../producer/internals/RecordAccumulatorTest.java  | 21 ++++-----
 .../integration/kafka/api/ProducerSendTest.scala   | 52 +++++++++++++-------
 7 files changed, 57 insertions(+), 33 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 6fa503a..ed7957a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -549,12 +549,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
         log.trace("Closing the Kafka producer with timeoutMillis = {}.", timeUnit.toMillis(timeout));
-        this.sender.initiateClose();
         if (timeout >= 0) {
+            this.sender.initiateClose();
             try {
                 this.ioThread.join(timeUnit.toMillis(timeout));
             } catch (InterruptedException e) {
-                throw new KafkaException(e);
+                throw new InterruptException(e);
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index cbf6e51..7a1deea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -69,7 +69,7 @@ public interface Producer<K, V> extends Closeable {
     public void close();
 
     /**
-     * Tries to close the producer cleanly until timeout is expired,force closes the producer after the timeout expires
+     * Tries to close the producer cleanly within the timeout; once the timeout expires, the producer is force-closed,
      * discarding any pending messages.
      */
     public void close(long timeout, TimeUnit unit);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1faaad4..16b53a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -359,10 +359,10 @@ public final class RecordAccumulator {
      * This function is only called when the sender is closed forcefully. It will fail all the
      * incomplete batches and return.
      */
-    public void failAllIncompleteBatches() {
+    public void abortIncompleteBatches() {
         for (RecordBatch batch : incomplete.all()) {
             incomplete.remove(batch);
-            batch.done(-1L, new InterruptException("Producer is closed forcefully.", new InterruptedException()));
+            batch.done(-1L, new InterruptException("Producer is closed forcefully."));
         }
         this.batches.clear();
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8390c2e..e5de8e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -145,7 +145,7 @@ public class Sender implements Runnable {
         if (forceClose) {
             // We need to fail all the incomplete batches and wake up the threads waiting on
             // the futures.
-            this.accumulator.failAllIncompleteBatches();
+            this.accumulator.abortIncompleteBatches();
         }
 
         this.client.close();
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
index fee322f..3680f1b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -31,4 +31,9 @@ public class InterruptException extends KafkaException {
         Thread.currentThread().interrupt();
     }
 
+    public InterruptException(String message) {
+        super(message, new InterruptedException());
+        Thread.currentThread().interrupt();
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index f63a542..31fbfd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,11 +64,12 @@ public class RecordAccumulatorTest {
     private Metrics metrics = new Metrics(time);
     String metricGroup = "TestMetrics";
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    RecordAccumulator accum = null;
 
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
@@ -91,7 +92,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -99,7 +100,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -116,7 +117,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -135,7 +136,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -175,7 +176,7 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
 
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
@@ -208,7 +209,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -229,10 +230,10 @@ public class RecordAccumulatorTest {
     }
 
     @Test
-    public void testFailAllIncomplete() throws Exception {
+    public void testAbortIncompleteBatches() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -245,7 +246,7 @@ public class RecordAccumulatorTest {
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
-        accum.failAllIncompleteBatches();
+        accum.abortIncompleteBatches();
         assertEquals(numExceptionReceivedInCallback.get(), 100);
         assertFalse(accum.hasUnsent());
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 7dc1655..3e22e67 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,23 +17,21 @@
 
 package kafka.api
 
-import java.lang.{Integer, IllegalArgumentException}
+import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import org.junit.Assert._
-
-import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
-import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.errors.{InterruptException, SerializationException}
-import java.util.Properties
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, TestZKUtils}
+import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
 
 class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -325,10 +323,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
   }
 
   /**
-   * Test close with timeout
+   * Test close with timeout from caller thread
    */
   @Test
-  def testCloseWithTimeout() {
+  def testCloseWithTimeoutFromCallerThread() {
     var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
     try {
       // create topic
@@ -363,6 +361,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * Test close with timeout from sender thread
+   */
+  @Test
+  def testCloseWithTimeoutFromSenderThread() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create record
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
 
       // Test closing from sender thread.
       class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
@@ -374,7 +393,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
           producer.close(-1, TimeUnit.MILLISECONDS)
         }
       }
-      for(i <- 1 until 51) {
+      for(i <- 0 until 50) {
         producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
         // send message to partition 0
         var responses = (0 until numRecords) map (i => producer.send(record0))
@@ -396,14 +415,13 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
       }
       assertEquals("Fetch response to partition 0 should have 100 messages returned for each iteration.",
-        i * numRecords, fetchResponse0.messageSet(topic, 0).size)
+        (i + 1) * numRecords, fetchResponse0.messageSet(topic, 0).size)
       assertEquals("Fetch response to partition 1 should have 100 messages returned for each iteration.",
-        i * numRecords, fetchResponse1.messageSet(topic, 1).size)
+        (i + 1) * numRecords, fetchResponse1.messageSet(topic, 1).size)
       }
     } finally {
       if (producer != null)
        producer.close()
     }
   }
-
 }
-- 
1.8.3.4 (Apple Git-47)
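
A minimal usage sketch of the API added by this series (not part of the patches; the broker
address, topic, and timeout below are hypothetical). Per the javadoc in PATCH 1/3,
close(timeout, timeUnit) waits up to the timeout for outstanding requests, waits forever when
the timeout is 0, and does not wait at all when the timeout is negative; whatever is still
incomplete when the wait ends is failed with InterruptException and the producer is closed
forcefully:

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerCloseExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "value".getBytes()));

            // Wait up to 5 seconds for pending sends to complete; anything still
            // incomplete after that is failed and the producer is force-closed.
            // A negative timeout skips the wait entirely, which is what the tests
            // above use when closing from within a send Callback.
            producer.close(5000, TimeUnit.MILLISECONDS);
        }
    }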