From 9e68d9c389319de29f717cc4a536d24facc74aec Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Sat, 7 Feb 2015 12:01:51 -0800 Subject: [PATCH] KAFKA-1865 Add a flush() method to the producer. --- .../java/org/apache/kafka/clients/Metadata.java | 10 +- .../kafka/clients/producer/KafkaProducer.java | 186 +++++++++++++++++---- .../kafka/clients/producer/MockProducer.java | 5 + .../apache/kafka/clients/producer/Producer.java | 5 + .../kafka/clients/producer/ProducerRecord.java | 20 ++- .../producer/internals/FutureRecordMetadata.java | 10 +- .../producer/internals/RecordAccumulator.java | 34 +++- .../clients/producer/internals/RecordBatch.java | 13 +- .../kafka/common/errors/InterruptException.java | 34 ++++ .../org/apache/kafka/common/utils/SystemTime.java | 2 +- .../kafka/clients/producer/BufferPoolTest.java | 2 +- .../kafka/clients/producer/MetadataTest.java | 4 +- .../kafka/clients/producer/MockProducerTest.java | 6 + .../clients/producer/RecordAccumulatorTest.java | 23 +++ .../scala/integration/kafka/api/ConsumerTest.scala | 2 +- .../integration/kafka/api/ProducerSendTest.scala | 62 +++++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +- 17 files changed, 336 insertions(+), 86 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index e8afecd..c8bde8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -99,19 +99,15 @@ public final class Metadata { /** * Wait for metadata update until the current version is larger than the last version we know of */ - public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) { + public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - try { - if (remainingWaitMs != 0) { - wait(remainingWaitMs); - } - } catch (InterruptedException e) { /* this is fine */ - } + if (remainingWaitMs != 0) + wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1fd6917..3f95a46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; @@ -55,10 +56,66 @@ import org.slf4j.LoggerFactory; /** * A Kafka client that publishes records to the Kafka cluster. *

- * The producer is thread safe and should generally be shared among all threads for best performance. + * The producer is thread safe and sharing a single producer instance across threads will generally be faster than + * having multiple instances. *

- * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it - * needs to communicate with. Failure to close the producer after use will leak these resources. + * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value + * pairs. + *

+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("acks", "all");
+ * props.put("retries", 0);
+ * props.put("batch.size", 16384);
+ * props.put("linger.ms", 1);
+ * props.put("buffer.memory", 33554432);
+ * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * 
+ * Producer<String, String> producer = new KafkaProducer<String, String>(props);
+ * for(int i = 0; i < 100; i++)
+ *     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
+ * 
+ * producer.close();
+ * }
+ *

+ * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server + * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them + * to the cluster. Failure to close the producer after use will leak these resources. + *
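+ * One sketch of guarding against that leak (reusing the props and String serializers from the example above) is to
+ * close the producer in a finally block:
+ * {@code
+ * Producer<String, String> producer = new KafkaProducer<String, String>(props);
+ * try {
+ *     producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
+ * } finally {
+ *     producer.close(); // blocks until outstanding sends complete, then stops the I/O thread and releases resources
+ * }
+ * }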

+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends + * and immediately returns. This allows the producer to batch together individual records for efficiency. + *

+ * The acks config controls the criteria under which requests are considered complete. The "all" setting + * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. + *
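+ * If lower latency matters more than durability, a sketch of an alternative setting is:
+ * {@code
+ * props.put("acks", "1"); // the leader acknowledges the write without waiting for the full set of replicas
+ * }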

+ * If the request fails, the producer can automatically retry, though since we have specified retries + * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on + * message delivery semantics for details). + *
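+ * For example, a configuration that does retry failed sends (accepting the possibility of duplicates) might look like:
+ * {@code
+ * props.put("retries", 3);            // retry a failed send up to 3 times
+ * props.put("retry.backoff.ms", 100); // wait 100 ms between attempts
+ * }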

+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by + * the batch.size config. Making this larger can result in more batching, but requires more memory (since we will + * generally have one of these buffers for each active partition). + *
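+ * As a rough sizing sketch (the numbers here are arbitrary), the per-partition batch size times the number of active
+ * partitions should fit comfortably inside the total buffer memory:
+ * {@code
+ * props.put("batch.size", 32768);        // 32 KB per partition batch
+ * props.put("buffer.memory", 67108864);  // 64 MB total, i.e. room for roughly 2048 such batches
+ * }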

+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you + * want to reduce the number of requests you can set linger.ms to something greater than 0. This will + * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will + * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, + * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting + * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that + * records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load + * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more + * efficient requests when not under maximal load at the cost of a small amount of latency. + *

+ * The buffer.memory controls the total amount of memory available to the producer for buffering. If records + * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is + * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set block.on.buffer.full=false which + * will cause the send call to result in an exception. + *
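+ * A sketch of that non-blocking mode (assuming the failure surfaces as a BufferExhaustedException) is:
+ * {@code
+ * props.put("block.on.buffer.full", false);
+ * Producer<String, String> producer = new KafkaProducer<String, String>(props);
+ * try {
+ *     producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
+ * } catch (BufferExhaustedException e) {
+ *     // the buffer.memory pool is full; drop the record, back off, or spill it elsewhere
+ * }
+ * }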

+ * The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with + * their ProducerRecord into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or + * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types. */ public class KafkaProducer implements Producer { @@ -241,8 +298,8 @@ public class KafkaProducer implements Producer { } /** - * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} - * @param record The record to be sent + * Asynchronously send a record to a topic. Equivalent to send(record, null). + * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Future send(ProducerRecord record) { @@ -261,53 +318,55 @@ public class KafkaProducer implements Producer { *

* Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get() - * get()} on this future will result in the metadata for the record or throw any exception that occurred while - * sending the record. + * get()} on this future will block until the associated request completes and then return the metadata for the record + * or throw any exception that occurred while sending the record. *

- * If you want to simulate a simple blocking call you can do the following: + * If you want to simulate a simple blocking call you can call the get() method immediately: * - *

{@code
-     * producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * 
+     * {@code
+     * byte[] key = "key".getBytes();
+     * byte[] value = "value".getBytes();
+     * ProducerRecord record = new ProducerRecord("my-topic", key, value)
+     * producer.send(record).get();
      * }
*
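+     * Any error that failed the send is rethrown by get() wrapped in an {@link java.util.concurrent.ExecutionException};
+     * a sketch of unwrapping it:
+     * {@code
+     * try {
+     *     RecordMetadata metadata = producer.send(record).get();
+     * } catch (ExecutionException e) {
+     *     e.getCause().printStackTrace(); // the cause is the actual send error, e.g. a TimeoutException
+     * } catch (InterruptedException e) {
+     *     Thread.currentThread().interrupt();
+     * }
+     * }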

- * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that + * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. * - *

{@code
-     * ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
-     *   producer.send(myRecord,
-     *                new Callback() {
-     *                     public void onCompletion(RecordMetadata metadata, Exception e) {
-     *                         if(e != null)
-     *                             e.printStackTrace();
-     *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
-     *                     }
-     *                });
-     * }
+ *
+     * {@code
+     * ProducerRecord record = new ProducerRecord("the-topic", key, value);
+     * producer.send(record,
+     *               new Callback() {
+     *                   public void onCompletion(RecordMetadata metadata, Exception e) {
+     *                       if(e != null)
+     *                           e.printStackTrace();
+     *                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
+     *                   }
+     *               });
+     * }
+     * 
* * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the * following example callback1 is guaranteed to execute before callback2: * - *
{@code
+     * 
+     * {@code
      * producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
-     * }
+ * } + *
*

* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body * to parallelize processing. - *
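+     * A minimal sketch of that Executor approach (the single-thread executor here is an arbitrary choice) is:
+     * {@code
+     * final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+     * producer.send(record, new Callback() {
+     *     public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+     *         callbackExecutor.submit(new Runnable() {
+     *             public void run() {
+     *                 // expensive processing happens here, off the producer I/O thread
+     *                 if (exception != null)
+     *                     exception.printStackTrace();
+     *             }
+     *         });
+     *     }
+     * });
+     * }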

- * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is - * controlled by the configuration total.memory.bytes. If send() is called faster than the - * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in - * this case is to block the send call until the I/O thread catches up and more buffer space is available. However - * in cases where non-blocking usage is desired the setting block.on.buffer.full=false will cause the - * producer to instead throw an exception when buffer memory is exhausted. * * @param record The record to send * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) + * @throws InterruptException If the thread is interrupted while blocked */ @Override public Future send(ProducerRecord record, Callback callback) { @@ -352,7 +411,7 @@ public class KafkaProducer implements Producer { return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); - throw new KafkaException(e); + throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); throw e; @@ -364,7 +423,7 @@ public class KafkaProducer implements Producer { * @param topic The topic we want metadata for * @param maxWaitMs The maximum time in ms for waiting on the metadata */ - private void waitOnMetadata(String topic, long maxWaitMs) { + private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { if (metadata.fetch().partitionsForTopic(topic) != null) { return; } else { @@ -399,20 +458,75 @@ public class KafkaProducer implements Producer { ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration."); } + + /** + * Invoking this method makes all buffered records immediately available to send (even if linger.ms is + * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition + * of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). + * A request is considered completed when it is successfully acknowledged + * according to the acks configuration you have specified or else it results in an error. + *

+ * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete; + * however, no guarantee is made about the completion of records sent after the flush call begins. *

+ * This method can be useful when consuming from some input system and producing into Kafka. The flush() call + * gives a convenient way to ensure all previously sent messages have actually completed. + *

+ * This example shows how to consume from one Kafka topic and produce to another Kafka topic: + *

+     * {@code
+     * for(ConsumerRecord record: consumer.poll(100))
+     *     producer.send(new ProducerRecord("my-topic", record.key(), record.value());
+     * producer.flush();
+     * consumer.commit();
+     * }
+     * 
+ * + * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur + * we need to set retries=<large_number> in our config. + * + * @throws InterruptException If the thread is interrupted while blocked + */ + @Override + public void flush() { + // only allow a single flush at a time to avoid races between simultaneous flushes + synchronized (this) { + log.trace("Flushing accumulated records in producer."); + this.accumulator.beginFlush(); + this.sender.wakeup(); + try { + this.accumulator.awaitFlushCompletion(); + } catch (InterruptedException e) { + throw new InterruptException("Flush interrupted.", e); + } + } + } + /** + * Get the partition metadata for the give topic. This can be used for custom partitioning. + * @throws InterruptException If the thread is interrupted while blocked + */ @Override public List partitionsFor(String topic) { - waitOnMetadata(topic, this.metadataFetchTimeoutMs); + try { + waitOnMetadata(topic, this.metadataFetchTimeoutMs); + } catch (InterruptedException e) { + throw new InterruptException(e); + } return this.metadata.fetch().partitionsForTopic(topic); } + /** + * Get the full set of internal metrics maintained by the producer. + */ @Override public Map metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); } /** - * Close this producer. This method blocks until all in-flight requests complete. + * Close this producer. This method blocks until all previously sent requests complete. + * @throws InterruptException If the thread is interrupted while blocked */ @Override public void close() { @@ -421,7 +535,7 @@ public class KafkaProducer implements Producer { try { this.ioThread.join(); } catch (InterruptedException e) { - throw new KafkaException(e); + throw new InterruptException(e); } this.metrics.close(); this.keySerializer.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 84530f2..6913090 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -128,6 +128,11 @@ public class MockProducer implements Producer { return offset; } } + + public synchronized void flush() { + while (!this.completions.isEmpty()) + completeNext(); + } public List partitionsFor(String topic) { return this.cluster.partitionsForTopic(topic); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 17fe541..5b3e75e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -45,6 +45,11 @@ public interface Producer extends Closeable { * Send a record and invoke the given callback when the record has been acknowledged by the server */ public Future send(ProducerRecord record, Callback callback); + + /** + * Flush any accumulated records from the producer. Blocks until all sends are complete. + */ + public void flush(); /** * Get a list of partitions for the given topic for custom partition assignment. 
The partition metadata will change diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 4990692..75cd51e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -102,15 +102,21 @@ public final class ProducerRecord { @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof ProducerRecord)) return false; + if (this == o) + return true; + else if (!(o instanceof ProducerRecord)) + return false; - ProducerRecord that = (ProducerRecord) o; + ProducerRecord that = (ProducerRecord) o; - if (key != null ? !key.equals(that.key) : that.key != null) return false; - if (partition != null ? !partition.equals(that.partition) : that.partition != null) return false; - if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false; - if (value != null ? !value.equals(that.value) : that.value != null) return false; + if (key != null ? !key.equals(that.key) : that.key != null) + return false; + else if (partition != null ? !partition.equals(that.partition) : that.partition != null) + return false; + else if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + else if (value != null ? !value.equals(that.value) : that.value != null) + return false; return true; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 4a2da41..e2d9ca8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -51,13 +51,17 @@ public final class FutureRecordMetadata implements Future { return valueOrError(); } - private RecordMetadata valueOrError() throws ExecutionException { + RecordMetadata valueOrError() throws ExecutionException { if (this.result.error() != null) throw new ExecutionException(this.result.error()); else - return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); + return value(); } - + + RecordMetadata value() { + return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); + } + public long relativeOffset() { return this.relativeOffset; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ecfe214..5ecbd25 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -55,6 +55,7 @@ public final class RecordAccumulator { private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); private volatile boolean closed; + private volatile boolean flushing; private int drainIndex; private final int batchSize; private final long lingerMs; @@ -62,6 +63,7 @@ public final class RecordAccumulator { private final BufferPool free; private final Time time; private final ConcurrentMap> batches; + private final Set incomplete; /** * Create a new record accumulator @@ -89,12 +91,14 @@ public final class RecordAccumulator { Map metricTags) 
{ this.drainIndex = 0; this.closed = false; + this.flushing = false; this.batchSize = batchSize; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap>(); String metricGrpName = "producer-metrics"; this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); + this.incomplete = Collections.synchronizedSet(new HashSet()); this.time = time; registerMetrics(metrics, metricGrpName, metricTags); } @@ -146,9 +150,8 @@ public final class RecordAccumulator { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) { + if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); - } } } @@ -161,8 +164,7 @@ public final class RecordAccumulator { if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen - // often... + // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } @@ -172,6 +174,7 @@ public final class RecordAccumulator { FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); dq.addLast(batch); + incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } @@ -226,7 +229,7 @@ public final class RecordAccumulator { long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; - boolean sendable = full || expired || exhausted || closed; + boolean sendable = full || expired || exhausted || closed || flushing; if (sendable && !backingOff) { readyNodes.add(leader); } else { @@ -324,8 +327,29 @@ public final class RecordAccumulator { * Deallocate the record batch */ public void deallocate(RecordBatch batch) { + boolean removed = incomplete.remove(batch); + if (!removed) + throw new IllegalStateException("Remove from the incomplete set failed. 
This should be impossible."); free.deallocate(batch.records.buffer(), batch.records.capacity()); } + + /** + * Initiate the flushing of data from the accumulator...this makes all requests immediately ready + */ + public void beginFlush() { + this.flushing = true; + } + + /** + * Mark all partitions as ready to send and block until the send is complete + */ + public void awaitFlushCompletion() throws InterruptedException { + // make a copy so that the list of things we wait on doesn't expand while we wait + HashSet copy = new HashSet(this.incomplete); + for (RecordBatch batch: copy) + batch.produceFuture.await(); + this.flushing = false; + } /** * Close this accumulator and force all the record buffers to be drained diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index dd0af8a..06182db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -39,7 +40,7 @@ public final class RecordBatch { public long lastAttemptMs; public final MemoryRecords records; public final TopicPartition topicPartition; - private final ProduceRequestResult produceFuture; + public final ProduceRequestResult produceFuture; private final List thunks; public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { @@ -77,7 +78,6 @@ public final class RecordBatch { * @param exception The exception that occurred (or null if the request was successful) */ public void done(long baseOffset, RuntimeException exception) { - this.produceFuture.done(topicPartition, baseOffset, exception); log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, @@ -86,14 +86,17 @@ public final class RecordBatch { for (int i = 0; i < this.thunks.size(); i++) { try { Thunk thunk = this.thunks.get(i); - if (exception == null) - thunk.callback.onCompletion(thunk.future.get(), null); - else + if (exception == null) { + RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset()); + thunk.callback.onCompletion(metadata, null); + } else { thunk.callback.onCompletion(null, exception); + } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } + this.produceFuture.done(topicPartition, baseOffset, exception); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java new file mode 100644 index 0000000..fee322f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * An unchecked wrapper for InterruptedException + */ +public class InterruptException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public InterruptException(InterruptedException cause) { + super(cause); + Thread.currentThread().interrupt(); + } + + public InterruptException(String message, InterruptedException cause) { + super(message, cause); + Thread.currentThread().interrupt(); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java index d682bd4..18725de 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java @@ -36,7 +36,7 @@ public class SystemTime implements Time { try { Thread.sleep(ms); } catch (InterruptedException e) { - // no stress + // just wake up early } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java index 4ae43ed..df28197 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java @@ -146,7 +146,7 @@ public class BufferPoolTest { int numThreads = 10; final int iterations = 50000; final int poolableSize = 1024; - final int totalMemory = numThreads / 2 * poolableSize; + final long totalMemory = numThreads / 2 * poolableSize; final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 743aa7e..bd82f4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -83,8 +83,8 @@ public class MetadataTest { while (metadata.fetch().partitionsForTopic(topic) == null) { try { metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); - } catch (TimeoutException e) { - // let it go + } catch (Exception e) { + e.printStackTrace(); } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 75513b0..5d7baa6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -67,6 +67,12 @@ public class MockProducerTest { assertEquals(e, err.getCause()); } assertFalse("No more requests to complete", producer.completeNext()); + + 
Future md3 = producer.send(record1); + Future md4 = producer.send(record2); + assertFalse("Requests should not be completed.", md3.isDone() && md4.isDone()); + producer.flush(); + assertTrue("Requests should be completed.", md3.isDone() && md4.isDone()); } private boolean isError(Future future) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 8333863..02a3e5a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -203,5 +203,28 @@ public class RecordAccumulatorTest { // but have leaders with other sendable data. assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); } + + @Test + public void testFlush() throws Exception { + long lingerMs = Long.MAX_VALUE; + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + for (int i = 0; i < 10; i++) + accum.append(tp1, key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.beginFlush(); + result = accum.ready(cluster, time.milliseconds()); + + // drain and deallocate all batches + Map> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + for (List batches: results.values()) + for (RecordBatch batch: batches) + accum.deallocate(batch); + + // should be complete with no unsent records. + accum.awaitFlushCompletion(); + assertFalse(accum.hasUnsent()); + } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 2802a39..0db4f22 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -263,7 +263,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { val futures = (0 until numRecords).map { i => this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) } - futures.map(_.get) + this.producers(0).flush() } private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index b15237b..c282999 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package kafka.api.test +package kafka.api import java.lang.{Integer, IllegalArgumentException} @@ -27,7 +27,6 @@ import org.junit.Assert._ import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer -import kafka.api.FetchRequestBuilder import kafka.message.Message import kafka.integration.KafkaServerTestHarness import org.apache.kafka.common.errors.SerializationException @@ -66,13 +65,6 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { super.tearDown() } - class CheckErrorCallback extends Callback { - def onCompletion(metadata: RecordMetadata, exception: Exception) { - if (exception != null) - fail("Send callback returns the following exception", exception) - } - } - /** * testSendOffset checks the basic send API behavior * @@ -82,23 +74,36 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testSendOffset() { var producer = TestUtils.createNewProducer(brokerList) - - val callback = new CheckErrorCallback + val partition = new Integer(0) + + object callback extends Callback { + var offset = 0L + def onCompletion(metadata: RecordMetadata, exception: Exception) { + if (exception == null) { + assertEquals(offset, metadata.offset()) + assertEquals(topic, metadata.topic()) + assertEquals(partition, metadata.partition()) + offset += 1 + } else { + fail("Send callback returns the following exception", exception) + } + } + } try { // create topic TestUtils.createTopic(zkClient, topic, 1, 2, servers) // send a normal record - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes) assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok - val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null) + val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null) assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok - val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes) + val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes) assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok @@ -107,7 +112,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // send a record with null topic should fail try { - val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes) + val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes) producer.send(record4, callback) fail("Should not allow sending a record without topic") } catch { @@ -117,7 +122,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // non-blocking send a list of records for (i <- 1 to numRecords) - producer.send(record0) + producer.send(record0, callback) // check that all messages have been acked via offset assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) @@ -235,7 +240,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { val 
responses = for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList futures.map(_.get) for (future <- futures) @@ -294,4 +299,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } } } + + /** + * Test that flush immediately sends all accumulated requests. + */ + @Test + def testFlush() { + var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + try { + TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes) + for(i <- 0 until 500) { + val responses = (0 until numRecords) map (i => producer.send(record)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.flush() + assertTrue("All requests are complete.", responses.forall(_.isDone())) + } + } finally { + if (producer != null) + producer.close() + } + } + + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 21d0ed2..69ea792 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -387,7 +387,8 @@ object TestUtils extends Logging { metadataFetchTimeout: Long = 3000L, blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, - retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { + retries: Int = 0, + lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig val producerProps = new Properties() @@ -399,6 +400,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) -- 1.9.3 (Apple Git-50)