From 4d6b030e21e42a6db93a082a08e2c4e2f45ec71e Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Sat, 7 Feb 2015 12:01:51 -0800
Subject: [PATCH] KAFKA-1865 Add a flush() method to the producer.

---
 .../kafka/clients/producer/KafkaProducer.java      | 20 ++++++++++++++++
 .../kafka/clients/producer/MockProducer.java       |  5 ++++
 .../apache/kafka/clients/producer/Producer.java    |  5 ++++
 .../producer/internals/RecordAccumulator.java      | 27 +++++++++++++++++++++-
 .../clients/producer/internals/RecordBatch.java    |  2 +-
 .../kafka/clients/producer/MockProducerTest.java   |  6 +++++
 .../clients/producer/RecordAccumulatorTest.java    | 16 +++++++++++++
 .../integration/kafka/api/ProducerSendTest.scala   | 26 ++++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  4 +++-
 9 files changed, 105 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1fd6917..c6b83ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -399,6 +399,26 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                                              ProducerConfig.BUFFER_MEMORY_CONFIG +
                                              " configuration.");
     }
+
+    /**
+     * Immediately flush any records currently buffered in the producer. This will cause any unsent records to be sent
+     * immediately, regardless of the configured linger.ms setting. The post-condition of this method is that all
+     * previously sent records will have completed (either successfully or with an error).
+     */
+    @Override
+    public void flush() {
+        // only allow a single flush at a time to avoid races between simultaneous flushes
+        synchronized (this) {
+            log.trace("Flushing accumulated records in producer.");
+            this.accumulator.beginFlush();
+            this.sender.wakeup();
+            try {
+                this.accumulator.awaitFlushCompletion();
+            } catch (InterruptedException e) {
+                throw new KafkaException("Flush interrupted.", e);
+            }
+        }
+    }
 
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 84530f2..6913090 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -128,6 +128,11 @@ public class MockProducer implements Producer {
             return offset;
         }
     }
+
+    public synchronized void flush() {
+        while (!this.completions.isEmpty())
+            completeNext();
+    }
 
     public List<PartitionInfo> partitionsFor(String topic) {
         return this.cluster.partitionsForTopic(topic);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 17fe541..5b3e75e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -45,6 +45,11 @@ public interface Producer<K,V> extends Closeable {
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
     public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
+
+    /**
+     * Flush any accumulated records from the producer. Blocks until all sends are complete.
+     */
+    public void flush();
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ecfe214..7d31527 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -55,6 +55,7 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
+    private volatile boolean flushing;
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
@@ -89,6 +90,7 @@
                              Map<String, String> metricTags) {
         this.drainIndex = 0;
         this.closed = false;
+        this.flushing = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
@@ -226,7 +228,7 @@
                 long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                 boolean full = deque.size() > 1 || batch.records.isFull();
                 boolean expired = waitedTimeMs >= timeToWaitMs;
-                boolean sendable = full || expired || exhausted || closed;
+                boolean sendable = full || expired || exhausted || closed || flushing;
                 if (sendable && !backingOff) {
                     readyNodes.add(leader);
                 } else {
@@ -326,6 +328,29 @@
     public void deallocate(RecordBatch batch) {
         free.deallocate(batch.records.buffer(), batch.records.capacity());
     }
+
+    public void beginFlush() {
+        this.flushing = true;
+    }
+
+    /**
+     * Mark all partitions as ready to send and block until the send is complete
+     */
+    public void awaitFlushCompletion() throws InterruptedException {
+        for (Deque<RecordBatch> deque: this.batches.values()) {
+            // lock the deque and make a copy
+            List<RecordBatch> batches = null;
+            synchronized (deque) {
+                if (deque.size() > 0)
+                    batches = new ArrayList<RecordBatch>(deque);
+            }
+            if (batches != null) {
+                for (RecordBatch batch: batches)
+                    batch.produceFuture.await();
+            }
+        }
+        this.flushing = false;
+    }
 
     /**
      * Close this accumulator and force all the record buffers to be drained
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index dd0af8a..ba837ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -39,7 +39,7 @@ public final class RecordBatch {
     public long lastAttemptMs;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
-    private final ProduceRequestResult produceFuture;
+    public final ProduceRequestResult produceFuture;
     private final List<Thunk> thunks;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 75513b0..5d7baa6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -67,6 +67,12 @@ public class MockProducerTest {
             assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
+
+        Future<RecordMetadata> md3 = producer.send(record1);
+        Future<RecordMetadata> md4 = producer.send(record2);
assertFalse("Requests should not be completed.", md3.isDone() && md4.isDone()); + producer.flush(); + assertTrue("Requests should be completed.", md3.isDone() && md4.isDone()); } private boolean isError(Future future) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 8333863..0e1fa27 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -203,5 +203,21 @@ public class RecordAccumulatorTest { // but have leaders with other sendable data. assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); } + + @Test + public void testFlush() throws Exception { + long lingerMs = Long.MAX_VALUE; + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + for (int i = 0; i < 10; i++) + accum.append(tp1, key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.beginFlush(); + result = accum.ready(cluster, time.milliseconds()); + accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + accum.awaitFlushCompletion(); + assertFalse(accum.hasUnsent()); + } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index b15237b..b9bda9d 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.api.test +package kafka.api import java.lang.{Integer, IllegalArgumentException} @@ -27,7 +27,6 @@ import org.junit.Assert._ import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer -import kafka.api.FetchRequestBuilder import kafka.message.Message import kafka.integration.KafkaServerTestHarness import org.apache.kafka.common.errors.SerializationException @@ -235,7 +234,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { val responses = for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList futures.map(_.get) for (future <- futures) @@ -294,4 +293,25 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } } } + + /** + * Test that flush immediately sends all accumulated requests. 
+   */
+  @Test
+  def testFlush() {
+    var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+    try {
+      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
+      val responses = (0 until numRecords) map (i => producer.send(record))
+      assertTrue("No request is complete.", responses.forall(!_.isDone()))
+      producer.flush()
+      assertTrue("All requests are complete.", responses.forall(_.isDone()))
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 54755e8..569aa86 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -384,7 +384,8 @@ object TestUtils extends Logging {
                        metadataFetchTimeout: Long = 3000L,
                        blockOnBufferFull: Boolean = true,
                        bufferSize: Long = 1024L * 1024L,
-                       retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                       retries: Int = 0,
+                       lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -396,6 +397,7 @@
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-- 
1.9.3 (Apple Git-50)
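
For context, the snippet below is a minimal usage sketch of the new flush() API as seen from application code. It is not part of the patch; the bootstrap server address, topic name, record count, and serializer settings are illustrative assumptions. It relies only on the behavior added above: flush() forces buffered records out regardless of linger.ms and blocks until every previously sent record has completed.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address and topic name below are placeholders for this sketch
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // a large linger.ms would normally delay sends; flush() overrides the wait
        props.put(ProducerConfig.LINGER_MS_CONFIG, "60000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        try {
            // accumulate a batch of sends; with linger.ms = 60s these would normally sit in the accumulator
            List<Future<RecordMetadata>> results = new ArrayList<Future<RecordMetadata>>();
            for (int i = 0; i < 100; i++)
                results.add(producer.send(new ProducerRecord<byte[], byte[]>("my-topic", ("value" + i).getBytes())));

            // force the buffered records out and block until they have all completed
            producer.flush();

            // post-condition of flush(): every previously sent record is done (success or error)
            for (Future<RecordMetadata> result : results)
                if (!result.isDone())
                    throw new IllegalStateException("flush() should have completed all in-flight sends");
        } finally {
            producer.close();
        }
    }
}

The same post-condition can be exercised in unit tests without a broker via MockProducer.flush(), as MockProducerTest does above.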