From 12a584da9190866628ed2937d6c5dd0d75ecd7f4 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Sat, 7 Feb 2015 12:01:51 -0800
Subject: [PATCH] KAFKA-1865 Add a flush() method to the producer.

---
 .../java/org/apache/kafka/clients/Metadata.java    | 10 ++----
 .../kafka/clients/producer/KafkaProducer.java      | 40 +++++++++++++++++++---
 .../kafka/clients/producer/MockProducer.java       |  5 +++
 .../apache/kafka/clients/producer/Producer.java    |  5 +++
 .../producer/internals/FutureRecordMetadata.java   | 10 ++++--
 .../producer/internals/RecordAccumulator.java      | 27 ++++++++++++++-
 .../clients/producer/internals/RecordBatch.java    |  6 ++--
 .../kafka/common/errors/InterruptException.java    | 34 ++++++++++++++++++
 .../org/apache/kafka/common/utils/SystemTime.java  |  2 +-
 .../kafka/clients/producer/BufferPoolTest.java     |  2 +-
 .../kafka/clients/producer/MetadataTest.java       |  4 +--
 .../kafka/clients/producer/MockProducerTest.java   |  6 ++++
 .../clients/producer/RecordAccumulatorTest.java    | 16 +++++++++
 .../integration/kafka/api/ProducerSendTest.scala   | 26 ++++++++++++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  4 ++-
 15 files changed, 171 insertions(+), 26 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b8cdd14..2d3cf6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -99,19 +99,15 @@ public final class Metadata {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
         if (maxWaitMs < 0) {
             throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
         }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         while (this.version <= lastVersion) {
-            try {
-                if (remainingWaitMs != 0) {
-                    wait(remainingWaitMs);
-                }
-            } catch (InterruptedException e) { /* this is fine */
-            }
+            if (remainingWaitMs != 0)
+                wait(remainingWaitMs);
             long elapsed = System.currentTimeMillis() - begin;
             if (elapsed >= maxWaitMs)
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1fd6917..ed85a86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -308,6 +309,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
+     * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -352,7 +354,7 @@
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         } catch (KafkaException e) {
             this.errors.record();
             throw e;
@@ -364,7 +366,7 @@
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      */
-    private void waitOnMetadata(String topic, long maxWaitMs) {
+    private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         if (metadata.fetch().partitionsForTopic(topic) != null) {
             return;
         } else {
@@ -399,10 +401,39 @@
                                        ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");
     }
 
+    /**
+     * Immediately flush any records currently buffered in the producer. This causes any unsent records to be sent
+     * immediately, regardless of the configured linger.ms setting. The post-condition of this method is that all
+     * previous sends will have completed (either successfully or with an error).
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
+    @Override
+    public void flush() {
+        // only allow a single flush at a time to avoid races between simultaneous flushes
+        synchronized (this) {
+            log.trace("Flushing accumulated records in producer.");
+            this.accumulator.beginFlush();
+            this.sender.wakeup();
+            try {
+                this.accumulator.awaitFlushCompletion();
+            } catch (InterruptedException e) {
+                throw new InterruptException("Flush interrupted.", e);
+            }
+        }
+    }
+
+    /**
+     * Get the partition metadata for the given topic
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        try {
+            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
         return this.metadata.fetch().partitionsForTopic(topic);
     }
@@ -413,6 +444,7 @@
 
     /**
      * Close this producer. This method blocks until all in-flight requests complete.
+     * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void close() {
@@ -421,7 +453,7 @@
         try {
             this.ioThread.join();
         } catch (InterruptedException e) {
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         }
         this.metrics.close();
         this.keySerializer.close();
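
For illustration, a minimal sketch of how an application might drive the new flush() API. This sketch is not part of the patch; the class name, topic, broker address, and record count are hypothetical:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FlushExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // A large linger so nothing goes out until we ask for it.
            props.put("linger.ms", "60000");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            try {
                // Buffer a batch of sends; with the long linger, none of them
                // need to have reached the broker yet.
                for (int i = 0; i < 100; i++)
                    producer.send(new ProducerRecord<byte[], byte[]>("my-topic", ("value" + i).getBytes()));
                // Block until every buffered send has completed, either
                // successfully or with an error, e.g. before committing the
                // offsets of the input that produced these records.
                producer.flush();
            } finally {
                producer.close();
            }
        }
    }
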
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 84530f2..6913090 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -128,6 +128,11 @@ public class MockProducer implements Producer<byte[], byte[]> {
             return offset;
         }
     }
+
+    public synchronized void flush() {
+        while (!this.completions.isEmpty())
+            completeNext();
+    }
 
     public List<PartitionInfo> partitionsFor(String topic) {
         return this.cluster.partitionsForTopic(topic);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 17fe541..5b3e75e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -45,6 +45,11 @@ public interface Producer<K, V> extends Closeable {
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
+
+    /**
+     * Flush any accumulated records from the producer. Blocks until all sends are complete.
+     */
+    public void flush();
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 4a2da41..e2d9ca8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -51,13 +51,17 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
         return valueOrError();
     }
 
-    private RecordMetadata valueOrError() throws ExecutionException {
+    RecordMetadata valueOrError() throws ExecutionException {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
-            return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+            return value();
     }
-
+
+    RecordMetadata value() {
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+    }
+
     public long relativeOffset() {
         return this.relativeOffset;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ecfe214..7d31527 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -55,6 +55,7 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
+    private volatile boolean flushing;
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
@@ -89,6 +90,7 @@ public final class RecordAccumulator {
                              Map<String, String> metricTags) {
         this.drainIndex = 0;
         this.closed = false;
+        this.flushing = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
@@ -226,7 +228,7 @@
                 long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                 boolean full = deque.size() > 1 || batch.records.isFull();
                 boolean expired = waitedTimeMs >= timeToWaitMs;
-                boolean sendable = full || expired || exhausted || closed;
+                boolean sendable = full || expired || exhausted || closed || flushing;
                 if (sendable && !backingOff) {
                     readyNodes.add(leader);
                 } else {
@@ -326,6 +328,29 @@
     public void deallocate(RecordBatch batch) {
         free.deallocate(batch.records.buffer(), batch.records.capacity());
     }
+
+    public void beginFlush() {
+        this.flushing = true;
+    }
+
+    /**
+     * Mark all partitions as ready to send and block until the sends are complete
+     */
+    public void awaitFlushCompletion() throws InterruptedException {
+        for (Deque<RecordBatch> deque : this.batches.values()) {
+            // lock the deque and make a copy
+            List<RecordBatch> batches = null;
+            synchronized (deque) {
+                if (deque.size() > 0)
+                    batches = new ArrayList<RecordBatch>(deque);
+            }
+            if (batches != null) {
+                for (RecordBatch batch : batches)
+                    batch.produceFuture.await();
+            }
+        }
+        this.flushing = false;
+    }
 
     /**
      * Close this accumulator and force all the record buffers to be drained
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index dd0af8a..216f731 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -39,7 +39,7 @@ public final class RecordBatch {
     public long lastAttemptMs;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
-    private final ProduceRequestResult produceFuture;
+    public final ProduceRequestResult produceFuture;
    private final List<Thunk> thunks;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
@@ -77,7 +77,6 @@
      * @param exception The exception that occurred (or null if the request was successful)
      */
     public void done(long baseOffset, RuntimeException exception) {
-        this.produceFuture.done(topicPartition, baseOffset, exception);
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition,
                   baseOffset,
@@ -87,13 +86,14 @@
             try {
                 Thunk thunk = this.thunks.get(i);
                 if (exception == null)
-                    thunk.callback.onCompletion(thunk.future.get(), null);
+                    thunk.callback.onCompletion(thunk.future.value(), null);
                 else
                     thunk.callback.onCompletion(null, exception);
             } catch (Exception e) {
                 log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
             }
         }
+        this.produceFuture.done(topicPartition, baseOffset, exception);
     }
 
     /**
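
One subtlety in the RecordBatch change above: done() now completes produceFuture only after all the per-record callbacks have run, so awaitFlushCompletion(), and therefore flush(), cannot return before user callbacks have executed. A hedged sketch of what that lets callers assume, reusing the hypothetical producer from the earlier sketch (the counter is likewise illustrative):

    final java.util.concurrent.atomic.AtomicInteger acked = new java.util.concurrent.atomic.AtomicInteger();
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", ("value" + i).getBytes()),
                      new Callback() {
                          public void onCompletion(RecordMetadata metadata, Exception exception) {
                              // Runs before the batch's produceFuture is completed.
                              acked.incrementAndGet();
                          }
                      });
    }
    producer.flush();
    // Since callbacks fire before produceFuture.done(), all 100 callbacks
    // have run by the time flush() returns.
    assert acked.get() == 100;
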
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
new file mode 100644
index 0000000..fee322f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * An unchecked wrapper for InterruptedException
+ */
+public class InterruptException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InterruptException(InterruptedException cause) {
+        super(cause);
+        Thread.currentThread().interrupt();
+    }
+
+    public InterruptException(String message, InterruptedException cause) {
+        super(message, cause);
+        Thread.currentThread().interrupt();
+    }
+
+}
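
A note on the new exception type: both constructors re-assert the thread's interrupt status via Thread.currentThread().interrupt(), so wrapping the checked InterruptedException in an unchecked exception does not swallow the interrupt. A small hedged sketch of what a caller can then rely on (producer as in the earlier sketch):

    try {
        producer.flush(); // may block, and may be interrupted
    } catch (InterruptException e) {
        // The constructor already restored the interrupt flag, so the
        // interruption stays visible to the rest of the application.
        assert Thread.currentThread().isInterrupted();
        producer.close();
    }
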
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index d682bd4..18725de 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -36,7 +36,7 @@ public class SystemTime implements Time {
         try {
             Thread.sleep(ms);
         } catch (InterruptedException e) {
-            // no stress
+            // just wake up early
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 4ae43ed..df28197 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -146,7 +146,7 @@ public class BufferPoolTest {
         int numThreads = 10;
         final int iterations = 50000;
         final int poolableSize = 1024;
-        final int totalMemory = numThreads / 2 * poolableSize;
+        final long totalMemory = numThreads / 2 * poolableSize;
         final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 743aa7e..bd82f4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -83,8 +83,8 @@ public class MetadataTest {
             while (metadata.fetch().partitionsForTopic(topic) == null) {
                 try {
                     metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
-                } catch (TimeoutException e) {
-                    // let it go
+                } catch (Exception e) {
+                    e.printStackTrace();
                 }
             }
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 75513b0..5d7baa6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -67,6 +67,12 @@ public class MockProducerTest {
             assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
+
+        Future<RecordMetadata> md3 = producer.send(record1);
+        Future<RecordMetadata> md4 = producer.send(record2);
+        assertFalse("Requests should not be completed.", md3.isDone() || md4.isDone());
+        producer.flush();
+        assertTrue("Requests should be completed.", md3.isDone() && md4.isDone());
     }
 
     private boolean isError(Future<?> future) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 8333863..0e1fa27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -203,5 +203,21 @@ public class RecordAccumulatorTest {
         // but have leaders with other sendable data.
         assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
     }
+
+    @Test
+    public void testFlush() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        for (int i = 0; i < 10; i++)
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+
+        accum.beginFlush();
+        result = accum.ready(cluster, time.milliseconds());
+        accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        accum.awaitFlushCompletion();
+        assertFalse(accum.hasUnsent());
+    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index b15237b..b9bda9d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.api.test
+package kafka.api
 
 import java.lang.{Integer, IllegalArgumentException}
@@ -27,7 +27,6 @@ import org.junit.Assert._
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
 import org.apache.kafka.common.errors.SerializationException
@@ -235,7 +234,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
     val responses =
       for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
+      yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
     val futures = responses.toList
     futures.map(_.get)
     for (future <- futures)
@@ -294,4 +293,25 @@
       }
     }
   }
+
+  /**
+   * Test that flush immediately sends all accumulated requests.
+   */
+  @Test
+  def testFlush() {
+    var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+    try {
+      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
+      val responses = (0 until numRecords) map (i => producer.send(record))
+      assertTrue("No request is complete.", responses.forall(!_.isDone()))
+      producer.flush()
+      assertTrue("All requests are complete.", responses.forall(_.isDone()))
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 21d0ed2..69ea792 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -387,7 +387,8 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                        retries: Int = 0,
+                        lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -399,6 +400,7 @@
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-- 
1.9.3 (Apple Git-50)